File size: 22,956 Bytes
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d35c04a
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d35c04a
39ff394
 
 
 
 
d35c04a
39ff394
 
 
d35c04a
 
 
 
 
39ff394
 
 
 
d35c04a
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
d35c04a
39ff394
d35c04a
39ff394
 
d35c04a
 
 
 
 
39ff394
 
d35c04a
39ff394
 
 
 
 
 
 
 
d35c04a
39ff394
 
 
 
 
 
 
 
 
 
 
 
d35c04a
 
 
 
39ff394
 
 
 
d35c04a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39ff394
 
 
 
 
 
f97f9a6
 
 
 
 
 
 
 
 
 
 
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f97f9a6
 
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
d35c04a
f97f9a6
 
 
39ff394
 
 
f97f9a6
 
 
 
 
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f97f9a6
39ff394
f97f9a6
39ff394
f97f9a6
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
f97f9a6
39ff394
f97f9a6
39ff394
f97f9a6
 
 
 
39ff394
d35c04a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f97f9a6
 
 
 
d35c04a
f97f9a6
 
 
 
d35c04a
39ff394
 
 
 
 
 
 
 
 
4d9ede6
 
39ff394
 
 
 
 
4d9ede6
 
 
 
 
 
 
39ff394
 
 
 
 
 
4d9ede6
39ff394
 
f97f9a6
39ff394
f97f9a6
39ff394
 
 
 
 
4d9ede6
39ff394
d35c04a
 
 
 
 
39ff394
f97f9a6
39ff394
f97f9a6
39ff394
 
f97f9a6
39ff394
 
d35c04a
39ff394
4d9ede6
f97f9a6
4d9ede6
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
d35c04a
39ff394
 
 
 
 
 
 
f97f9a6
39ff394
f97f9a6
39ff394
 
f97f9a6
39ff394
 
d35c04a
39ff394
 
f97f9a6
39ff394
 
 
 
 
 
 
 
f97f9a6
39ff394
 
 
 
 
f97f9a6
39ff394
 
 
 
 
 
f97f9a6
39ff394
 
 
 
 
 
 
 
 
d35c04a
f97f9a6
d35c04a
 
 
39ff394
 
f97f9a6
39ff394
 
 
f97f9a6
39ff394
f97f9a6
 
 
 
 
 
 
 
d35c04a
 
 
 
 
 
 
39ff394
 
 
 
 
 
d35c04a
f97f9a6
 
 
 
 
39ff394
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Cloud DevOps environment implementation.

