# Cloud-DevOps-RLEnv / server/cloud_devops_env_environment.py
# Commit f97f9a6 (SidhaGarg): Strengthen grading transparency and
# reproducibility evidence
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Cloud DevOps Env environment implementation.

A deterministic mock cloud/devops environment with reward shaping and
anti-farming guardrails for hackathon evaluation.
"""
from __future__ import annotations

import copy
from typing import Any
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import CloudAction, CloudObservation, CloudState
except ImportError:
    from models import CloudAction, CloudObservation, CloudState
class CloudDevopsEnvironment(Environment):
    """
    A deterministic mock cloud/devops environment.

    Tasks:
    - easy: open port 80 on sg-web
    - medium: inspect noisy API logs, then open port 5432 on sg-db
    - hard: trace 502 from lb-main to i-web2, then restart i-web2 (not i-web1)

    Every task is padded with deterministic decoy resources
    (``i-backend-NN`` / ``sg-backend-NN``) so the agent has to filter.
    Each step costs ``ACTION_COST``; investigation milestones pay out at
    most once per episode, and once an episode has terminated further
    ``step`` calls are rejected until ``reset`` is called.

    Example::

        env = CloudDevopsEnvironment()
        obs = env.reset()
        print(obs.system_health_status)  # "CRITICAL"

        obs = env.step(CloudAction(command="list_resources"))
        print(obs.output)
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    # Hard cap on steps per episode; reaching it ends the episode with
    # termination_reason "max_steps_timeout".
    MAX_STEPS: int = 20
    VALID_TASKS: frozenset[str] = frozenset({"easy", "medium", "hard"})
    # Flat penalty charged on every executed step, including failed commands.
    ACTION_COST: float = 0.01
    # In "hard", once step_count exceeds this value while the incident is
    # unresolved, lb-external goes DOWN and per-step penalties start.
    HARD_CASCADE_TRIGGER_STEP: int = 8

    def __init__(self, task_name: str = "easy"):
        """Initialize the cloud_devops_env environment for one task.

        Args:
            task_name: One of ``VALID_TASKS``; case-insensitive and surrounding
                whitespace is ignored. A falsy value selects ``"easy"``.

        Raises:
            ValueError: If ``task_name`` does not name a known task.
        """
        normalized_task = str(task_name or "easy").strip().lower()
        if normalized_task not in self.VALID_TASKS:
            raise ValueError(f"Unknown task: {task_name}")
        self.task_name = normalized_task
        self._state_data: CloudState | None = None
        self._achievements: set[str] = set()
        # Terminal bookkeeping: once an episode ends, step() refuses to act
        # until reset() is called, so one-off rewards cannot be re-collected.
        self._episode_done: bool = False
        self._termination_reason: str = "in_progress"

    def _build_noise_resources(self) -> dict[str, dict[str, object]]:
        """Generate deterministic decoy resources to force retrieval and filtering.

        Returns 20 running ``i-backend-NN`` instances and 20 ``sg-backend-NN``
        security groups allowing port 443 (NN = 01..20). A fresh dict is
        built on every call.
        """
        resources: dict[str, dict[str, object]] = {}
        for i in range(1, 21):
            suffix = f"{i:02d}"
            resources[f"i-backend-{suffix}"] = {
                "type": "Instance",
                "status": "running",
                "logs": (
                    "[2026-04-06 17:00:00] INFO node-exporter: "
                    "standard metrics reported successfully"
                ),
            }
            resources[f"sg-backend-{suffix}"] = {
                "type": "SecurityGroup",
                "rules": [{"port": 443, "action": "allow"}],
            }
        return resources

    def _build_task_resources(self) -> dict[str, dict[str, object]]:
        """Return the decoy resources plus the task-specific incident resources."""
        resources = self._build_noise_resources()
        if self.task_name == "easy":
            resources.update(
                {
                    "i-web": {"type": "Instance", "status": "running"},
                    "sg-web": {
                        "type": "SecurityGroup",
                        "rules": [{"port": 22, "action": "allow"}],
                    },
                }
            )
            return resources
        if self.task_name == "medium":
            resources.update(
                {
                    "i-api": {
                        "type": "Instance",
                        "ip_address": "10.0.4.11",
                        "status": "running",
                        "logs": (
                            "[2026-04-06 17:01:22] [CRITICAL] "
                            "sqlalchemy.exc.OperationalError: "
                            "(psycopg2.OperationalError) connection to server at "
                            "'10.0.4.5', port 5432 failed: Connection timed out. "
                            "Is the server running and accepting TCP/IP connections?"
                        ),
                    },
                    "i-db": {
                        "type": "Instance",
                        "ip_address": "10.0.4.5",
                        "status": "running",
                    },
                    "sg-db": {
                        "type": "SecurityGroup",
                        "rules": [{"port": 22, "action": "allow"}],
                    },
                    "metadata-svc": {
                        "type": "MetadataService",
                        "status": "running",
                    },
                }
            )
            return resources
        # Remaining task: "hard".
        resources.update(
            {
                "lb-main": {
                    "type": "LoadBalancer",
                    "logs": (
                        "2026/04/06 17:02:09 [error] 3197#3197: *4189 upstream timed out "
                        "(110: Connection timed out) while reading response header from upstream, "
                        "client: 10.0.2.14, server: api.prod.local, request: \"GET /checkout HTTP/1.1\", "
                        "upstream: \"http://10.0.8.22:8080/checkout\", host: \"api.prod.local\"\n"
                        "2026/04/06 17:02:10 [error] 3197#3197: *4190 no live upstreams while "
                        "connecting to upstream \"10.0.8.22\""
                    ),
                },
                "lb-external": {
                    "type": "LoadBalancer",
                    "status": "running",
                    "logs": "INFO: Edge traffic stable.",
                },
                "i-web1": {
                    "type": "Instance",
                    "ip_address": "10.0.8.21",
                    "status": "running",
                    "logs": (
                        "[2026-04-06 17:02:11] INFO web-service: readiness probe passed\n"
                        "[2026-04-06 17:02:12] INFO jvm: heap usage stable at 42%"
                    ),
                },
                "i-web2": {
                    "type": "Instance",
                    "ip_address": "10.0.8.22",
                    "status": "degraded",
                    "logs": (
                        "kernel: Out of memory: Killed process 12345 (java) total-vm:4194304kB, "
                        "anon-rss:3145728kB\n"
                        "systemd[1]: web-service.service: Main process exited, code=killed, "
                        "status=9/KILL"
                    ),
                },
                "sg-web": {
                    "type": "SecurityGroup",
                    "rules": [{"port": 80, "action": "allow"}],
                },
                "metadata-svc": {
                    "type": "MetadataService",
                    "status": "running",
                },
            }
        )
        return resources

    def _lookup_resource_by_ip(self, ip_address: str) -> str | None:
        """Return the id of the first resource whose ``ip_address`` matches, else None."""
        if self._state_data is None:
            return None
        for resource_id, data in self._state_data.resources.items():
            if data.get("ip_address") == ip_address:
                return resource_id
        return None

    def _apply_cascading_failure(self) -> tuple[float, str]:
        """Simulate system drift in hard mode if the root cause is not fixed quickly.

        Returns:
            ``(penalty, message)``. On the first unresolved step after
            ``HARD_CASCADE_TRIGGER_STEP`` lb-external is marked DOWN and
            ``(-0.05, alert)`` is returned; every later unresolved step returns
            ``(-0.03, "")``. Other tasks and resolved episodes return
            ``(0.0, "")``.
        """
        if self._state_data is None or self.task_name != "hard":
            return 0.0, ""
        state = self._state_data
        if state.is_resolved or state.step_count <= self.HARD_CASCADE_TRIGGER_STEP:
            return 0.0, ""
        lb = state.resources.get("lb-external")
        if not lb:
            return 0.0, ""
        if lb.get("status") != "DOWN":
            lb["status"] = "DOWN"
            lb["logs"] = (
                "CRITICAL: Cascading failure triggered after prolonged unresolved OOM incident. "
                "Edge load balancer stopped serving traffic."
            )
            return -0.05, (
                "\nALERT: Cascading failure detected. lb-external is DOWN due to delayed remediation."
            )
        return -0.03, ""

    def _reward_once(self, achievement: str, points: float) -> float:
        """Return ``points`` the first time ``achievement`` is earned this episode, else 0.0."""
        if achievement in self._achievements:
            return 0.0
        self._achievements.add(achievement)
        return points

    def _task_objective(self) -> str:
        """Return the human-readable objective for the configured task."""
        objectives = {
            "easy": "Restore web access by allowing port 80 on sg-web.",
            "medium": (
                "Restore API to DB connectivity by reading i-api logs, resolving DB IP via "
                "query_metadata, then allowing port 5432 on sg-db."
            ),
            "hard": (
                "Recover checkout path by tracing lb-main upstream IP, resolving it with "
                "query_metadata, inspecting i-web2, and restarting i-web2 safely."
            ),
        }
        return objectives[self.task_name]

    def reset(  # type: ignore[override]
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs: Any,
    ) -> CloudObservation:
        """Reset the environment to the initial state for the selected task.

        Args:
            seed: Accepted for interface compatibility and ignored, because
                the environment is fully deterministic.
            episode_id: Optional episode identifier; a random UUID4 string is
                generated when it is omitted or empty.
            **kwargs: Ignored. NOTE(review): accepted so callers forwarding
                extra reset options do not hit TypeError — confirm against the
                OpenEnv server version in use.

        Returns:
            The initial observation (status ``CRITICAL``, reward ``0.0``).
        """
        self._achievements.clear()
        self._episode_done = False
        self._termination_reason = "in_progress"
        self._state_data = CloudState(
            episode_id=str(episode_id) if episode_id else str(uuid4()),
            task_difficulty=self.task_name,
            resources=copy.deepcopy(self._build_task_resources()),
            step_count=0,
            is_resolved=False,
        )
        return CloudObservation(
            output=(
                "Environment initialized. System status is currently CRITICAL. "
                "Use 'list_resources' to begin triage."
            ),
            error=None,
            system_health_status="CRITICAL",
            done=False,
            reward=0.0,
            metadata={
                "step_count": 0,
                "resolved": False,
                "task": self.task_name,
                "total_resources": len(self._state_data.resources),
                "objective": self._task_objective(),
                "deterministic": True,
                "max_steps": self.MAX_STEPS,
                "action_cost": self.ACTION_COST,
                "hard_cascade_trigger_step": self.HARD_CASCADE_TRIGGER_STEP,
            },
            echoed_message="Cloud Devops Env environment ready!",
            message_length=0,
        )

    def step(  # type: ignore[override]
        self,
        action: CloudAction,
        timeout_s: float | None = None,
        **kwargs: Any,
    ) -> CloudObservation:
        """Execute the agent action and return the next observation.

        Command errors never raise: they are reported through
        ``observation.error`` and an ``output`` prefixed with
        ``"Command Failed: "``, and the action cost is still charged. Once an
        episode has terminated, further calls return a terminal observation
        with reward 0.0 and do not touch the state until ``reset()``.

        Args:
            action: The command to execute.
            timeout_s: Accepted for interface compatibility and ignored.
            **kwargs: Ignored.

        Returns:
            The resulting observation; the per-step reward is clipped to
            ``[-1.0, 1.0]`` and itemized in ``metadata["reward_breakdown"]``.
        """
        if self._state_data is None:
            self.reset()
        assert self._state_data is not None
        state = self._state_data

        if self._episode_done:
            # Without this guard a client could keep stepping after `done` and
            # re-collect one-off rewards (e.g. the +0.8 resolution bonus), or
            # keep mutating an episode that already timed out or failed.
            message = "Episode already finished. Call reset() to start a new episode."
            return self._build_observation(
                output=f"Command Failed: {message}",
                error=message,
                done=True,
                reward=0.0,
                termination_reason=self._termination_reason,
                reward_breakdown=[],
            )

        state.step_count += 1
        reward = -self.ACTION_COST
        reward_breakdown: list[dict[str, object]] = [
            {"event": "action_cost", "delta": -self.ACTION_COST}
        ]
        done = False
        output = ""
        error = None
        termination_reason = "in_progress"

        def add_reward(delta: float, event: str) -> None:
            # Accumulate a reward delta and record it; zero deltas are skipped
            # so the breakdown only lists events that moved the reward.
            nonlocal reward
            if abs(delta) < 1e-12:
                return
            reward += delta
            reward_breakdown.append({"event": event, "delta": round(float(delta), 4)})

        # The broad except below is deliberate: any failure while executing a
        # command becomes an error observation instead of crashing the server.
        try:
            if action.command == "list_resources":
                res_list = [
                    f"{resource_id} ({data['type']})"
                    for resource_id, data in sorted(state.resources.items())
                ]
                output = "Available Resources:\n" + "\n".join(res_list)

            elif action.command == "describe_resource":
                if not action.resource_id or action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = str(state.resources[action.resource_id])
                if self.task_name == "easy" and action.resource_id == "sg-web":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_web_sg")
                elif self.task_name == "medium" and action.resource_id == "sg-db":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_db_sg")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_instance",
                    )

            elif action.command == "view_logs":
                if not action.resource_id:
                    raise ValueError("resource_id is required for view_logs.")
                res = state.resources.get(action.resource_id)
                if not res:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = str(res.get("logs", "No logs available for this resource."))
                if self.task_name == "medium" and action.resource_id == "i-api":
                    add_reward(self._reward_once("read_logs", 0.2), "inspect_api_logs")
                elif self.task_name == "hard" and action.resource_id == "lb-main":
                    add_reward(self._reward_once("inspect_lb", 0.2), "inspect_lb_logs")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_logs",
                    )

            elif action.command == "query_metadata":
                ip_address = None
                if action.parameters and isinstance(action.parameters, dict):
                    ip_address = action.parameters.get("ip_address")
                # Fallback: allow the IP to be passed as resource_id.
                if not ip_address and action.resource_id:
                    ip_address = action.resource_id
                if not ip_address:
                    raise ValueError("query_metadata requires parameters.ip_address.")
                resource_id = self._lookup_resource_by_ip(str(ip_address))
                if not resource_id:
                    raise ValueError(f"No resource found for ip_address={ip_address}")
                output = f"Metadata lookup: ip_address={ip_address} resource_id={resource_id}"
                if self.task_name == "medium" and str(ip_address) == "10.0.4.5":
                    add_reward(
                        self._reward_once("lookup_db_target", 0.2),
                        "resolve_db_ip_dependency",
                    )
                elif self.task_name == "hard" and str(ip_address) == "10.0.8.22":
                    add_reward(
                        self._reward_once("lookup_upstream_target", 0.2),
                        "resolve_upstream_ip_dependency",
                    )

            elif action.command == "update_security_group":
                if not action.resource_id:
                    raise ValueError("resource_id is required for update_security_group.")
                res = state.resources.get(action.resource_id)
                if not res or res.get("type") != "SecurityGroup":
                    raise ValueError(f"Invalid Security Group ID: {action.resource_id}")
                params = action.parameters
                if not isinstance(params, dict) or "port" not in params:
                    raise ValueError("Missing 'port' in parameters.")
                if "action" not in params:
                    raise ValueError("Missing 'action' in parameters. Use 'allow' or 'deny'.")
                rules = res.get("rules")
                if not isinstance(rules, list):
                    raise ValueError(f"Security group {action.resource_id} has invalid rules.")
                try:
                    port = int(params["port"])
                except (TypeError, ValueError):
                    raise ValueError(f"Invalid port: {params['port']!r}") from None
                if not 1 <= port <= 65535:
                    raise ValueError(f"Port out of range (1-65535): {port}")
                rule_action = str(params["action"]).lower()
                if rule_action not in {"allow", "deny"}:
                    raise ValueError(
                        "Invalid security-group action. Supported values: 'allow', 'deny'."
                    )
                # Store a normalized copy (int port, lowercase action) so added
                # rules match the seeded ones; extra caller keys are preserved.
                rule = copy.deepcopy(params)
                rule["port"] = port
                rule["action"] = rule_action
                rules.append(rule)
                output = f"Successfully updated {action.resource_id} with rule: {rule}"
                if (
                    self.task_name == "easy"
                    and action.resource_id == "sg-web"
                    and port == 80
                    and rule_action == "allow"
                ):
                    state.is_resolved = True
                    add_reward(0.8, "resolve_easy_web_ingress")
                    done = True
                    termination_reason = "resolved_easy"
                    output += "\nSUCCESS: Web server is now accessible!"
                elif (
                    self.task_name == "medium"
                    and action.resource_id == "sg-db"
                    and port == 5432
                    and rule_action == "allow"
                ):
                    # The fix only counts after the agent read the API logs and
                    # resolved the DB IP; otherwise it is penalized as unsafe.
                    investigated = (
                        "read_logs" in self._achievements
                        and "lookup_db_target" in self._achievements
                    )
                    if investigated:
                        state.is_resolved = True
                        add_reward(0.6, "resolve_medium_db_connectivity")
                        done = True
                        termination_reason = "resolved_medium"
                        output += "\nSUCCESS: Database connection restored!"
                    else:
                        add_reward(-0.1, "unsafe_change_without_triage")
                        output += (
                            "\nWARNING: Change applied without incident triage. "
                            "Inspect API logs and resolve DB IP via query_metadata before closing the incident."
                        )
                elif rule_action == "deny":
                    add_reward(-0.1, "deny_rule_during_incident")
                    output += "\nWARNING: Deny rule applied during outage remediation."

            elif action.command == "restart_service":
                if not action.resource_id:
                    raise ValueError("resource_id is required for restart_service.")
                if action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")
                output = f"Service on {action.resource_id} restarted."
                if self.task_name == "hard":
                    if action.resource_id == "i-web2":
                        investigated_root_cause = (
                            "inspect_lb" in self._achievements
                            and "inspect_target" in self._achievements
                            and "lookup_upstream_target" in self._achievements
                        )
                        if investigated_root_cause:
                            state.resources["i-web2"]["status"] = "running"
                            state.resources["i-web2"][
                                "logs"
                            ] = "INFO: Restart successful. Memory cleared."
                            state.is_resolved = True
                            add_reward(0.8, "resolve_hard_upstream_recovery")
                            done = True
                            termination_reason = "resolved_hard"
                            output += "\nSUCCESS: OutOfMemory loop broken. System stable."
                        else:
                            add_reward(-0.1, "restart_without_root_cause")
                            # The change policy blocks the restart, so do not
                            # tell the agent it happened.
                            output = (
                                f"Restart of {action.resource_id} was not performed."
                                "\nWARNING: Restart denied by change policy. "
                                "Find failing upstream IP from lb-main, resolve it with query_metadata, and inspect i-web2 first."
                            )
                    elif action.resource_id == "i-web1":
                        add_reward(-0.2, "restart_healthy_node")
                        output += (
                            "\nWARNING: You restarted a healthy production server! "
                            "Users dropped."
                        )

            elif action.command == "submit_solution":
                if state.is_resolved:
                    done = True
                    termination_reason = "resolved_submit_solution"
                    output = "Solution verified. System is HEALTHY."
                elif self.task_name == "hard":
                    # In hard mode, unresolved submission should not abort the run.
                    done = False
                    add_reward(-0.1, "premature_submit_hard")
                    output = (
                        "Solution incorrect. Incident is still CRITICAL. "
                        "Continue triage and remediation before submitting."
                    )
                else:
                    done = True
                    termination_reason = "incorrect_submit"
                    output = "Solution incorrect. System is still CRITICAL."

            else:
                raise ValueError(f"Unsupported command: {action.command}")
        except Exception as exc:
            error = str(exc)
            output = f"Command Failed: {error}"

        cascade_penalty, cascade_msg = self._apply_cascading_failure()
        add_reward(cascade_penalty, "cascading_failure_penalty")
        if cascade_msg:
            output = f"{output}{cascade_msg}" if output else cascade_msg.strip()

        if state.step_count >= self.MAX_STEPS and not done:
            done = True
            termination_reason = "max_steps_timeout"
            timeout_suffix = "\nTIMEOUT: Max steps reached."
            output = f"{output}{timeout_suffix}" if output else timeout_suffix.strip()

        raw_reward = reward
        reward = max(-1.0, min(1.0, reward))
        if reward != raw_reward:
            reward_breakdown.append(
                {
                    "event": "reward_clip",
                    "delta": round(float(reward - raw_reward), 4),
                }
            )

        if done:
            self._episode_done = True
            self._termination_reason = termination_reason

        return self._build_observation(
            output=output,
            error=error,
            done=done,
            reward=reward,
            termination_reason=termination_reason,
            reward_breakdown=reward_breakdown,
        )

    def _system_health_status(self) -> str:
        """Map the current episode state to the health status shown to the agent."""
        assert self._state_data is not None
        state = self._state_data
        if state.is_resolved:
            return "HEALTHY"
        lb_external = state.resources.get("lb-external", {})
        # NOTE(review): after the hard-mode cascade the status moves from
        # CRITICAL to DEGRADED even though the incident got worse — confirm
        # this is the intended signal.
        if self.task_name == "hard" and lb_external.get("status") == "DOWN":
            return "DEGRADED"
        return "CRITICAL"

    def _build_observation(
        self,
        *,
        output: str,
        error: str | None,
        done: bool,
        reward: float,
        termination_reason: str,
        reward_breakdown: list[dict[str, object]],
    ) -> CloudObservation:
        """Assemble a step observation (status plus metadata) from the step's results."""
        assert self._state_data is not None
        state = self._state_data
        info = {
            "step_count": state.step_count,
            "resolved": state.is_resolved,
            "task": self.task_name,
            "achievements": sorted(self._achievements),
            "total_resources": len(state.resources),
            "action_cost": self.ACTION_COST,
            "objective": self._task_objective(),
            "deterministic": True,
            "max_steps": self.MAX_STEPS,
            "termination_reason": termination_reason if done else "in_progress",
            "reward_breakdown": reward_breakdown,
        }
        return CloudObservation(
            output=output,
            error=error,
            system_health_status=self._system_health_status(),
            done=done,
            reward=reward,
            metadata=info,
            echoed_message=output,
            message_length=len(output),
        )

    @property
    def state(self) -> State:
        """Return hidden environment state for evaluators/debugging.

        Lazily calls ``reset()`` if no episode has been started yet.
        """
        if self._state_data is None:
            self.reset()
        assert self._state_data is not None
        return self._state_data