Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Cloud Devops Env Environment Implementation. | |
| A deterministic mock cloud/devops environment with reward shaping and | |
| anti-farming guardrails for hackathon evaluation. | |
| """ | |
| from __future__ import annotations | |
| import copy | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import CloudAction, CloudObservation, CloudState | |
| except ImportError: | |
| from models import CloudAction, CloudObservation, CloudState | |
class CloudDevopsEnvironment(Environment):
    """
    A deterministic mock cloud/devops environment.

    Tasks:
        - easy: open port 80 on sg-web
        - medium: inspect noisy API logs, then open port 5432 on sg-db
        - hard: trace 502 from lb-main to i-web2, then restart i-web2 (not i-web1)

    Reward shaping:
        - every step costs ``ACTION_COST``;
        - each investigative milestone is rewarded at most once per episode
          (see ``_reward_once``), so repeating an action cannot farm reward;
        - the per-step reward is clipped to ``[-1.0, 1.0]``.

    Example:
        >>> env = CloudDevopsEnvironment()
        >>> obs = env.reset()
        >>> print(obs.system_health_status)  # "CRITICAL"
        >>>
        >>> obs = env.step(CloudAction(command="list_resources"))
        >>> print(obs.output)
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    # An episode is force-terminated once this many steps have been taken.
    MAX_STEPS: int = 20
    VALID_TASKS = {"easy", "medium", "hard"}
    # Flat penalty charged on every call to ``step``.
    ACTION_COST: float = 0.01
    # Hard task only: cascading-failure penalties start on the first step
    # whose step count is strictly greater than this value.
    CASCADE_TRIGGER_STEP: int = 8

    def __init__(self, task_name: str = "easy"):
        """Initialize the cloud_devops_env environment.

        Args:
            task_name: One of ``VALID_TASKS`` (case-insensitive). A falsy
                value (``None`` or ``""``) falls back to ``"easy"``.

        Raises:
            ValueError: If ``task_name`` is not a known task.
        """
        normalized_task = (task_name or "easy").lower()
        if normalized_task not in self.VALID_TASKS:
            raise ValueError(f"Unknown task: {task_name}")
        self.task_name = normalized_task
        # Populated lazily by ``reset``; ``None`` means "no episode yet".
        self._state_data: CloudState | None = None
        # Names of one-time rewards already granted in the current episode.
        self._achievements: set[str] = set()

    def _build_noise_resources(self) -> dict[str, dict[str, object]]:
        """Generate deterministic decoy resources to force retrieval and filtering.

        Returns:
            A fresh mapping with 20 ``i-backend-NN`` instances and 20
            ``sg-backend-NN`` security groups (``NN`` = 01..20).
        """
        resources: dict[str, dict[str, object]] = {}
        for i in range(1, 21):
            suffix = f"{i:02d}"
            resources[f"i-backend-{suffix}"] = {
                "type": "Instance",
                "status": "running",
                "logs": (
                    "[2026-04-06 17:00:00] INFO node-exporter: "
                    "standard metrics reported successfully"
                ),
            }
            resources[f"sg-backend-{suffix}"] = {
                "type": "SecurityGroup",
                "rules": [{"port": 443, "action": "allow"}],
            }
        return resources

    def _build_task_resources(self) -> dict[str, dict[str, object]]:
        """Build the full resource map (decoys + task resources) for ``self.task_name``.

        Returns:
            A newly constructed mapping of resource id to resource attributes.
            Any task name other than ``"easy"`` or ``"medium"`` yields the
            hard-task topology.
        """
        resources = self._build_noise_resources()
        if self.task_name == "easy":
            resources.update(
                {
                    "i-web": {"type": "Instance", "status": "running"},
                    "sg-web": {
                        "type": "SecurityGroup",
                        "rules": [{"port": 22, "action": "allow"}],
                    },
                }
            )
            return resources
        if self.task_name == "medium":
            resources.update(
                {
                    "i-api": {
                        "type": "Instance",
                        "ip_address": "10.0.4.11",
                        "status": "running",
                        "logs": (
                            "[2026-04-06 17:01:22] [CRITICAL] "
                            "sqlalchemy.exc.OperationalError: "
                            "(psycopg2.OperationalError) connection to server at "
                            "'10.0.4.5', port 5432 failed: Connection timed out. "
                            "Is the server running and accepting TCP/IP connections?"
                        ),
                    },
                    "i-db": {
                        "type": "Instance",
                        "ip_address": "10.0.4.5",
                        "status": "running",
                    },
                    "sg-db": {
                        "type": "SecurityGroup",
                        "rules": [{"port": 22, "action": "allow"}],
                    },
                    "metadata-svc": {
                        "type": "MetadataService",
                        "status": "running",
                    },
                }
            )
            return resources
        # Hard task topology.
        resources.update(
            {
                "lb-main": {
                    "type": "LoadBalancer",
                    "logs": (
                        "2026/04/06 17:02:09 [error] 3197#3197: *4189 upstream timed out "
                        "(110: Connection timed out) while reading response header from upstream, "
                        "client: 10.0.2.14, server: api.prod.local, request: \"GET /checkout HTTP/1.1\", "
                        "upstream: \"http://10.0.8.22:8080/checkout\", host: \"api.prod.local\"\n"
                        "2026/04/06 17:02:10 [error] 3197#3197: *4190 no live upstreams while "
                        "connecting to upstream \"10.0.8.22\""
                    ),
                },
                "lb-external": {
                    "type": "LoadBalancer",
                    "status": "running",
                    "logs": "INFO: Edge traffic stable.",
                },
                "i-web1": {
                    "type": "Instance",
                    "ip_address": "10.0.8.21",
                    "status": "running",
                    "logs": (
                        "[2026-04-06 17:02:11] INFO web-service: readiness probe passed\n"
                        "[2026-04-06 17:02:12] INFO jvm: heap usage stable at 42%"
                    ),
                },
                "i-web2": {
                    "type": "Instance",
                    "ip_address": "10.0.8.22",
                    "status": "degraded",
                    "logs": (
                        "kernel: Out of memory: Killed process 12345 (java) total-vm:4194304kB, "
                        "anon-rss:3145728kB\n"
                        "systemd[1]: web-service.service: Main process exited, code=killed, "
                        "status=9/KILL"
                    ),
                },
                "sg-web": {
                    "type": "SecurityGroup",
                    "rules": [{"port": 80, "action": "allow"}],
                },
                "metadata-svc": {
                    "type": "MetadataService",
                    "status": "running",
                },
            }
        )
        return resources

    def _lookup_resource_by_ip(self, ip_address: str) -> str | None:
        """Return the id of the first resource whose ``ip_address`` matches, else ``None``."""
        if self._state_data is None:
            return None
        for resource_id, data in self._state_data.resources.items():
            if data.get("ip_address") == ip_address:
                return resource_id
        return None

    def _apply_cascading_failure(self) -> tuple[float, str]:
        """Simulate system drift in hard mode if root cause is not fixed quickly.

        Returns:
            ``(penalty, message)``. ``(0.0, "")`` when the cascade does not
            apply (non-hard task, incident resolved, step count not past
            ``CASCADE_TRIGGER_STEP``, or ``lb-external`` missing). The step
            that first takes ``lb-external`` down yields ``-0.05`` plus an
            alert message; every later unresolved step yields ``-0.03`` with
            no message.
        """
        if self._state_data is None or self.task_name != "hard":
            return 0.0, ""
        state = self._state_data
        if state.is_resolved or state.step_count <= self.CASCADE_TRIGGER_STEP:
            return 0.0, ""
        lb = state.resources.get("lb-external")
        if not lb:
            return 0.0, ""
        if lb.get("status") != "DOWN":
            # First step past the threshold: flip the edge load balancer.
            lb["status"] = "DOWN"
            lb["logs"] = (
                "CRITICAL: Cascading failure triggered after prolonged unresolved OOM incident. "
                "Edge load balancer stopped serving traffic."
            )
            return -0.05, (
                "\nALERT: Cascading failure detected. lb-external is DOWN due to delayed remediation."
            )
        return -0.03, ""

    def _reward_once(self, achievement: str, points: float) -> float:
        """Return ``points`` the first time ``achievement`` is seen this episode, else ``0.0``."""
        if achievement in self._achievements:
            return 0.0
        self._achievements.add(achievement)
        return points

    def _task_objective(self) -> str:
        """Return the human-readable objective string for the current task."""
        objectives = {
            "easy": "Restore web access by allowing port 80 on sg-web.",
            "medium": (
                "Restore API to DB connectivity by reading i-api logs, resolving DB IP via "
                "query_metadata, then allowing port 5432 on sg-db."
            ),
            "hard": (
                "Recover checkout path by tracing lb-main upstream IP, resolving it with "
                "query_metadata, inspecting i-web2, and restarting i-web2 safely."
            ),
        }
        return objectives[self.task_name]

    def _health_status(self) -> str:
        """Derive the reported health string from the current episode state.

        ``"HEALTHY"`` once resolved; ``"DEGRADED"`` in the hard task when
        ``lb-external`` is ``DOWN``; otherwise ``"CRITICAL"``.
        """
        state = self._state_data
        if state is None:
            return "CRITICAL"
        if state.is_resolved:
            return "HEALTHY"
        lb_external = state.resources.get("lb-external", {})
        if self.task_name == "hard" and lb_external.get("status") == "DOWN":
            return "DEGRADED"
        return "CRITICAL"

    def reset(self) -> CloudObservation:  # type: ignore[override]
        """Reset the environment to the initial state for the selected task.

        Clears granted achievements, creates a new ``CloudState`` with a fresh
        episode id and resource map, and returns the initial observation.
        """
        self._achievements.clear()
        self._state_data = CloudState(
            episode_id=str(uuid4()),
            task_difficulty=self.task_name,
            # _build_task_resources() constructs new dicts/lists on every
            # call, so no defensive copy is needed here.
            resources=self._build_task_resources(),
            step_count=0,
            is_resolved=False,
        )
        return CloudObservation(
            output=(
                "Environment initialized. System status is currently CRITICAL. "
                "Use 'list_resources' to begin triage."
            ),
            error=None,
            system_health_status="CRITICAL",
            done=False,
            reward=0.0,
            metadata={
                "step_count": 0,
                "resolved": False,
                "task": self.task_name,
                "total_resources": len(self._state_data.resources),
                "objective": self._task_objective(),
                "deterministic": True,
                "max_steps": self.MAX_STEPS,
                "action_cost": self.ACTION_COST,
                "hard_cascade_trigger_step": self.CASCADE_TRIGGER_STEP,
            },
            echoed_message="Cloud Devops Env environment ready!",
            message_length=0,
        )

    def step(self, action: CloudAction) -> CloudObservation:  # type: ignore[override]
        """Execute the agent action and return the next observation.

        Supported commands: ``list_resources``, ``describe_resource``,
        ``view_logs``, ``query_metadata``, ``update_security_group``,
        ``restart_service`` and ``submit_solution``. Any error raised while
        handling the command is reported through the observation's ``error``
        and ``output`` fields rather than propagated; the step still counts
        and still costs ``ACTION_COST``.

        Args:
            action: The command to run, with optional ``resource_id`` and
                ``parameters``.

        Returns:
            The resulting observation. ``metadata["reward_breakdown"]`` lists
            every reward component applied during this step.
        """
        if self._state_data is None:
            self.reset()
        assert self._state_data is not None
        state = self._state_data
        state.step_count += 1

        reward = -self.ACTION_COST
        reward_breakdown: list[dict[str, object]] = [
            {"event": "action_cost", "delta": -self.ACTION_COST}
        ]
        done = False
        output = ""
        error = None
        termination_reason = "in_progress"

        def add_reward(delta: float, event: str) -> None:
            """Accumulate a non-zero reward component and record it."""
            nonlocal reward
            if abs(delta) < 1e-12:
                # Zero deltas (e.g. an already-granted achievement) are not logged.
                return
            reward += delta
            reward_breakdown.append({"event": event, "delta": round(float(delta), 4)})

        # Deliberately broad: this is the boundary that converts any command
        # failure into an error observation instead of crashing the session.
        try:
            if action.command == "list_resources":
                res_list = [
                    f"{resource_id} ({data['type']})"
                    for resource_id, data in sorted(state.resources.items())
                ]
                output = "Available Resources:\n" + "\n".join(res_list)

            elif action.command == "describe_resource":
                if not action.resource_id or action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = str(state.resources[action.resource_id])
                if self.task_name == "easy" and action.resource_id == "sg-web":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_web_sg")
                elif self.task_name == "medium" and action.resource_id == "sg-db":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_db_sg")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_instance",
                    )

            elif action.command == "view_logs":
                if not action.resource_id:
                    raise ValueError("resource_id is required for view_logs.")
                res = state.resources.get(action.resource_id)
                if not res:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = str(res.get("logs", "No logs available for this resource."))
                if self.task_name == "medium" and action.resource_id == "i-api":
                    add_reward(self._reward_once("read_logs", 0.2), "inspect_api_logs")
                elif self.task_name == "hard" and action.resource_id == "lb-main":
                    add_reward(self._reward_once("inspect_lb", 0.2), "inspect_lb_logs")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_logs",
                    )

            elif action.command == "query_metadata":
                # The IP may come from parameters.ip_address or, as a
                # fallback, from resource_id.
                ip_address = None
                if action.parameters and isinstance(action.parameters, dict):
                    ip_address = action.parameters.get("ip_address")
                if not ip_address and action.resource_id:
                    ip_address = action.resource_id
                if not ip_address:
                    raise ValueError("query_metadata requires parameters.ip_address.")
                resource_id = self._lookup_resource_by_ip(str(ip_address))
                if not resource_id:
                    raise ValueError(f"No resource found for ip_address={ip_address}")
                output = f"Metadata lookup: ip_address={ip_address} resource_id={resource_id}"
                if self.task_name == "medium" and str(ip_address) == "10.0.4.5":
                    add_reward(
                        self._reward_once("lookup_db_target", 0.2),
                        "resolve_db_ip_dependency",
                    )
                elif self.task_name == "hard" and str(ip_address) == "10.0.8.22":
                    add_reward(
                        self._reward_once("lookup_upstream_target", 0.2),
                        "resolve_upstream_ip_dependency",
                    )

            elif action.command == "update_security_group":
                if not action.resource_id:
                    raise ValueError("resource_id is required for update_security_group.")
                res = state.resources.get(action.resource_id)
                if not res or res.get("type") != "SecurityGroup":
                    raise ValueError(f"Invalid Security Group ID: {action.resource_id}")
                if not action.parameters or "port" not in action.parameters:
                    raise ValueError("Missing 'port' in parameters.")
                if "action" not in action.parameters:
                    raise ValueError("Missing 'action' in parameters. Use 'allow' or 'deny'.")
                # Copy so the stored rule never aliases the caller's dict.
                rule = copy.deepcopy(action.parameters)
                rules = res.get("rules")
                if not isinstance(rules, list):
                    raise ValueError(f"Security group {action.resource_id} has invalid rules.")
                port = int(rule["port"])
                rule_action = str(rule.get("action", "")).lower()
                if rule_action not in {"allow", "deny"}:
                    raise ValueError(
                        "Invalid security-group action. Supported values: 'allow', 'deny'."
                    )
                # Store the values that were actually validated and acted on,
                # so the recorded rule matches the built-in rules' shape
                # (int port, lower-case action).
                rule["port"] = port
                rule["action"] = rule_action
                rules.append(rule)
                output = f"Successfully updated {action.resource_id} with rule: {rule}"
                if (
                    self.task_name == "easy"
                    and action.resource_id == "sg-web"
                    and port == 80
                    and rule_action == "allow"
                ):
                    state.is_resolved = True
                    add_reward(0.8, "resolve_easy_web_ingress")
                    done = True
                    termination_reason = "resolved_easy"
                    output += "\nSUCCESS: Web server is now accessible!"
                elif (
                    self.task_name == "medium"
                    and action.resource_id == "sg-db"
                    and port == 5432
                    and rule_action == "allow"
                ):
                    # The fix only closes the incident after both triage
                    # milestones have been reached.
                    investigated = (
                        "read_logs" in self._achievements
                        and "lookup_db_target" in self._achievements
                    )
                    if investigated:
                        state.is_resolved = True
                        add_reward(0.6, "resolve_medium_db_connectivity")
                        done = True
                        termination_reason = "resolved_medium"
                        output += "\nSUCCESS: Database connection restored!"
                    else:
                        add_reward(-0.1, "unsafe_change_without_triage")
                        output += (
                            "\nWARNING: Change applied without incident triage. "
                            "Inspect API logs and resolve DB IP via query_metadata before closing the incident."
                        )
                elif rule_action == "deny":
                    add_reward(-0.1, "deny_rule_during_incident")
                    output += "\nWARNING: Deny rule applied during outage remediation."

            elif action.command == "restart_service":
                if not action.resource_id:
                    raise ValueError("resource_id is required for restart_service.")
                if action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = f"Service on {action.resource_id} restarted."
                if self.task_name == "hard":
                    if action.resource_id == "i-web2":
                        investigated_root_cause = (
                            "inspect_lb" in self._achievements
                            and "inspect_target" in self._achievements
                            and "lookup_upstream_target" in self._achievements
                        )
                        if investigated_root_cause:
                            state.resources["i-web2"]["status"] = "running"
                            state.resources["i-web2"][
                                "logs"
                            ] = "INFO: Restart successful. Memory cleared."
                            state.is_resolved = True
                            add_reward(0.8, "resolve_hard_upstream_recovery")
                            done = True
                            termination_reason = "resolved_hard"
                            output += "\nSUCCESS: OutOfMemory loop broken. System stable."
                        else:
                            add_reward(-0.1, "restart_without_root_cause")
                            # Nothing is changed in this branch, so the output
                            # must not claim that the service was restarted.
                            output = (
                                f"Restart of {action.resource_id} was blocked."
                                "\nWARNING: Restart denied by change policy. "
                                "Find failing upstream IP from lb-main, resolve it with query_metadata, and inspect i-web2 first."
                            )
                    elif action.resource_id == "i-web1":
                        add_reward(-0.2, "restart_healthy_node")
                        output += (
                            "\nWARNING: You restarted a healthy production server! "
                            "Users dropped."
                        )

            elif action.command == "submit_solution":
                if state.is_resolved:
                    done = True
                    termination_reason = "resolved_submit_solution"
                    output = "Solution verified. System is HEALTHY."
                elif self.task_name == "hard":
                    # In hard mode, unresolved submission should not abort the run.
                    done = False
                    add_reward(-0.1, "premature_submit_hard")
                    output = (
                        "Solution incorrect. Incident is still CRITICAL. "
                        "Continue triage and remediation before submitting."
                    )
                else:
                    done = True
                    termination_reason = "incorrect_submit"
                    output = "Solution incorrect. System is still CRITICAL."

            else:
                raise ValueError(f"Unsupported command: {action.command}")
        except Exception as exc:
            error = str(exc)
            output = f"Command Failed: {error}"

        # Hard-mode drift is evaluated after the command, so a step that
        # resolves the incident is never charged the cascade penalty.
        cascade_penalty, cascade_msg = self._apply_cascading_failure()
        add_reward(cascade_penalty, "cascading_failure_penalty")
        if cascade_msg:
            output = f"{output}{cascade_msg}" if output else cascade_msg.strip()

        if state.step_count >= self.MAX_STEPS and not done:
            done = True
            termination_reason = "max_steps_timeout"
            timeout_suffix = "\nTIMEOUT: Max steps reached."
            output = f"{output}{timeout_suffix}" if output else timeout_suffix.strip()

        # Clip the per-step reward and record the adjustment when it applies.
        raw_reward = reward
        reward = max(-1.0, min(1.0, reward))
        if reward != raw_reward:
            reward_breakdown.append(
                {
                    "event": "reward_clip",
                    "delta": round(float(reward - raw_reward), 4),
                }
            )

        info = {
            "step_count": state.step_count,
            "resolved": state.is_resolved,
            "task": self.task_name,
            "achievements": sorted(self._achievements),
            "total_resources": len(state.resources),
            "action_cost": self.ACTION_COST,
            "objective": self._task_objective(),
            "deterministic": True,
            "max_steps": self.MAX_STEPS,
            "termination_reason": termination_reason if done else "in_progress",
            "reward_breakdown": reward_breakdown,
        }
        return CloudObservation(
            output=output,
            error=error,
            system_health_status=self._health_status(),
            done=done,
            reward=reward,
            metadata=info,
            echoed_message=output,
            message_length=len(output),
        )

    # NOTE(review): the base ``Environment`` interface may declare ``state``
    # as a property; if so this needs ``@property`` -- confirm against the
    # installed openenv version. Left as a plain method here so existing
    # ``env.state()`` callers keep working.
    def state(self) -> State:
        """Return hidden environment state for evaluators/debugging.

        Calls ``reset()`` first if no episode has been started yet.
        """
        if self._state_data is None:
            self.reset()
        assert self._state_data is not None
        return self._state_data