A deterministic mock cloud/devops environment with reward shaping and
anti-farming guardrails for hackathon evaluation.
"""

from __future__ import annotations

import copy
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import CloudAction, CloudObservation, CloudState
except ImportError:
    from models import CloudAction, CloudObservation, CloudState


class CloudDevopsEnvironment(Environment):
    """
    A deterministic mock cloud/devops environment.

    Tasks:
    - easy: open port 80 on sg-web
    - medium: inspect noisy API logs, then open port 5432 on sg-db
    - hard: trace 502 from lb-main to i-web2, then restart i-web2 (not i-web1)

    Example:
        >>> env = CloudDevopsEnvironment()
        >>> obs = env.reset()
        >>> print(obs.system_health_status)  # "CRITICAL"
        >>>
        >>> obs = env.step(CloudAction(command="list_resources"))
        >>> print(obs.output)
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    MAX_STEPS: int = 20
    VALID_TASKS = {"easy", "medium", "hard"}
    ACTION_COST: float = 0.01

    def __init__(self, task_name: str = "easy"):
        """Initialize the cloud_devops_env environment."""
        normalized_task = (task_name or "easy").lower()
        if normalized_task not in self.VALID_TASKS:
            raise ValueError(f"Unknown task: {task_name}")

        self.task_name = normalized_task
        self._state_data: CloudState | None = None
        self._achievements: set[str] = set()

    def _build_noise_resources(self) -> dict[str, dict[str, object]]:
        """Generate deterministic decoy resources to force retrieval and filtering."""
        resources: dict[str, dict[str, object]] = {}
        for i in range(1, 21):
            suffix = f"{i:02d}"
            resources[f"i-backend-{suffix}"] = {
                "type": "Instance",
                "status": "running",
                "logs": (
                    "[2026-04-06 17:00:00] INFO node-exporter: "
                    "standard metrics reported successfully"
                ),
            }
            resources[f"sg-backend-{suffix}"] = {
                "type": "SecurityGroup",
                "rules": [{"port": 443, "action": "allow"}],
            }
        return resources

    def _build_task_resources(self) -> dict[str, dict[str, object]]:
        resources = self._build_noise_resources()

        if self.task_name == "easy":
            resources.update(
                {
                "i-web": {"type": "Instance", "status": "running"},
                "sg-web": {
                    "type": "SecurityGroup",
                    "rules": [{"port": 22, "action": "allow"}],
                },
                }
            )
            return resources

        if self.task_name == "medium":
            resources.update(
                {
                "i-api": {
                    "type": "Instance",
                    "ip_address": "10.0.4.11",
                    "status": "running",
                    "logs": (
                        "[2026-04-06 17:01:22] [CRITICAL] "
                        "sqlalchemy.exc.OperationalError: "
                        "(psycopg2.OperationalError) connection to server at "
                        "'10.0.4.5', port 5432 failed: Connection timed out. "
                        "Is the server running and accepting TCP/IP connections?"
                    ),
                },
                "i-db": {
                    "type": "Instance",
                    "ip_address": "10.0.4.5",
                    "status": "running",
                },
                "sg-db": {
                    "type": "SecurityGroup",
                    "rules": [{"port": 22, "action": "allow"}],
                },
                "metadata-svc": {
                    "type": "MetadataService",
                    "status": "running",
                },
                }
            )
            return resources

        resources.update(
            {
            "lb-main": {
                "type": "LoadBalancer",
                "logs": (
                    "2026/04/06 17:02:09 [error] 3197#3197: *4189 upstream timed out "
                    "(110: Connection timed out) while reading response header from upstream, "
                    "client: 10.0.2.14, server: api.prod.local, request: \"GET /checkout HTTP/1.1\", "
                    "upstream: \"http://10.0.8.22:8080/checkout\", host: \"api.prod.local\"\n"
                    "2026/04/06 17:02:10 [error] 3197#3197: *4190 no live upstreams while "
                    "connecting to upstream \"10.0.8.22\""
                ),
            },
            "lb-external": {
                "type": "LoadBalancer",
                "status": "running",
                "logs": "INFO: Edge traffic stable.",
            },
            "i-web1": {
                "type": "Instance",
                "ip_address": "10.0.8.21",
                "status": "running",
                "logs": (
                    "[2026-04-06 17:02:11] INFO web-service: readiness probe passed\n"
                    "[2026-04-06 17:02:12] INFO jvm: heap usage stable at 42%"
                ),
            },
            "i-web2": {
                "type": "Instance",
                "ip_address": "10.0.8.22",
                "status": "degraded",
                "logs": (
                    "kernel: Out of memory: Killed process 12345 (java) total-vm:4194304kB, "
                    "anon-rss:3145728kB\n"
                    "systemd[1]: web-service.service: Main process exited, code=killed, "
                    "status=9/KILL"
                ),
            },
            "sg-web": {
                "type": "SecurityGroup",
                "rules": [{"port": 80, "action": "allow"}],
            },
            "metadata-svc": {
                "type": "MetadataService",
                "status": "running",
            },
            }
        )
        return resources

    def _lookup_resource_by_ip(self, ip_address: str) -> str | None:
        if self._state_data is None:
            return None
        for resource_id, data in self._state_data.resources.items():
            if data.get("ip_address") == ip_address:
                return resource_id
        return None

    def _apply_cascading_failure(self) -> tuple[float, str]:
        """Simulate system drift in hard mode if root cause is not fixed quickly."""
        if self._state_data is None or self.task_name != "hard":
            return 0.0, ""

        state = self._state_data
        if state.is_resolved or state.step_count <= 8:
            return 0.0, ""

        lb = state.resources.get("lb-external")
        if not lb:
            return 0.0, ""

        if lb.get("status") != "DOWN":
            lb["status"] = "DOWN"
            lb["logs"] = (
                "CRITICAL: Cascading failure triggered after prolonged unresolved OOM incident. "
                "Edge load balancer stopped serving traffic."
            )
            return -0.05, (
                "\nALERT: Cascading failure detected. lb-external is DOWN due to delayed remediation."
            )

        return -0.03, ""

    def _reward_once(self, achievement: str, points: float) -> float:
        if achievement in self._achievements:
            return 0.0
        self._achievements.add(achievement)
        return points

    def _task_objective(self) -> str:
        objectives = {
            "easy": "Restore web access by allowing port 80 on sg-web.",
            "medium": (
                "Restore API to DB connectivity by reading i-api logs, resolving DB IP via "
                "query_metadata, then allowing port 5432 on sg-db."
            ),
            "hard": (
                "Recover checkout path by tracing lb-main upstream IP, resolving it with "
                "query_metadata, inspecting i-web2, and restarting i-web2 safely."
            ),
        }
        return objectives[self.task_name]

    def reset(self) -> CloudObservation:  # type: ignore[override]
        """Start a new episode for the configured task.

        Clears earned achievements and replaces the episode state with a new
        ``CloudState`` that has a new UUID episode id, a freshly built resource
        inventory, ``step_count`` 0 and ``is_resolved`` False.

        Returns:
            The initial observation: status ``"CRITICAL"``, reward 0.0, not done,
            with task details (objective, step limit, action cost) in metadata.
        """
        self._achievements.clear()

        fresh_state = CloudState(
            episode_id=str(uuid4()),
            task_difficulty=self.task_name,
            resources=copy.deepcopy(self._build_task_resources()),
            step_count=0,
            is_resolved=False,
        )
        self._state_data = fresh_state

        briefing = (
            "Environment initialized. System status is currently CRITICAL. "
            "Use 'list_resources' to begin triage."
        )
        info = {
            "step_count": 0,
            "resolved": False,
            "task": self.task_name,
            "total_resources": len(fresh_state.resources),
            "objective": self._task_objective(),
            "deterministic": True,
            "max_steps": self.MAX_STEPS,
            "action_cost": self.ACTION_COST,
            "hard_cascade_trigger_step": 8,
        }

        return CloudObservation(
            output=briefing,
            error=None,
            system_health_status="CRITICAL",
            done=False,
            reward=0.0,
            metadata=info,
            echoed_message="Cloud Devops Env environment ready!",
            message_length=0,
        )

    def step(self, action: CloudAction) -> CloudObservation:  # type: ignore[override]
        """Execute one agent action and return the resulting observation.

        Each call increments ``step_count`` and charges ``ACTION_COST`` before
        the command is dispatched. Supported ``action.command`` values are
        ``list_resources``, ``describe_resource``, ``view_logs``,
        ``query_metadata``, ``update_security_group``, ``restart_service`` and
        ``submit_solution``.

        Any exception raised while handling the command (including the
        ``ValueError`` raised for invalid input or an unsupported command) is
        caught and reported through the observation's ``error`` field and a
        ``"Command Failed: ..."`` output; it is not propagated to the caller.

        After the command runs, the hard-task cascading-failure penalty is
        applied, the episode is ended if ``MAX_STEPS`` has been reached, and the
        step reward is clamped to ``[-1.0, 1.0]``.

        Args:
            action: The command to run, with optional ``resource_id`` and
                ``parameters``.

        Returns:
            A ``CloudObservation`` carrying the command output, any error text,
            the system health status, the done flag, the step reward, and a
            metadata dict that includes a per-event ``reward_breakdown``.
        """
        # Allow step() before an explicit reset() by starting an episode on demand.
        if self._state_data is None:
            self.reset()

        # Narrows the Optional type; reset() always assigns _state_data.
        assert self._state_data is not None
        state = self._state_data

        # The action cost is charged up front, so failed commands pay it too.
        state.step_count += 1
        reward = -self.ACTION_COST
        reward_breakdown: list[dict[str, object]] = [
            {"event": "action_cost", "delta": -self.ACTION_COST}
        ]
        done = False
        output = ""
        error = None
        termination_reason = "in_progress"

        # Accumulate a reward delta and log it in the breakdown. Effectively-zero
        # deltas (e.g. an achievement that was already claimed) are skipped.
        def add_reward(delta: float, event: str) -> None:
            nonlocal reward
            if abs(delta) < 1e-12:
                return
            reward += delta
            reward_breakdown.append({"event": event, "delta": round(float(delta), 4)})

        try:
            if action.command == "list_resources":
                # Listing is sorted by resource id and shows only id and type.
                res_list = [
                    f"{resource_id} ({data['type']})"
                    for resource_id, data in sorted(state.resources.items())
                ]
                output = "Available Resources:\n" + "\n".join(res_list)

            elif action.command == "describe_resource":
                if not action.resource_id or action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")

                # The full attribute dict is returned as its str() form.
                output = str(state.resources[action.resource_id])

                # One-time bonus for inspecting the resource central to the task.
                if self.task_name == "easy" and action.resource_id == "sg-web":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_web_sg")
                elif self.task_name == "medium" and action.resource_id == "sg-db":
                    add_reward(self._reward_once("read_sg", 0.2), "inspect_db_sg")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_instance",
                    )

            elif action.command == "view_logs":
                if not action.resource_id:
                    raise ValueError("resource_id is required for view_logs.")

                res = state.resources.get(action.resource_id)
                if not res:
                    raise ValueError(f"Resource {action.resource_id} not found.")

                output = str(res.get("logs", "No logs available for this resource."))

                if self.task_name == "medium" and action.resource_id == "i-api":
                    add_reward(self._reward_once("read_logs", 0.2), "inspect_api_logs")
                elif self.task_name == "hard" and action.resource_id == "lb-main":
                    add_reward(self._reward_once("inspect_lb", 0.2), "inspect_lb_logs")
                elif self.task_name == "hard" and action.resource_id == "i-web2":
                    # Shares the "inspect_target" achievement with describe_resource,
                    # so only the first of the two inspections pays out.
                    add_reward(
                        self._reward_once("inspect_target", 0.2),
                        "inspect_target_logs",
                    )

            elif action.command == "query_metadata":
                # The IP comes from parameters["ip_address"]; resource_id is
                # accepted as a fallback carrier for it.
                ip_address = None
                if action.parameters and isinstance(action.parameters, dict):
                    ip_address = action.parameters.get("ip_address")
                if not ip_address and action.resource_id:
                    ip_address = action.resource_id
                if not ip_address:
                    raise ValueError("query_metadata requires parameters.ip_address.")

                resource_id = self._lookup_resource_by_ip(str(ip_address))
                if not resource_id:
                    raise ValueError(f"No resource found for ip_address={ip_address}")

                output = f"Metadata lookup: ip_address={ip_address} resource_id={resource_id}"
                # One-time bonus for resolving the IP that appears in the task's logs.
                if self.task_name == "medium" and str(ip_address) == "10.0.4.5":
                    add_reward(
                        self._reward_once("lookup_db_target", 0.2),
                        "resolve_db_ip_dependency",
                    )
                elif self.task_name == "hard" and str(ip_address) == "10.0.8.22":
                    add_reward(
                        self._reward_once("lookup_upstream_target", 0.2),
                        "resolve_upstream_ip_dependency",
                    )

            elif action.command == "update_security_group":
                if not action.resource_id:
                    raise ValueError("resource_id is required for update_security_group.")

                res = state.resources.get(action.resource_id)
                if not res or res.get("type") != "SecurityGroup":
                    raise ValueError(f"Invalid Security Group ID: {action.resource_id}")
                if not action.parameters or "port" not in action.parameters:
                    raise ValueError("Missing 'port' in parameters.")
                if "action" not in action.parameters:
                    raise ValueError("Missing 'action' in parameters. Use 'allow' or 'deny'.")

                # Work on a copy so the stored rule is independent of the caller's dict.
                rule = copy.deepcopy(action.parameters)
                rules = res.get("rules")
                if not isinstance(rules, list):
                    raise ValueError(f"Security group {action.resource_id} has invalid rules.")
                # port/action are normalised only for the checks below; a
                # non-integer port raises here and is reported as a failed command.
                port = int(rule["port"])
                rule_action = str(rule.get("action", "")).lower()
                if rule_action not in {"allow", "deny"}:
                    raise ValueError(
                        "Invalid security-group action. Supported values: 'allow', 'deny'."
                    )

                # All validation is done; the rule is stored exactly as supplied.
                rules.append(rule)
                output = f"Successfully updated {action.resource_id} with rule: {rule}"
                if (
                    self.task_name == "easy"
                    and action.resource_id == "sg-web"
                    and port == 80
                    and rule_action == "allow"
                ):
                    state.is_resolved = True
                    add_reward(0.8, "resolve_easy_web_ingress")
                    done = True
                    termination_reason = "resolved_easy"
                    output += "\nSUCCESS: Web server is now accessible!"
                elif (
                    self.task_name == "medium"
                    and action.resource_id == "sg-db"
                    and port == 5432
                    and rule_action == "allow"
                ):
                    # The fix is only credited after both the API log read and the
                    # metadata lookup; otherwise the rule is still stored but penalised.
                    investigated = (
                        "read_logs" in self._achievements
                        and "lookup_db_target" in self._achievements
                    )
                    if investigated:
                        state.is_resolved = True
                        add_reward(0.6, "resolve_medium_db_connectivity")
                        done = True
                        termination_reason = "resolved_medium"
                        output += "\nSUCCESS: Database connection restored!"
                    else:
                        add_reward(-0.1, "unsafe_change_without_triage")
                        output += (
                            "\nWARNING: Change applied without incident triage. "
                            "Inspect API logs and resolve DB IP via query_metadata before closing the incident."
                        )
                elif rule_action == "deny":
                    # Both branches above require "allow", so every deny rule lands here.
                    add_reward(-0.1, "deny_rule_during_incident")
                    output += "\nWARNING: Deny rule applied during outage remediation."

            elif action.command == "restart_service":
                if not action.resource_id:
                    raise ValueError("resource_id is required for restart_service.")
                if action.resource_id not in state.resources:
                    raise ValueError(f"Resource {action.resource_id} not found.")

                output = f"Service on {action.resource_id} restarted."

                # Outside the hard task a restart only produces the message above.
                if self.task_name == "hard":
                    if action.resource_id == "i-web2":
                        # All three investigation achievements must be earned first.
                        investigated_root_cause = (
                            "inspect_lb" in self._achievements
                            and "inspect_target" in self._achievements
                            and "lookup_upstream_target" in self._achievements
                        )
                        if investigated_root_cause:
                            state.resources["i-web2"]["status"] = "running"
                            state.resources["i-web2"][
                                "logs"
                            ] = "INFO: Restart successful. Memory cleared."
                            state.is_resolved = True
                            add_reward(0.8, "resolve_hard_upstream_recovery")
                            done = True
                            termination_reason = "resolved_hard"
                            output += "\nSUCCESS: OutOfMemory loop broken. System stable."
                        else:
                            # NOTE(review): the output still begins with "Service on
                            # i-web2 restarted." although the warning says the restart
                            # was denied; i-web2's state is left unchanged here.
                            add_reward(-0.1, "restart_without_root_cause")
                            output += (
                                "\nWARNING: Restart denied by change policy. "
                                "Find failing upstream IP from lb-main, resolve it with query_metadata, and inspect i-web2 first."
                            )
                    elif action.resource_id == "i-web1":
                        # Restarting the healthy node costs more than a premature restart of i-web2.
                        add_reward(-0.2, "restart_healthy_node")
                        output += (
                            "\nWARNING: You restarted a healthy production server! "
                            "Users dropped."
                        )

            elif action.command == "submit_solution":
                if state.is_resolved:
                    done = True
                    termination_reason = "resolved_submit_solution"
                    output = "Solution verified. System is HEALTHY."
                else:
                    if self.task_name == "hard":
                        # In hard mode, unresolved submission should not abort the run.
                        done = False
                        add_reward(-0.1, "premature_submit_hard")
                        output = (
                            "Solution incorrect. Incident is still CRITICAL. "
                            "Continue triage and remediation before submitting."
                        )
                    else:
                        # Easy/medium: an unresolved submission ends the episode.
                        done = True
                        termination_reason = "incorrect_submit"
                        output = "Solution incorrect. System is still CRITICAL."

            else:
                raise ValueError(f"Unsupported command: {action.command}")

        except Exception as exc:
            # Any failure is reported to the agent instead of propagating.
            error = str(exc)
            output = f"Command Failed: {error}"

        # Hard-task drift is evaluated after the command, so it also applies to
        # steps whose command failed.
        cascade_penalty, cascade_msg = self._apply_cascading_failure()
        add_reward(cascade_penalty, "cascading_failure_penalty")
        if cascade_msg:
            output = f"{output}{cascade_msg}" if output else cascade_msg.strip()

        # Step budget exhausted without another termination: end the episode.
        if state.step_count >= self.MAX_STEPS and not done:
            done = True
            termination_reason = "max_steps_timeout"
            timeout_suffix = "\nTIMEOUT: Max steps reached."
            output = f"{output}{timeout_suffix}" if output else timeout_suffix.strip()

        # Clamp the step reward to [-1.0, 1.0] and record the adjustment, if any.
        raw_reward = reward
        reward = max(-1.0, min(1.0, reward))
        if reward != raw_reward:
            reward_breakdown.append(
                {
                    "event": "reward_clip",
                    "delta": round(float(reward - raw_reward), 4),
                }
            )

        # Health: HEALTHY once resolved; in the hard task DEGRADED after
        # lb-external has gone DOWN; CRITICAL otherwise.
        lb_external = state.resources.get("lb-external", {})
        if state.is_resolved:
            status = "HEALTHY"
        elif self.task_name == "hard" and lb_external.get("status") == "DOWN":
            status = "DEGRADED"
        else:
            status = "CRITICAL"
        info = {
            "step_count": state.step_count,
            "resolved": state.is_resolved,
            "task": self.task_name,
            "achievements": sorted(self._achievements),
            "total_resources": len(state.resources),
            "action_cost": self.ACTION_COST,
            "objective": self._task_objective(),
            "deterministic": True,
            "max_steps": self.MAX_STEPS,
            # A termination reason is only reported once the episode is done.
            "termination_reason": termination_reason if done else "in_progress",
            "reward_breakdown": reward_breakdown,
        }

        return CloudObservation(
            output=output,
            error=error,
            system_health_status=status,
            done=done,
            reward=reward,
            metadata=info,
            echoed_message=output,
            message_length=len(output),
        )

    @property
    def state(self) -> State:
        """Return hidden environment state for evaluators/debugging."""
        if self._state_data is None:
            self.reset()
        assert self._state_data is not None
        return self._state_data