From b617d4dd291343b040f88b82838a71e03e15b0c9 Mon Sep 17 00:00:00 2001 From: pradeepvrd Date: Wed, 17 Jun 2026 23:42:14 -0700 Subject: [PATCH 1/3] feat(chaos): restructure chaos injector into agent/fault split with ABCs and registries (2c) Modules moved/refactored: - pkg/agents/chaos/chaos.py -> devops_bench/chaos/agent.py (ChaosAgent loop) + devops_bench/chaos/faults/generate_load.py (fault exec) - new devops_bench/chaos/base.py (Fault/Trigger ABCs + FAULTS/TRIGGERS registries) - new devops_bench/chaos/__init__.py + devops_bench/chaos/faults/__init__.py (light re-exports; no SDK imports) - new tests/unit/chaos/test_chaos_agent.py + test_chaos_generate_load.py (legacy chaos_test.py ported to pytest) Bugs fixed vs legacy: - none (pure structural move; behavioral fixes land in the following fix(chaos) commit) Improvements vs legacy: - split the monolithic ChaosAgent into an orchestration layer (agent.py) and a registered fault (faults/generate_load.py), so faults are pluggable - added Fault/Trigger ABCs and the FAULTS/TRIGGERS registries (base.py) per the component design, replacing ad-hoc dispatch on action "type" - made the LLM loop model-agnostic: drive it through the neutral devops_bench.models LLMClient interface (get_model + format_tools/generate_content/extract_function_calls/get_text_content) instead of the hardcoded google.genai chat client, with provider/model from CHAOS_PROVIDER/CHAOS_MODEL falling back to AGENT_PROVIDER/AGENT_MODEL - preserved the chaos_active_event signaling so the harness can detect an active load spike - exposed command execution as a single run_command tool and bounded the loop with a turn cap --- devops_bench/chaos/__init__.py | 28 ++++ devops_bench/chaos/agent.py | 168 +++++++++++++++++++ devops_bench/chaos/base.py | 103 ++++++++++++ devops_bench/chaos/faults/__init__.py | 19 +++ devops_bench/chaos/faults/generate_load.py | 153 +++++++++++++++++ tests/unit/chaos/test_chaos_agent.py | 108 ++++++++++++ tests/unit/chaos/test_chaos_generate_load.py | 141 ++++++++++++++++ 7 files changed, 720 insertions(+) create mode 100644 devops_bench/chaos/__init__.py create mode 100644 devops_bench/chaos/agent.py create mode 100644 devops_bench/chaos/base.py create mode 100644 devops_bench/chaos/faults/__init__.py create mode 100644 devops_bench/chaos/faults/generate_load.py create mode 100644 tests/unit/chaos/test_chaos_agent.py create mode 100644 tests/unit/chaos/test_chaos_generate_load.py diff --git a/devops_bench/chaos/__init__.py b/devops_bench/chaos/__init__.py new file mode 100644 index 00000000..b2465941 --- /dev/null +++ b/devops_bench/chaos/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Chaos injection: fault/trigger interfaces, registries, and the agent loop. + +Importing this package is light: provider SDKs are loaded lazily by the models +layer only when :class:`ChaosAgent` constructs a client. Concrete faults are +imported here so they self-register in :data:`FAULTS`. +""" + +from __future__ import annotations + +from devops_bench.chaos.agent import ChaosAgent +from devops_bench.chaos.base import FAULTS, TRIGGERS, Fault, Trigger +from devops_bench.chaos.faults import generate_load # noqa: F401 - registers the fault + +__all__ = ["ChaosAgent", "Fault", "Trigger", "FAULTS", "TRIGGERS"] diff --git a/devops_bench/chaos/agent.py b/devops_bench/chaos/agent.py new file mode 100644 index 00000000..7c1157fa --- /dev/null +++ b/devops_bench/chaos/agent.py @@ -0,0 +1,168 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""LLM-driven orchestration loop for injecting chaos faults.""" + +from __future__ import annotations + +import asyncio +import threading +from types import SimpleNamespace +from typing import Any + +from devops_bench.chaos.faults.generate_load import run_chaos_command +from devops_bench.core import first_env, get_logger +from devops_bench.models import LLMClient, get_model + +__all__ = ["ChaosAgent", "SYSTEM_INSTRUCTION", "RUN_COMMAND_TOOL"] + +_log = get_logger("chaos.agent") + +SYSTEM_INSTRUCTION = ( + "You are a professional Site Reliability Engineer (SRE) and Chaos Engineering Expert.\n" + "Your role is to disrupt GKE workloads to test system resilience, which can happen in " + "two modes:\n" + "1. Planned Mode: Execute a specific GKE chaos disruption according to a provided JSON spec.\n" + "2. Autonomous Mode: Autonomously explore the GKE cluster state, identify critical targets " + "(pods, nodes, services), and inject transient faults to test recovery.\n\n" + "You are equipped with the `run_command` tool, which runs shell commands locally on the GKE " + "host control machine (which is fully authenticated and has GKE admin kubectl privileges).\n\n" + "Strict Guidelines for Execution:\n" + "- Single Execution Policy: You MUST execute exactly one tool call to run the planned " + "'fortio' load generation spike. Do NOT attempt to rerun, adjust, or tune the load " + "generation if the target service saturates or returns timeouts. Once the single load " + "command is executed, analyze the output, write your final performance summary, and exit " + "immediately.\n" + "- Safety First: Only inject transient, safe, and recoverable faults (e.g. killing pods, " + "scaling deployments, generating traffic spikes). Do NOT permanently destroy GKE clusters, " + "namespaces, or nodes.\n" + "- Traffic Generation: For load spikes, use the 'fortio' binary. Since GKE internal service " + "URLs (*.svc.cluster.local) are port-forwarded to the host, you MUST target " + "'http://localhost:8080' instead.\n" + "- Analysis & Clarity: Analyze command outputs carefully, report stdout/stderr accurately, " + "and confirm in your final response when the disruption has been successfully completed." +) + +# Neutral, duck-typed tool descriptor consumed by ``LLMClient.format_tools`` +# (mirrors the MCP tool shape: name/description/inputSchema). +RUN_COMMAND_TOOL = SimpleNamespace( + name="run_command", + description=( + "Run a shell command on the GKE host control machine (authenticated kubectl + fortio). " + "Returns combined stdout and stderr." + ), + inputSchema={ + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "The shell command to execute, e.g. a fortio load invocation.", + } + }, + "required": ["command"], + }, +) + +# Safety bound on the agent loop so a misbehaving model cannot spin forever. +_MAX_TURNS = 8 + + +class ChaosAgent: + """Drives an LLM through a tool-calling loop to inject chaos faults. + + The agent is provider-agnostic: it obtains an :class:`LLMClient` from the + models layer and never imports a provider SDK. The model is given a single + ``run_command`` tool and loops until it stops requesting tool calls. + + Args: + chaos_active_event: Optional event signaled when a load spike starts, + so the harness can coordinate measurements. + client: Optional pre-built LLM client; when omitted one is selected + from configuration via :func:`get_model`. + """ + + def __init__( + self, + chaos_active_event: threading.Event | None = None, + client: LLMClient | None = None, + ) -> None: + self._chaos_active_event = chaos_active_event + if client is None: + provider = first_env("CHAOS_PROVIDER", "AGENT_PROVIDER") + model_name = first_env("CHAOS_MODEL", "AGENT_MODEL") + client = get_model(provider=provider, model_name=model_name) + self._client = client + + def run(self, goal: str) -> str: + """Run the chaos loop synchronously and return the model's final text. + + Args: + goal: The planned-mode goal prompt for the model. + + Returns: + The model's final text response once it stops calling tools. + """ + return asyncio.run(self._run_async(goal)) + + async def _run_async(self, goal: str) -> str: + client = self._client + tools = client.format_tools([RUN_COMMAND_TOOL]) + contents: list[dict[str, Any]] = [{"role": "user", "content": goal}] + + final_text = "" + for turn in range(_MAX_TURNS): + _log.info("chaos agent turn %d", turn + 1) + response = await client.generate_content(contents, tools, SYSTEM_INSTRUCTION) + text = client.get_text_content(response) + function_calls = client.extract_function_calls(response) + + assistant_message: dict[str, Any] = {"role": "assistant", "content": text} + if function_calls: + assistant_message["tool_calls"] = function_calls + contents.append(assistant_message) + + if not function_calls: + final_text = text + _log.info("chaos agent finished: no further tool calls") + break + + for call in function_calls: + result = self._execute_tool(call.get("name"), call.get("args") or {}) + contents.append( + { + "role": "tool", + "tool_call_id": call.get("id"), + "name": call.get("name"), + "content": result, + } + ) + else: + _log.warning("chaos agent stopped after reaching the turn limit (%d)", _MAX_TURNS) + + return final_text + + def _execute_tool(self, name: str | None, args: dict[str, Any]) -> str: + """Dispatch a model tool call to its implementation. + + Args: + name: Requested tool name. + args: Tool arguments from the model. + + Returns: + The tool's textual result, or an error description. + """ + if name == RUN_COMMAND_TOOL.name: + command = args.get("command", "") + return run_chaos_command(command, self._chaos_active_event) + return f"Error: unknown tool {name!r}" diff --git a/devops_bench/chaos/base.py b/devops_bench/chaos/base.py new file mode 100644 index 00000000..b12d50b7 --- /dev/null +++ b/devops_bench/chaos/base.py @@ -0,0 +1,103 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Chaos fault/trigger interfaces and their selection registries.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + +from devops_bench.core import Registry + +__all__ = ["Fault", "Trigger", "FAULTS", "TRIGGERS"] + +FAULTS: Registry[type[Fault]] = Registry("faults") +TRIGGERS: Registry[type[Trigger]] = Registry("triggers") + + +class Fault(ABC): + """Abstract base for a platform-agnostic disruption or failure state. + + A fault describes *what* to disrupt independent of the target platform. + Concrete faults live in sibling modules under ``chaos.faults`` and + self-register under a canonical key via ``@FAULTS.register(...)``. + + Attributes: + id: Stable identifier for this fault instance. + name: Human-readable name. + target_subsystem: Subsystem the fault targets (e.g. ``"network"``). + """ + + id: str + name: str + target_subsystem: str + + @abstractmethod + def get_agnostic_spec(self) -> dict[str, Any]: + """Return the standardized, platform-agnostic parameters of the fault. + + Returns: + A JSON-serializable dict describing the disruption. + """ + + @abstractmethod + def inject(self, spec: dict[str, Any], context: dict[str, Any] | None = None) -> dict[str, Any]: + """Inject the fault into the target platform. + + Args: + spec: Platform-agnostic fault spec (the chaos task definition). + context: Optional execution context (signaling events, runtime + params) forwarded by the caller. + + Returns: + A JSON-serializable report describing the outcome. + """ + + +class Trigger(ABC): + """Abstract base for the condition that decides when a fault should fire. + + A trigger evaluates platform-agnostic state (provided by a verifier or + monitoring source) and lives outside any chaos infrastructure. Concrete + triggers self-register under a canonical key via ``@TRIGGERS.register(...)``. + + Attributes: + id: Stable identifier for this trigger instance. + name: Human-readable name. + trigger_type: Discriminator describing the trigger heuristic. + """ + + id: str + name: str + trigger_type: str + + def initialize(self, context: dict[str, Any]) -> None: + """Initialize trigger state (e.g. baselines or internal timers). + + Args: + context: Platform-agnostic context used to seed the trigger. + """ + return None + + @abstractmethod + def is_triggered(self, current_platform_state: dict[str, Any]) -> bool: + """Evaluate state to decide whether the fault should be injected. + + Args: + current_platform_state: Platform-agnostic state snapshot. + + Returns: + True when the fault should fire. + """ diff --git a/devops_bench/chaos/faults/__init__.py b/devops_bench/chaos/faults/__init__.py new file mode 100644 index 00000000..f6f8fd87 --- /dev/null +++ b/devops_bench/chaos/faults/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Concrete chaos faults, each self-registering in ``chaos.base.FAULTS``.""" + +from __future__ import annotations + +__all__: list[str] = [] diff --git a/devops_bench/chaos/faults/generate_load.py b/devops_bench/chaos/faults/generate_load.py new file mode 100644 index 00000000..98d7fcd8 --- /dev/null +++ b/devops_bench/chaos/faults/generate_load.py @@ -0,0 +1,153 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The ``generate_load`` fault: LLM-driven fortio/kubectl traffic spikes.""" + +from __future__ import annotations + +import json +import shlex +import threading +from typing import Any + +from devops_bench.chaos.base import FAULTS, Fault +from devops_bench.core import get_logger +from devops_bench.core.subprocess import run + +__all__ = ["GenerateLoadFault", "run_chaos_command"] + +_log = get_logger("chaos.generate_load") + +# Marker substring that, when present in a command, indicates a load spike is +# active. The harness watches the shared event to coordinate measurements. +_LOAD_MARKER = "fortio load" + +# Wall-clock ceiling for a single chaos command, matching legacy behavior. +_COMMAND_TIMEOUT = 40 + + +def run_chaos_command( + command: str, + chaos_active_event: threading.Event | None = None, +) -> str: + """Execute a single chaos shell command and return its combined output. + + The command is a free-form shell string produced by the LLM (e.g. a + ``fortio load`` invocation). When the command is a load spike and an event + is supplied, the event is set so the harness can observe that load is + active before measuring impact. + + Args: + command: Shell command to execute. + chaos_active_event: Optional event signaled when a load spike starts. + + Returns: + A string combining stdout and stderr, or an ``Error:`` description if + the command could not be run. + """ + try: + _log.info("running chaos command: %s", command) + + if chaos_active_event is not None and _LOAD_MARKER in command: + _log.info("load spike detected; signaling harness via chaos event") + chaos_active_event.set() + + # The model emits shell strings (fortio pipelines, kubectl exec); split + # into argv so execution stays shell-free. Commands that genuinely need + # shell features would have to be handled explicitly; none do today. + argv = shlex.split(command) + completed = run(argv, check=False, timeout=_COMMAND_TIMEOUT) + return f"Stdout:\n{completed.stdout}\nStderr:\n{completed.stderr}" + except Exception as exc: # noqa: BLE001 - surface any failure back to the LLM + return f"Error: {exc}" + + +@FAULTS.register("generate_load") +class GenerateLoadFault(Fault): + """A traffic-spike fault driven by an LLM issuing fortio commands. + + The fault exposes a single command-execution capability to the model and + relies on the agent loop to plan and issue exactly one ``fortio load`` + spike against the port-forwarded target. + + Attributes: + id: Identifier (``"generate_load"``). + name: Human-readable name. + target_subsystem: Targeted subsystem (``"traffic"``). + """ + + id = "generate_load" + name = "Generate Load" + target_subsystem = "traffic" + + def __init__(self) -> None: + self._spec: dict[str, Any] = {} + + def get_agnostic_spec(self) -> dict[str, Any]: + """Return the platform-agnostic spec last injected. + + Returns: + The most recent spec dict, or an empty dict before injection. + """ + return dict(self._spec) + + def goal(self, spec: dict[str, Any]) -> str: + """Build the planned-mode goal prompt for the LLM. + + Args: + spec: The chaos task spec describing the load to generate. + + Returns: + The goal prompt instructing the model to issue one fortio spike. + """ + return ( + "Your goal is to execute the following GKE planned chaos engineering " + "disruption action:\n" + f"```json\n{json.dumps(spec, indent=2)}\n```\n\n" + "Guidelines for execution:\n" + "1. Use the 'fortio' tool to inject traffic into GKE.\n" + "2. Note: GKE service target URLs (like *.svc.cluster.local) are " + "port-forwarded to 'http://localhost:8080' on the host, so run fortio " + "against http://localhost:8080 instead.\n" + "Use your run_command tool to execute this disruption safely and effectively." + ) + + def inject( + self, spec: dict[str, Any], context: dict[str, Any] | None = None + ) -> dict[str, Any]: + """Inject the load fault by running a single LLM-planned chaos loop. + + Args: + spec: The chaos task spec; its ``type`` must be ``"generate_load"``. + context: Optional context; ``chaos_active_event`` (a + :class:`threading.Event`) is signaled when load starts. + + Returns: + A report dict with ``status`` and the model's final ``output``. + + Raises: + ValueError: If ``spec['type']`` is not ``"generate_load"``. + """ + action_type = spec.get("type") + if action_type != self.id: + raise ValueError(f"unsupported chaos action type {action_type!r} for {self.id!r}") + + self._spec = dict(spec) + # Deferred import keeps base/fault imports free of the agent + models layer. + from devops_bench.chaos.agent import ChaosAgent + + event = (context or {}).get("chaos_active_event") + agent = ChaosAgent(chaos_active_event=event) + output = agent.run(self.goal(spec)) + return {"status": "completed", "output": output} diff --git a/tests/unit/chaos/test_chaos_agent.py b/tests/unit/chaos/test_chaos_agent.py new file mode 100644 index 00000000..f19a30ae --- /dev/null +++ b/tests/unit/chaos/test_chaos_agent.py @@ -0,0 +1,108 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the model-agnostic ChaosAgent loop.""" + +from __future__ import annotations + +import threading + +from devops_bench.chaos import agent as agent_module +from devops_bench.chaos.agent import RUN_COMMAND_TOOL, ChaosAgent + + +def _make_client(mocker, *, call_batches, texts): + """Build a fake LLMClient that yields the given tool-call batches/texts.""" + client = mocker.MagicMock() + client.format_tools.return_value = "FORMATTED_TOOLS" + client.generate_content = mocker.AsyncMock( + side_effect=[f"resp{i}" for i in range(len(call_batches))] + ) + client.get_text_content.side_effect = texts + client.extract_function_calls.side_effect = call_batches + return client + + +def test_agent_selects_model_from_config(mocker): + fake_client = mocker.MagicMock() + get_model = mocker.patch.object(agent_module, "get_model", return_value=fake_client) + mocker.patch.object(agent_module, "first_env", side_effect=["my-provider", "my-model"]) + + chaos_agent = ChaosAgent() + + get_model.assert_called_once_with(provider="my-provider", model_name="my-model") + assert chaos_agent._client is fake_client + + +def test_run_executes_tool_then_finishes(mocker): + mock_cmd = mocker.patch.object( + agent_module, "run_chaos_command", return_value="Stdout:\nok\nStderr:\n" + ) + client = _make_client( + mocker, + call_batches=[ + [{"name": "run_command", "args": {"command": "fortio load x"}, "id": "c1"}], + [], + ], + texts=["", "all done"], + ) + + event = threading.Event() + chaos_agent = ChaosAgent(chaos_active_event=event, client=client) + result = chaos_agent.run("do chaos") + + assert result == "all done" + mock_cmd.assert_called_once_with("fortio load x", event) + # Two model turns; tool result fed back between them. + assert client.generate_content.await_count == 2 + client.format_tools.assert_called_once_with([RUN_COMMAND_TOOL]) + + +def test_run_finishes_immediately_without_tool_calls(mocker): + mock_cmd = mocker.patch.object(agent_module, "run_chaos_command") + client = _make_client(mocker, call_batches=[[]], texts=["nothing to do"]) + + chaos_agent = ChaosAgent(client=client) + result = chaos_agent.run("noop") + + assert result == "nothing to do" + mock_cmd.assert_not_called() + + +def test_run_stops_at_turn_limit(mocker): + mocker.patch.object(agent_module, "run_chaos_command", return_value="out") + mocker.patch.object(agent_module, "_MAX_TURNS", 3) + # Always returns a tool call so the loop must hit the cap. + client = mocker.MagicMock() + client.format_tools.return_value = "T" + client.generate_content = mocker.AsyncMock(return_value="resp") + client.get_text_content.return_value = "" + client.extract_function_calls.return_value = [ + {"name": "run_command", "args": {"command": "fortio load x"}, "id": "c"} + ] + + chaos_agent = ChaosAgent(client=client) + result = chaos_agent.run("loop forever") + + assert result == "" + assert client.generate_content.await_count == 3 + + +def test_unknown_tool_returns_error(mocker): + client = mocker.MagicMock() + chaos_agent = ChaosAgent(client=client) + + result = chaos_agent._execute_tool("mystery", {}) + + assert result.startswith("Error: unknown tool") diff --git a/tests/unit/chaos/test_chaos_generate_load.py b/tests/unit/chaos/test_chaos_generate_load.py new file mode 100644 index 00000000..574a15b6 --- /dev/null +++ b/tests/unit/chaos/test_chaos_generate_load.py @@ -0,0 +1,141 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the generate_load fault and its command-execution path.""" + +from __future__ import annotations + +import threading +from types import SimpleNamespace + +import pytest + +from devops_bench.chaos.base import FAULTS, Fault +from devops_bench.chaos.faults import generate_load +from devops_bench.chaos.faults.generate_load import GenerateLoadFault, run_chaos_command + + +def test_fault_is_registered(): + assert FAULTS.get("generate_load") is GenerateLoadFault + assert issubclass(GenerateLoadFault, Fault) + + +def test_run_chaos_command_splits_argv_and_returns_output(mocker): + mock_run = mocker.patch.object( + generate_load, + "run", + return_value=SimpleNamespace(stdout="mock stdout", stderr="mock stderr", returncode=0), + ) + + cmd = "~/go/bin/fortio load -qps 100 -t 10s -c 2 http://localhost:8080" + result = run_chaos_command(cmd) + + mock_run.assert_called_once_with( + ["~/go/bin/fortio", "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], + check=False, + timeout=40, + ) + assert "mock stdout" in result + assert "mock stderr" in result + + +def test_run_chaos_command_sets_event_on_load_spike(mocker): + mocker.patch.object( + generate_load, + "run", + return_value=SimpleNamespace(stdout="", stderr="", returncode=0), + ) + event = threading.Event() + + run_chaos_command("~/go/bin/fortio load -qps 100 http://localhost:8080", event) + + assert event.is_set() + + +def test_run_chaos_command_does_not_set_event_for_non_load(mocker): + mocker.patch.object( + generate_load, + "run", + return_value=SimpleNamespace(stdout="", stderr="", returncode=0), + ) + event = threading.Event() + + run_chaos_command("kubectl get pods", event) + + assert not event.is_set() + + +def test_run_chaos_command_returns_error_string_on_exception(mocker): + mocker.patch.object(generate_load, "run", side_effect=RuntimeError("boom")) + + result = run_chaos_command("kubectl get pods") + + assert result.startswith("Error:") + assert "boom" in result + + +def test_inject_rejects_wrong_type(): + fault = GenerateLoadFault() + with pytest.raises(ValueError): + fault.inject({"type": "kill_pod"}) + + +def test_inject_runs_agent_and_signals_event(mocker): + """Port of the legacy generate_load test: the LLM issues one fortio spike. + + The LLMClient is mocked so no real SDK is touched, and the command path's + ``run`` is mocked so no real fortio/kubectl executes. + """ + mock_run = mocker.patch.object( + generate_load, + "run", + return_value=SimpleNamespace(stdout="mock stdout", stderr="mock stderr", returncode=0), + ) + + expected_cmd = "~/go/bin/fortio load -qps 100 -t 10s -c 2 http://localhost:8080" + + # Fake LLM client: first turn asks to run the fortio command, second turn + # returns the final summary with no further tool calls. + fake_client = mocker.MagicMock() + fake_client.format_tools.return_value = "TOOLS" + fake_client.generate_content = mocker.AsyncMock(side_effect=["resp1", "resp2"]) + fake_client.get_text_content.side_effect = ["", "Disruption complete"] + fake_client.extract_function_calls.side_effect = [ + [{"name": "run_command", "args": {"command": expected_cmd}, "id": "c1"}], + [], + ] + mocker.patch("devops_bench.chaos.agent.get_model", return_value=fake_client) + + event = threading.Event() + fault = GenerateLoadFault() + spec = { + "type": "generate_load", + "target": { + "service_url": "http://localhost:8082", + "qps": 100, + "duration": "10s", + "concurrency": 2, + }, + } + + report = fault.inject(spec, context={"chaos_active_event": event}) + + mock_run.assert_called_once_with( + ["~/go/bin/fortio", "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], + check=False, + timeout=40, + ) + assert event.is_set() + assert report == {"status": "completed", "output": "Disruption complete"} + assert fault.get_agnostic_spec() == spec From 4aad7e6c0c5375f86144b392752dcf7cd4ef776b Mon Sep 17 00:00:00 2001 From: pradeepvrd Date: Thu, 18 Jun 2026 01:15:40 -0700 Subject: [PATCH 2/3] fix(chaos): correct loop text retention, tool-arg/empty-command guards, and event ordering Modules moved/refactored: - see base move commit (devops_bench/chaos/agent.py, devops_bench/chaos/faults/generate_load.py) Bugs fixed vs legacy: - ChaosAgent._run_async dropped the model's final text when a tool call landed on the last turn (or the turn cap): final_text was only assigned when there were no function calls. Now set final_text on every turn so an accompanying summary is never lost. - _execute_tool raised AttributeError when the model returned non-dict tool args (str/list/None): args.get(...) was called unconditionally. Now guard with isinstance(args, dict) and return "Error: tool args must be an object"; the caller passes raw args so the guard fires. - run_chaos_command raised IndexError on an empty command string (shlex.split -> [] -> run([])). Now short-circuit with "Error: command string is empty" before parsing. - run_chaos_command set chaos_active_event BEFORE parsing, so a command that failed shlex.split still told the harness "load active". Now signal the event only after a successful parse, immediately before execution. Improvements vs legacy: - none (behavioral bug fixes only; further improvements land in the following feat(chaos) commit) --- devops_bench/chaos/agent.py | 14 +++++++++---- devops_bench/chaos/faults/generate_load.py | 15 ++++++++++---- tests/unit/chaos/test_chaos_agent.py | 20 +++++++++++++++---- tests/unit/chaos/test_chaos_generate_load.py | 21 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 12 deletions(-) diff --git a/devops_bench/chaos/agent.py b/devops_bench/chaos/agent.py index 7c1157fa..ee8c6c04 100644 --- a/devops_bench/chaos/agent.py +++ b/devops_bench/chaos/agent.py @@ -132,13 +132,17 @@ async def _run_async(self, goal: str) -> str: assistant_message["tool_calls"] = function_calls contents.append(assistant_message) + # Retain the latest text on every turn so a tool call on the final + # turn (or the turn cap) does not discard the model's accompanying + # summary. + final_text = text + if not function_calls: - final_text = text _log.info("chaos agent finished: no further tool calls") break for call in function_calls: - result = self._execute_tool(call.get("name"), call.get("args") or {}) + result = self._execute_tool(call.get("name"), call.get("args")) contents.append( { "role": "tool", @@ -152,16 +156,18 @@ async def _run_async(self, goal: str) -> str: return final_text - def _execute_tool(self, name: str | None, args: dict[str, Any]) -> str: + def _execute_tool(self, name: str | None, args: Any) -> str: """Dispatch a model tool call to its implementation. Args: name: Requested tool name. - args: Tool arguments from the model. + args: Tool arguments from the model; expected to be an object (dict). Returns: The tool's textual result, or an error description. """ + if not isinstance(args, dict): + return "Error: tool args must be an object" if name == RUN_COMMAND_TOOL.name: command = args.get("command", "") return run_chaos_command(command, self._chaos_active_event) diff --git a/devops_bench/chaos/faults/generate_load.py b/devops_bench/chaos/faults/generate_load.py index 98d7fcd8..9d88aaac 100644 --- a/devops_bench/chaos/faults/generate_load.py +++ b/devops_bench/chaos/faults/generate_load.py @@ -56,17 +56,24 @@ def run_chaos_command( A string combining stdout and stderr, or an ``Error:`` description if the command could not be run. """ + if not command.strip(): + return "Error: command string is empty" + try: _log.info("running chaos command: %s", command) - if chaos_active_event is not None and _LOAD_MARKER in command: - _log.info("load spike detected; signaling harness via chaos event") - chaos_active_event.set() - # The model emits shell strings (fortio pipelines, kubectl exec); split # into argv so execution stays shell-free. Commands that genuinely need # shell features would have to be handled explicitly; none do today. argv = shlex.split(command) + + # Signal "load active" only once the command parses cleanly and we are + # about to execute it, so a parse failure never falsely tells the harness + # that load is live. + if chaos_active_event is not None and _LOAD_MARKER in command: + _log.info("load spike detected; signaling harness via chaos event") + chaos_active_event.set() + completed = run(argv, check=False, timeout=_COMMAND_TIMEOUT) return f"Stdout:\n{completed.stdout}\nStderr:\n{completed.stderr}" except Exception as exc: # noqa: BLE001 - surface any failure back to the LLM diff --git a/tests/unit/chaos/test_chaos_agent.py b/tests/unit/chaos/test_chaos_agent.py index f19a30ae..0d50659c 100644 --- a/tests/unit/chaos/test_chaos_agent.py +++ b/tests/unit/chaos/test_chaos_agent.py @@ -80,14 +80,16 @@ def test_run_finishes_immediately_without_tool_calls(mocker): mock_cmd.assert_not_called() -def test_run_stops_at_turn_limit(mocker): +def test_run_stops_at_turn_limit_retains_last_text(mocker): mocker.patch.object(agent_module, "run_chaos_command", return_value="out") mocker.patch.object(agent_module, "_MAX_TURNS", 3) - # Always returns a tool call so the loop must hit the cap. + # Always returns a tool call so the loop must hit the cap; the model also + # emits text on every turn, which must be retained even though the final + # turn still carries a tool call. client = mocker.MagicMock() client.format_tools.return_value = "T" client.generate_content = mocker.AsyncMock(return_value="resp") - client.get_text_content.return_value = "" + client.get_text_content.return_value = "still working" client.extract_function_calls.return_value = [ {"name": "run_command", "args": {"command": "fortio load x"}, "id": "c"} ] @@ -95,7 +97,8 @@ def test_run_stops_at_turn_limit(mocker): chaos_agent = ChaosAgent(client=client) result = chaos_agent.run("loop forever") - assert result == "" + # Final-turn text is preserved despite the accompanying tool call. + assert result == "still working" assert client.generate_content.await_count == 3 @@ -106,3 +109,12 @@ def test_unknown_tool_returns_error(mocker): result = chaos_agent._execute_tool("mystery", {}) assert result.startswith("Error: unknown tool") + + +def test_non_dict_args_returns_error(mocker): + client = mocker.MagicMock() + chaos_agent = ChaosAgent(client=client) + + for bad_args in (None, "fortio load x", ["fortio"], 42): + result = chaos_agent._execute_tool("run_command", bad_args) + assert result == "Error: tool args must be an object" diff --git a/tests/unit/chaos/test_chaos_generate_load.py b/tests/unit/chaos/test_chaos_generate_load.py index 574a15b6..8ac83390 100644 --- a/tests/unit/chaos/test_chaos_generate_load.py +++ b/tests/unit/chaos/test_chaos_generate_load.py @@ -85,6 +85,27 @@ def test_run_chaos_command_returns_error_string_on_exception(mocker): assert "boom" in result +def test_run_chaos_command_empty_command_guard(mocker): + mock_run = mocker.patch.object(generate_load, "run") + + for empty in ("", " ", "\n\t"): + assert run_chaos_command(empty) == "Error: command string is empty" + mock_run.assert_not_called() + + +def test_run_chaos_command_does_not_set_event_on_parse_failure(mocker): + mock_run = mocker.patch.object(generate_load, "run") + event = threading.Event() + + # Unbalanced quote: contains the load marker but fails shlex.split, so the + # command never executes and the event must not be signaled. + result = run_chaos_command('fortio load "unterminated', event) + + assert result.startswith("Error:") + assert not event.is_set() + mock_run.assert_not_called() + + def test_inject_rejects_wrong_type(): fault = GenerateLoadFault() with pytest.raises(ValueError): From 4a10e71cdf9b15ab59798541951affe9053262c2 Mon Sep 17 00:00:00 2001 From: pradeepvrd Date: Thu, 18 Jun 2026 01:16:09 -0700 Subject: [PATCH 3/3] feat(chaos): spec-driven target URL, ~ expansion, and ChaosAgent dependency injection Modules moved/refactored: - see base move commit (devops_bench/chaos/agent.py, devops_bench/chaos/faults/generate_load.py) Bugs fixed vs legacy: - none (fixes landed in the preceding fix(chaos) commit) Improvements vs legacy: - expand a leading ~ in each command token (os.path.expanduser) so model-emitted paths like ~/go/bin/fortio resolve under the shell-free argv executor instead of failing execvp; document that only single, non-piped commands are supported (no pipes/redirection/$VAR) in the run_command prompt and docstring. - drive the fortio target URL from the spec: read target.service_url (rewritten by the harness to the local port-forward) via target_url_from_spec() with a single _DEFAULT_TARGET_URL fallback, and inject it into both the goal and the system instruction (build_system_instruction(target_url)), removing the hardcoded http://localhost:8080 from SYSTEM_INSTRUCTION and goal(). - ChaosAgent.__init__ now accepts optional system_instruction and tools (defaulting to the module constants), used throughout the loop, so the agent is reusable for other faults. - decouple the orchestrator from the concrete fault: drop the top-level import of run_chaos_command and inject a tool_handler callable into the ctor (lazily defaulting to run_chaos_command); _execute_tool dispatches via self._tool_handler. --- devops_bench/chaos/agent.py | 137 +++++++++++++++---- devops_bench/chaos/faults/generate_load.py | 48 +++++-- tests/unit/chaos/test_chaos_agent.py | 79 +++++++++-- tests/unit/chaos/test_chaos_generate_load.py | 24 +++- 4 files changed, 235 insertions(+), 53 deletions(-) diff --git a/devops_bench/chaos/agent.py b/devops_bench/chaos/agent.py index ee8c6c04..dbfe6558 100644 --- a/devops_bench/chaos/agent.py +++ b/devops_bench/chaos/agent.py @@ -18,41 +18,96 @@ import asyncio import threading +from collections.abc import Callable from types import SimpleNamespace from typing import Any -from devops_bench.chaos.faults.generate_load import run_chaos_command from devops_bench.core import first_env, get_logger from devops_bench.models import LLMClient, get_model -__all__ = ["ChaosAgent", "SYSTEM_INSTRUCTION", "RUN_COMMAND_TOOL"] +__all__ = [ + "ChaosAgent", + "SYSTEM_INSTRUCTION", + "RUN_COMMAND_TOOL", + "build_system_instruction", + "target_url_from_spec", +] _log = get_logger("chaos.agent") -SYSTEM_INSTRUCTION = ( - "You are a professional Site Reliability Engineer (SRE) and Chaos Engineering Expert.\n" - "Your role is to disrupt GKE workloads to test system resilience, which can happen in " - "two modes:\n" - "1. Planned Mode: Execute a specific GKE chaos disruption according to a provided JSON spec.\n" - "2. Autonomous Mode: Autonomously explore the GKE cluster state, identify critical targets " - "(pods, nodes, services), and inject transient faults to test recovery.\n\n" - "You are equipped with the `run_command` tool, which runs shell commands locally on the GKE " - "host control machine (which is fully authenticated and has GKE admin kubectl privileges).\n\n" - "Strict Guidelines for Execution:\n" - "- Single Execution Policy: You MUST execute exactly one tool call to run the planned " - "'fortio' load generation spike. Do NOT attempt to rerun, adjust, or tune the load " - "generation if the target service saturates or returns timeouts. Once the single load " - "command is executed, analyze the output, write your final performance summary, and exit " - "immediately.\n" - "- Safety First: Only inject transient, safe, and recoverable faults (e.g. killing pods, " - "scaling deployments, generating traffic spikes). Do NOT permanently destroy GKE clusters, " - "namespaces, or nodes.\n" - "- Traffic Generation: For load spikes, use the 'fortio' binary. Since GKE internal service " - "URLs (*.svc.cluster.local) are port-forwarded to the host, you MUST target " - "'http://localhost:8080' instead.\n" - "- Analysis & Clarity: Analyze command outputs carefully, report stdout/stderr accurately, " - "and confirm in your final response when the disruption has been successfully completed." -) +# Single source of truth for the load target when a spec omits one. The local +# port-forward URL the cluster workload is exposed on by the harness. +_DEFAULT_TARGET_URL = "http://localhost:8080" + + +def build_system_instruction(target_url: str = _DEFAULT_TARGET_URL) -> str: + """Build the SRE system instruction, targeting ``target_url`` for load. + + Args: + target_url: URL fortio load should be directed at. Flows from the chaos + spec's ``target.service_url`` (rewritten by the harness to the local + port-forward), defaulting to :data:`_DEFAULT_TARGET_URL`. + + Returns: + The system instruction string with the target URL injected. + """ + return ( + "You are a professional Site Reliability Engineer (SRE) and Chaos Engineering Expert.\n" + "Your role is to disrupt GKE workloads to test system resilience, which can happen in " + "two modes:\n" + "1. Planned Mode: Execute a specific GKE chaos disruption according to a provided JSON " + "spec.\n" + "2. Autonomous Mode: Autonomously explore the GKE cluster state, identify critical " + "targets (pods, nodes, services), and inject transient faults to test recovery.\n\n" + "You are equipped with the `run_command` tool, which runs a single command locally on " + "the GKE host control machine (which is fully authenticated and has GKE admin kubectl " + "privileges). Each call must be ONE non-piped command: shell pipelines, redirection, " + "command chaining (``|``, ``>``, ``&&``, ``;``) and environment-variable interpolation " + "(``$VAR``) are NOT supported.\n\n" + "Strict Guidelines for Execution:\n" + "- Single Execution Policy: You MUST execute exactly one tool call to run the planned " + "'fortio' load generation spike. Do NOT attempt to rerun, adjust, or tune the load " + "generation if the target service saturates or returns timeouts. Once the single load " + "command is executed, analyze the output, write your final performance summary, and exit " + "immediately.\n" + "- Safety First: Only inject transient, safe, and recoverable faults (e.g. killing pods, " + "scaling deployments, generating traffic spikes). Do NOT permanently destroy GKE " + "clusters, namespaces, or nodes.\n" + "- Traffic Generation: For load spikes, use the 'fortio' binary. Since GKE internal " + "service URLs (*.svc.cluster.local) are port-forwarded to the host, you MUST target " + f"'{target_url}' instead.\n" + "- Analysis & Clarity: Analyze command outputs carefully, report stdout/stderr " + "accurately, and confirm in your final response when the disruption has been " + "successfully completed." + ) + + +def target_url_from_spec(spec: dict[str, Any] | None) -> str: + """Extract the load target URL from a chaos spec/action. + + Reads ``spec['target']['service_url']`` (the action shape the harness hands + in after rewriting it to the local port-forward), falling back to + :data:`_DEFAULT_TARGET_URL` when absent or malformed. + + Args: + spec: A chaos spec or action dict, or None. + + Returns: + The target URL, or the module default when none is present. + """ + if not isinstance(spec, dict): + return _DEFAULT_TARGET_URL + target = spec.get("target") + if isinstance(target, dict): + url = target.get("service_url") + if isinstance(url, str) and url.strip(): + return url + return _DEFAULT_TARGET_URL + + +# Default system instruction using the fallback target URL. Callers that know +# the spec's target should build a tailored one via build_system_instruction. +SYSTEM_INSTRUCTION = build_system_instruction() # Neutral, duck-typed tool descriptor consumed by ``LLMClient.format_tools`` # (mirrors the MCP tool shape: name/description/inputSchema). @@ -90,12 +145,24 @@ class ChaosAgent: so the harness can coordinate measurements. client: Optional pre-built LLM client; when omitted one is selected from configuration via :func:`get_model`. + system_instruction: System prompt for the loop; defaults to + :data:`SYSTEM_INSTRUCTION`. + tools: Tool descriptors offered to the model; defaults to + ``[RUN_COMMAND_TOOL]``. + tool_handler: Callable invoked for a ``run_command`` tool call as + ``tool_handler(command, chaos_active_event) -> str``. Defaults to + :func:`devops_bench.chaos.faults.generate_load.run_chaos_command`, + imported lazily so the orchestrator does not couple to the concrete + fault at module load. """ def __init__( self, chaos_active_event: threading.Event | None = None, client: LLMClient | None = None, + system_instruction: str | None = None, + tools: list[Any] | None = None, + tool_handler: Callable[[str, threading.Event | None], str] | None = None, ) -> None: self._chaos_active_event = chaos_active_event if client is None: @@ -103,6 +170,16 @@ def __init__( model_name = first_env("CHAOS_MODEL", "AGENT_MODEL") client = get_model(provider=provider, model_name=model_name) self._client = client + self._system_instruction = ( + system_instruction if system_instruction is not None else SYSTEM_INSTRUCTION + ) + self._tools = tools if tools is not None else [RUN_COMMAND_TOOL] + if tool_handler is None: + # Lazy import avoids a top-level agent -> concrete-fault dependency. + from devops_bench.chaos.faults.generate_load import run_chaos_command + + tool_handler = run_chaos_command + self._tool_handler = tool_handler def run(self, goal: str) -> str: """Run the chaos loop synchronously and return the model's final text. @@ -117,13 +194,15 @@ def run(self, goal: str) -> str: async def _run_async(self, goal: str) -> str: client = self._client - tools = client.format_tools([RUN_COMMAND_TOOL]) + tools = client.format_tools(self._tools) contents: list[dict[str, Any]] = [{"role": "user", "content": goal}] final_text = "" for turn in range(_MAX_TURNS): _log.info("chaos agent turn %d", turn + 1) - response = await client.generate_content(contents, tools, SYSTEM_INSTRUCTION) + response = await client.generate_content( + contents, tools, self._system_instruction + ) text = client.get_text_content(response) function_calls = client.extract_function_calls(response) @@ -170,5 +249,5 @@ def _execute_tool(self, name: str | None, args: Any) -> str: return "Error: tool args must be an object" if name == RUN_COMMAND_TOOL.name: command = args.get("command", "") - return run_chaos_command(command, self._chaos_active_event) + return self._tool_handler(command, self._chaos_active_event) return f"Error: unknown tool {name!r}" diff --git a/devops_bench/chaos/faults/generate_load.py b/devops_bench/chaos/faults/generate_load.py index 9d88aaac..ef11ee8f 100644 --- a/devops_bench/chaos/faults/generate_load.py +++ b/devops_bench/chaos/faults/generate_load.py @@ -17,6 +17,7 @@ from __future__ import annotations import json +import os import shlex import threading from typing import Any @@ -41,15 +42,17 @@ def run_chaos_command( command: str, chaos_active_event: threading.Event | None = None, ) -> str: - """Execute a single chaos shell command and return its combined output. + """Execute a single chaos command and return its combined output. - The command is a free-form shell string produced by the LLM (e.g. a - ``fortio load`` invocation). When the command is a load spike and an event - is supplied, the event is set so the harness can observe that load is - active before measuring impact. + The command is a single, non-piped command string produced by the LLM (e.g. + a ``fortio load`` invocation). It is tokenized with :func:`shlex.split` and + run shell-free, so shell features (pipes, redirection, ``$VAR``) are not + supported; a leading ``~`` in each token is expanded to the user's home. + When the command is a load spike and an event is supplied, the event is set + so the harness can observe that load is active before measuring impact. Args: - command: Shell command to execute. + command: Single command to execute (no shell pipelines or redirection). chaos_active_event: Optional event signaled when a load spike starts. Returns: @@ -62,10 +65,12 @@ def run_chaos_command( try: _log.info("running chaos command: %s", command) - # The model emits shell strings (fortio pipelines, kubectl exec); split - # into argv so execution stays shell-free. Commands that genuinely need - # shell features would have to be handled explicitly; none do today. - argv = shlex.split(command) + # The model emits shell strings (e.g. a single fortio invocation); split + # into argv so execution stays shell-free. shlex.split does not expand + # ``~``, so expand each token's leading home to keep ``~/go/bin/fortio`` + # style paths resolvable. Shell features (pipes, redirection, $VARS) are + # not supported by this argv executor. + argv = [os.path.expanduser(arg) for arg in shlex.split(command)] # Signal "load active" only once the command parses cleanly and we are # about to execute it, so a parse failure never falsely tells the harness @@ -112,12 +117,21 @@ def get_agnostic_spec(self) -> dict[str, Any]: def goal(self, spec: dict[str, Any]) -> str: """Build the planned-mode goal prompt for the LLM. + The fortio target URL is read from the spec (``target.service_url``, + rewritten by the harness to the local port-forward) so the spec field + drives the prompt rather than a hardcoded constant. + Args: spec: The chaos task spec describing the load to generate. Returns: The goal prompt instructing the model to issue one fortio spike. """ + # Imported here (not at module top) to keep the fault module free of the + # agent + models layer until injection actually runs. + from devops_bench.chaos.agent import target_url_from_spec + + target_url = target_url_from_spec(spec) return ( "Your goal is to execute the following GKE planned chaos engineering " "disruption action:\n" @@ -125,8 +139,8 @@ def goal(self, spec: dict[str, Any]) -> str: "Guidelines for execution:\n" "1. Use the 'fortio' tool to inject traffic into GKE.\n" "2. Note: GKE service target URLs (like *.svc.cluster.local) are " - "port-forwarded to 'http://localhost:8080' on the host, so run fortio " - "against http://localhost:8080 instead.\n" + f"port-forwarded to '{target_url}' on the host, so run fortio " + f"against {target_url} instead.\n" "Use your run_command tool to execute this disruption safely and effectively." ) @@ -152,9 +166,15 @@ def inject( self._spec = dict(spec) # Deferred import keeps base/fault imports free of the agent + models layer. - from devops_bench.chaos.agent import ChaosAgent + from devops_bench.chaos.agent import ( + ChaosAgent, + build_system_instruction, + target_url_from_spec, + ) event = (context or {}).get("chaos_active_event") - agent = ChaosAgent(chaos_active_event=event) + # Inject the spec's target URL into both the goal and the system prompt. + system_instruction = build_system_instruction(target_url_from_spec(spec)) + agent = ChaosAgent(chaos_active_event=event, system_instruction=system_instruction) output = agent.run(self.goal(spec)) return {"status": "completed", "output": output} diff --git a/tests/unit/chaos/test_chaos_agent.py b/tests/unit/chaos/test_chaos_agent.py index 0d50659c..2e387855 100644 --- a/tests/unit/chaos/test_chaos_agent.py +++ b/tests/unit/chaos/test_chaos_agent.py @@ -46,9 +46,7 @@ def test_agent_selects_model_from_config(mocker): def test_run_executes_tool_then_finishes(mocker): - mock_cmd = mocker.patch.object( - agent_module, "run_chaos_command", return_value="Stdout:\nok\nStderr:\n" - ) + mock_cmd = mocker.MagicMock(return_value="Stdout:\nok\nStderr:\n") client = _make_client( mocker, call_batches=[ @@ -59,7 +57,7 @@ def test_run_executes_tool_then_finishes(mocker): ) event = threading.Event() - chaos_agent = ChaosAgent(chaos_active_event=event, client=client) + chaos_agent = ChaosAgent(chaos_active_event=event, client=client, tool_handler=mock_cmd) result = chaos_agent.run("do chaos") assert result == "all done" @@ -70,10 +68,10 @@ def test_run_executes_tool_then_finishes(mocker): def test_run_finishes_immediately_without_tool_calls(mocker): - mock_cmd = mocker.patch.object(agent_module, "run_chaos_command") + mock_cmd = mocker.MagicMock() client = _make_client(mocker, call_batches=[[]], texts=["nothing to do"]) - chaos_agent = ChaosAgent(client=client) + chaos_agent = ChaosAgent(client=client, tool_handler=mock_cmd) result = chaos_agent.run("noop") assert result == "nothing to do" @@ -81,7 +79,6 @@ def test_run_finishes_immediately_without_tool_calls(mocker): def test_run_stops_at_turn_limit_retains_last_text(mocker): - mocker.patch.object(agent_module, "run_chaos_command", return_value="out") mocker.patch.object(agent_module, "_MAX_TURNS", 3) # Always returns a tool call so the loop must hit the cap; the model also # emits text on every turn, which must be retained even though the final @@ -94,7 +91,7 @@ def test_run_stops_at_turn_limit_retains_last_text(mocker): {"name": "run_command", "args": {"command": "fortio load x"}, "id": "c"} ] - chaos_agent = ChaosAgent(client=client) + chaos_agent = ChaosAgent(client=client, tool_handler=mocker.MagicMock(return_value="out")) result = chaos_agent.run("loop forever") # Final-turn text is preserved despite the accompanying tool call. @@ -118,3 +115,69 @@ def test_non_dict_args_returns_error(mocker): for bad_args in (None, "fortio load x", ["fortio"], 42): result = chaos_agent._execute_tool("run_command", bad_args) assert result == "Error: tool args must be an object" + + +# --- URL flow (#5) ---------------------------------------------------------- + + +def test_target_url_from_spec_reads_service_url(): + spec = {"type": "generate_load", "target": {"service_url": "http://svc:9000"}} + assert agent_module.target_url_from_spec(spec) == "http://svc:9000" + + +def test_target_url_from_spec_falls_back_to_default(): + # Missing target, malformed target, blank URL, and non-dict all fall back. + assert agent_module.target_url_from_spec({}) == "http://localhost:8080" + assert agent_module.target_url_from_spec({"target": "nope"}) == "http://localhost:8080" + assert ( + agent_module.target_url_from_spec({"target": {"service_url": " "}}) + == "http://localhost:8080" + ) + assert agent_module.target_url_from_spec(None) == "http://localhost:8080" + + +def test_build_system_instruction_injects_url(): + instruction = agent_module.build_system_instruction("http://svc:9000") + assert "http://svc:9000" in instruction + # The hardcoded default is not present when a custom URL is supplied. + assert "localhost:8080" not in instruction + + +def test_default_system_instruction_uses_default_url(): + assert "http://localhost:8080" in agent_module.SYSTEM_INSTRUCTION + + +# --- constructor dependency injection (#6) + custom tools/instruction -------- + + +def test_run_uses_injected_system_instruction_and_tools(mocker): + client = _make_client(mocker, call_batches=[[]], texts=["done"]) + custom_tools = [RUN_COMMAND_TOOL, RUN_COMMAND_TOOL] + + chaos_agent = ChaosAgent( + client=client, + system_instruction="CUSTOM SYS", + tools=custom_tools, + tool_handler=mocker.MagicMock(), + ) + chaos_agent.run("go") + + client.format_tools.assert_called_once_with(custom_tools) + # The injected system instruction is passed through to generate_content. + assert client.generate_content.await_args.args[2] == "CUSTOM SYS" + + +# --- handler decoupling (#7) ------------------------------------------------ + + +def test_agent_module_has_no_top_level_fault_import(): + # The orchestrator must not couple to the concrete fault at module load. + assert not hasattr(agent_module, "run_chaos_command") + + +def test_default_tool_handler_is_run_chaos_command(mocker): + # With no injected handler, the ctor lazily binds the real fault handler. + from devops_bench.chaos.faults.generate_load import run_chaos_command + + chaos_agent = ChaosAgent(client=mocker.MagicMock()) + assert chaos_agent._tool_handler is run_chaos_command diff --git a/tests/unit/chaos/test_chaos_generate_load.py b/tests/unit/chaos/test_chaos_generate_load.py index 8ac83390..12cb60d2 100644 --- a/tests/unit/chaos/test_chaos_generate_load.py +++ b/tests/unit/chaos/test_chaos_generate_load.py @@ -16,6 +16,7 @@ from __future__ import annotations +import os import threading from types import SimpleNamespace @@ -41,8 +42,9 @@ def test_run_chaos_command_splits_argv_and_returns_output(mocker): cmd = "~/go/bin/fortio load -qps 100 -t 10s -c 2 http://localhost:8080" result = run_chaos_command(cmd) + fortio = os.path.expanduser("~/go/bin/fortio") mock_run.assert_called_once_with( - ["~/go/bin/fortio", "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], + [fortio, "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], check=False, timeout=40, ) @@ -112,6 +114,23 @@ def test_inject_rejects_wrong_type(): fault.inject({"type": "kill_pod"}) +def test_goal_uses_spec_target_url(): + fault = GenerateLoadFault() + spec = {"type": "generate_load", "target": {"service_url": "http://svc:9000"}} + + goal = fault.goal(spec) + + # The spec's service_url drives the prompt instead of a hardcoded constant. + assert "http://svc:9000" in goal + assert "localhost:8080" not in goal + + +def test_goal_falls_back_to_default_url_when_absent(): + fault = GenerateLoadFault() + goal = fault.goal({"type": "generate_load"}) + assert "http://localhost:8080" in goal + + def test_inject_runs_agent_and_signals_event(mocker): """Port of the legacy generate_load test: the LLM issues one fortio spike. @@ -152,8 +171,9 @@ def test_inject_runs_agent_and_signals_event(mocker): report = fault.inject(spec, context={"chaos_active_event": event}) + fortio = os.path.expanduser("~/go/bin/fortio") mock_run.assert_called_once_with( - ["~/go/bin/fortio", "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], + [fortio, "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"], check=False, timeout=40, )