diff --git a/devops_bench/chaos/__init__.py b/devops_bench/chaos/__init__.py new file mode 100644 index 00000000..b2465941 --- /dev/null +++ b/devops_bench/chaos/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Chaos injection: fault/trigger interfaces, registries, and the agent loop. + +Importing this package is light: provider SDKs are loaded lazily by the models +layer only when :class:`ChaosAgent` constructs a client. Concrete faults are +imported here so they self-register in :data:`FAULTS`. +""" + +from __future__ import annotations + +from devops_bench.chaos.agent import ChaosAgent +from devops_bench.chaos.base import FAULTS, TRIGGERS, Fault, Trigger +from devops_bench.chaos.faults import generate_load # noqa: F401 - registers the fault + +__all__ = ["ChaosAgent", "Fault", "Trigger", "FAULTS", "TRIGGERS"] diff --git a/devops_bench/chaos/agent.py b/devops_bench/chaos/agent.py new file mode 100644 index 00000000..dbfe6558 --- /dev/null +++ b/devops_bench/chaos/agent.py @@ -0,0 +1,253 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# Single source of truth for the load target when a spec omits one. The local
# port-forward URL the cluster workload is exposed on by the harness.
_DEFAULT_TARGET_URL = "http://localhost:8080"


def build_system_instruction(target_url: str = _DEFAULT_TARGET_URL) -> str:
    """Build the SRE system instruction, targeting ``target_url`` for load.

    Args:
        target_url: URL fortio load should be directed at. Flows from the chaos
            spec's ``target.service_url`` (rewritten by the harness to the local
            port-forward), defaulting to :data:`_DEFAULT_TARGET_URL`.

    Returns:
        The system instruction string with the target URL injected.
    """
    return (
        "You are a professional Site Reliability Engineer (SRE) and Chaos Engineering Expert.\n"
        "Your role is to disrupt GKE workloads to test system resilience, which can happen in "
        "two modes:\n"
        "1. Planned Mode: Execute a specific GKE chaos disruption according to a provided JSON "
        "spec.\n"
        "2. Autonomous Mode: Autonomously explore the GKE cluster state, identify critical "
        "targets (pods, nodes, services), and inject transient faults to test recovery.\n\n"
        "You are equipped with the `run_command` tool, which runs a single command locally on "
        "the GKE host control machine (which is fully authenticated and has GKE admin kubectl "
        "privileges). Each call must be ONE non-piped command: shell pipelines, redirection, "
        "command chaining (``|``, ``>``, ``&&``, ``;``) and environment-variable interpolation "
        "(``$VAR``) are NOT supported.\n\n"
        "Strict Guidelines for Execution:\n"
        "- Single Execution Policy: You MUST execute exactly one tool call to run the planned "
        "'fortio' load generation spike. Do NOT attempt to rerun, adjust, or tune the load "
        "generation if the target service saturates or returns timeouts. Once the single load "
        "command is executed, analyze the output, write your final performance summary, and exit "
        "immediately.\n"
        "- Safety First: Only inject transient, safe, and recoverable faults (e.g. killing pods, "
        "scaling deployments, generating traffic spikes). Do NOT permanently destroy GKE "
        "clusters, namespaces, or nodes.\n"
        "- Traffic Generation: For load spikes, use the 'fortio' binary. Since GKE internal "
        "service URLs (*.svc.cluster.local) are port-forwarded to the host, you MUST target "
        f"'{target_url}' instead.\n"
        "- Analysis & Clarity: Analyze command outputs carefully, report stdout/stderr "
        "accurately, and confirm in your final response when the disruption has been "
        "successfully completed."
    )


def target_url_from_spec(spec: dict[str, Any] | None) -> str:
    """Extract the load target URL from a chaos spec/action.

    Reads ``spec['target']['service_url']`` (the action shape the harness hands
    in after rewriting it to the local port-forward), falling back to
    :data:`_DEFAULT_TARGET_URL` when absent or malformed. Surrounding
    whitespace is removed from the URL so it cannot leak into the prompts the
    URL is interpolated into.

    Args:
        spec: A chaos spec or action dict, or None.

    Returns:
        The whitespace-trimmed target URL, or the module default when none is
        present.
    """
    if isinstance(spec, dict):
        target = spec.get("target")
        if isinstance(target, dict):
            url = target.get("service_url")
            if isinstance(url, str):
                # Return the same trimmed value that was validated: previously
                # the stripped form was checked but the raw form was returned.
                cleaned = url.strip()
                if cleaned:
                    return cleaned
    return _DEFAULT_TARGET_URL


# Default system instruction using the fallback target URL. Callers that know
# the spec's target should build a tailored one via build_system_instruction.
SYSTEM_INSTRUCTION = build_system_instruction()

# Neutral, duck-typed tool descriptor consumed by ``LLMClient.format_tools``
# (mirrors the MCP tool shape: name/description/inputSchema). The wording
# matches the system instruction: the command is one non-piped command, not a
# shell script, so the model is not invited to use unsupported shell features.
RUN_COMMAND_TOOL = SimpleNamespace(
    name="run_command",
    description=(
        "Run a single command on the GKE host control machine (authenticated kubectl + fortio). "
        "The command is executed without a shell: pipes, redirection, command chaining and "
        "$VAR expansion are not supported. Returns combined stdout and stderr."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "The single command line to execute, e.g. a fortio load invocation.",
            }
        },
        "required": ["command"],
    },
)

# Safety bound on the agent loop so a misbehaving model cannot spin forever.
_MAX_TURNS = 8
class ChaosAgent:
    """Drives an LLM through a tool-calling loop to inject chaos faults.

    The agent is provider-agnostic: it obtains an :class:`LLMClient` from the
    models layer and never imports a provider SDK. The model is given a single
    ``run_command`` tool and loops until it stops requesting tool calls or the
    turn cap is reached.

    Args:
        chaos_active_event: Optional event signaled when a load spike starts,
            so the harness can coordinate measurements.
        client: Optional pre-built LLM client; when omitted one is selected
            from configuration via :func:`get_model`.
        system_instruction: System prompt for the loop; defaults to
            :data:`SYSTEM_INSTRUCTION`.
        tools: Tool descriptors offered to the model; defaults to
            ``[RUN_COMMAND_TOOL]``.
        tool_handler: Callable invoked for a ``run_command`` tool call as
            ``tool_handler(command, chaos_active_event) -> str``. Defaults to
            :func:`devops_bench.chaos.faults.generate_load.run_chaos_command`,
            imported lazily so the orchestrator does not couple to the concrete
            fault at module load.
    """

    def __init__(
        self,
        chaos_active_event: threading.Event | None = None,
        client: LLMClient | None = None,
        system_instruction: str | None = None,
        tools: list[Any] | None = None,
        tool_handler: Callable[[str, threading.Event | None], str] | None = None,
    ) -> None:
        self._chaos_active_event = chaos_active_event
        if client is None:
            # Chaos-specific settings take precedence over the generic agent ones.
            provider = first_env("CHAOS_PROVIDER", "AGENT_PROVIDER")
            model_name = first_env("CHAOS_MODEL", "AGENT_MODEL")
            client = get_model(provider=provider, model_name=model_name)
        self._client = client
        self._system_instruction = (
            system_instruction if system_instruction is not None else SYSTEM_INSTRUCTION
        )
        self._tools = tools if tools is not None else [RUN_COMMAND_TOOL]
        if tool_handler is None:
            # Lazy import avoids a top-level agent -> concrete-fault dependency.
            from devops_bench.chaos.faults.generate_load import run_chaos_command

            tool_handler = run_chaos_command
        self._tool_handler = tool_handler

    def run(self, goal: str) -> str:
        """Run the chaos loop synchronously and return the model's final text.

        Args:
            goal: The planned-mode goal prompt for the model.

        Returns:
            The model's final text response once it stops calling tools.
        """
        return asyncio.run(self._run_async(goal))

    async def _run_async(self, goal: str) -> str:
        """Drive the model/tool conversation for at most ``_MAX_TURNS`` turns.

        Args:
            goal: The initial user message handed to the model.

        Returns:
            The text of the last model turn that was processed.
        """
        client = self._client
        tools = client.format_tools(self._tools)
        contents: list[dict[str, Any]] = [{"role": "user", "content": goal}]

        final_text = ""
        for turn in range(_MAX_TURNS):
            _log.info("chaos agent turn %d", turn + 1)
            response = await client.generate_content(
                contents, tools, self._system_instruction
            )
            text = client.get_text_content(response)
            function_calls = client.extract_function_calls(response)

            assistant_message: dict[str, Any] = {"role": "assistant", "content": text}
            if function_calls:
                assistant_message["tool_calls"] = function_calls
            contents.append(assistant_message)

            # Retain the latest text on every turn so a tool call on the final
            # turn (or the turn cap) does not discard the model's accompanying
            # summary.
            final_text = text

            if not function_calls:
                _log.info("chaos agent finished: no further tool calls")
                break

            # Feed every tool result back so the next turn can reason over it.
            for call in function_calls:
                result = self._execute_tool(call.get("name"), call.get("args"))
                contents.append(
                    {
                        "role": "tool",
                        "tool_call_id": call.get("id"),
                        "name": call.get("name"),
                        "content": result,
                    }
                )
        else:
            # Only reached when the loop was never broken, i.e. the model was
            # still requesting tools on the last permitted turn.
            _log.warning("chaos agent stopped after reaching the turn limit (%d)", _MAX_TURNS)

        return final_text

    def _execute_tool(self, name: str | None, args: Any) -> str:
        """Dispatch a model tool call to its implementation.

        Malformed calls are reported back to the model as ``Error: ...`` text
        instead of raising, so one bad tool call cannot abort the whole loop.

        Args:
            name: Requested tool name.
            args: Tool arguments from the model; expected to be an object (dict).

        Returns:
            The tool's textual result, or an error description.
        """
        if not isinstance(args, dict):
            return "Error: tool args must be an object"
        if name != RUN_COMMAND_TOOL.name:
            return f"Error: unknown tool {name!r}"
        command = args.get("command", "")
        # The handler contract is ``(str, Event | None) -> str``; a model may
        # still emit ``null`` or a number here, which would otherwise crash the
        # handler on its first string operation.
        if not isinstance(command, str):
            return "Error: 'command' must be a string"
        return self._tool_handler(command, self._chaos_active_event)
class Fault(ABC):
    """Interface every platform-agnostic disruption must implement.

    A fault captures *what* gets disrupted without tying itself to a target
    platform. Implementations live in sibling modules under ``chaos.faults``
    and register themselves under a canonical key with
    ``@FAULTS.register(...)``.

    Attributes:
        id: Stable identifier for this fault instance.
        name: Human-readable name.
        target_subsystem: Subsystem the fault targets (e.g. ``"network"``).
    """

    id: str
    name: str
    target_subsystem: str

    @abstractmethod
    def get_agnostic_spec(self) -> dict[str, Any]:
        """Describe the fault with standardized, platform-agnostic parameters.

        Returns:
            A JSON-serializable dict describing the disruption.
        """

    @abstractmethod
    def inject(self, spec: dict[str, Any], context: dict[str, Any] | None = None) -> dict[str, Any]:
        """Apply the fault to the target platform.

        Args:
            spec: Platform-agnostic fault spec (the chaos task definition).
            context: Optional execution context (signaling events, runtime
                params) forwarded by the caller.

        Returns:
            A JSON-serializable report describing the outcome.
        """


class Trigger(ABC):
    """Interface for the condition deciding when a fault should fire.

    A trigger inspects platform-agnostic state (supplied by a verifier or
    monitoring source) and lives outside any chaos infrastructure.
    Implementations register themselves under a canonical key with
    ``@TRIGGERS.register(...)``.

    Attributes:
        id: Stable identifier for this trigger instance.
        name: Human-readable name.
        trigger_type: Discriminator describing the trigger heuristic.
    """

    id: str
    name: str
    trigger_type: str

    def initialize(self, context: dict[str, Any]) -> None:
        """Seed trigger state (e.g. baselines or internal timers).

        The base implementation does nothing; subclasses override it when they
        need state prepared before evaluation.

        Args:
            context: Platform-agnostic context used to seed the trigger.
        """

    @abstractmethod
    def is_triggered(self, current_platform_state: dict[str, Any]) -> bool:
        """Decide from the given state whether the fault should be injected.

        Args:
            current_platform_state: Platform-agnostic state snapshot.

        Returns:
            True when the fault should fire.
        """
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Concrete chaos faults, each self-registering in ``chaos.base.FAULTS``.""" + +from __future__ import annotations + +__all__: list[str] = [] diff --git a/devops_bench/chaos/faults/generate_load.py b/devops_bench/chaos/faults/generate_load.py new file mode 100644 index 00000000..ef11ee8f --- /dev/null +++ b/devops_bench/chaos/faults/generate_load.py @@ -0,0 +1,180 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Marker substring that, when present in a command, indicates a load spike is
# active. The harness watches the shared event to coordinate measurements.
_LOAD_MARKER = "fortio load"

# Wall-clock ceiling for a single chaos command, matching legacy behavior.
_COMMAND_TIMEOUT = 40


def run_chaos_command(
    command: str,
    chaos_active_event: threading.Event | None = None,
) -> str:
    """Execute a single chaos command and return its combined output.

    The command is a single, non-piped command string produced by the LLM (e.g.
    a ``fortio load`` invocation). It is tokenized with :func:`shlex.split` and
    run shell-free, so shell features (pipes, redirection, ``$VAR``) are not
    supported; a leading ``~`` in each token is expanded to the user's home.
    When the command is a load spike and an event is supplied, the event is set
    so the harness can observe that load is active before measuring impact.

    This function never raises for bad input: every failure, including a
    non-string ``command``, is reported as an ``Error:`` string so the agent
    loop can hand it back to the model.

    Args:
        command: Single command to execute (no shell pipelines or redirection).
        chaos_active_event: Optional event signaled when a load spike starts.

    Returns:
        A string combining stdout and stderr, or an ``Error:`` description if
        the command could not be run.
    """
    # The value originates from model-generated tool arguments, so it may be
    # ``None`` or a number; reject it here instead of failing on ``.strip()``.
    if not isinstance(command, str):
        return f"Error: command must be a string, got {type(command).__name__}"
    if not command.strip():
        return "Error: command string is empty"

    try:
        _log.info("running chaos command: %s", command)

        # The model emits shell strings (e.g. a single fortio invocation); split
        # into argv so execution stays shell-free. shlex.split does not expand
        # ``~``, so expand each token's leading home to keep ``~/go/bin/fortio``
        # style paths resolvable. Shell features (pipes, redirection, $VARS) are
        # not supported by this argv executor.
        argv = [os.path.expanduser(arg) for arg in shlex.split(command)]

        # Signal "load active" only once the command parses cleanly and we are
        # about to execute it, so a parse failure never falsely tells the harness
        # that load is live. Whitespace runs are collapsed before matching so
        # ``fortio   load`` is recognised as well.
        if chaos_active_event is not None and _LOAD_MARKER in " ".join(command.split()):
            _log.info("load spike detected; signaling harness via chaos event")
            chaos_active_event.set()

        completed = run(argv, check=False, timeout=_COMMAND_TIMEOUT)
        return f"Stdout:\n{completed.stdout}\nStderr:\n{completed.stderr}"
    except Exception as exc:  # noqa: BLE001 - surface any failure back to the LLM
        # Log as well, otherwise the failure is visible only to the model.
        _log.warning("chaos command failed: %s", exc)
        return f"Error: {exc}"
@FAULTS.register("generate_load")
class GenerateLoadFault(Fault):
    """Traffic-spike fault carried out by an LLM that issues fortio commands.

    The model is given one command-execution capability; the agent loop is
    relied upon to plan and issue exactly one ``fortio load`` spike against the
    port-forwarded target.

    Attributes:
        id: Identifier (``"generate_load"``).
        name: Human-readable name.
        target_subsystem: Targeted subsystem (``"traffic"``).
    """

    id = "generate_load"
    name = "Generate Load"
    target_subsystem = "traffic"

    def __init__(self) -> None:
        # Spec of the most recent injection; empty until inject() succeeds in
        # validating one.
        self._spec: dict[str, Any] = {}

    def get_agnostic_spec(self) -> dict[str, Any]:
        """Return a shallow copy of the spec that was last injected.

        Returns:
            The most recent spec dict, or an empty dict before injection.
        """
        return self._spec.copy()

    def goal(self, spec: dict[str, Any]) -> str:
        """Compose the planned-mode goal prompt handed to the LLM.

        The fortio target comes from the spec (``target.service_url``, which
        the harness rewrites to the local port-forward), so the spec rather
        than a hardcoded constant determines the URL in the prompt.

        Args:
            spec: The chaos task spec describing the load to generate.

        Returns:
            The goal prompt instructing the model to issue one fortio spike.
        """
        # Function-scope import: the fault module stays free of the agent and
        # models layers until a prompt is actually needed.
        from devops_bench.chaos.agent import target_url_from_spec

        url = target_url_from_spec(spec)
        spec_json = json.dumps(spec, indent=2)
        prompt_parts = [
            "Your goal is to execute the following GKE planned chaos engineering ",
            "disruption action:\n",
            f"```json\n{spec_json}\n```\n\n",
            "Guidelines for execution:\n",
            "1. Use the 'fortio' tool to inject traffic into GKE.\n",
            "2. Note: GKE service target URLs (like *.svc.cluster.local) are ",
            f"port-forwarded to '{url}' on the host, so run fortio ",
            f"against {url} instead.\n",
            "Use your run_command tool to execute this disruption safely and effectively.",
        ]
        return "".join(prompt_parts)

    def inject(
        self, spec: dict[str, Any], context: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """Run one LLM-planned chaos loop that generates the load.

        Args:
            spec: The chaos task spec; its ``type`` must be ``"generate_load"``.
            context: Optional context; ``chaos_active_event`` (a
                :class:`threading.Event`) is signaled when load starts.

        Returns:
            A report dict with ``status`` and the model's final ``output``.

        Raises:
            ValueError: If ``spec['type']`` is not ``"generate_load"``.
        """
        requested = spec.get("type")
        if requested != self.id:
            raise ValueError(f"unsupported chaos action type {requested!r} for {self.id!r}")

        # Remember a copy so get_agnostic_spec reflects this injection.
        self._spec = dict(spec)

        # Deferred import keeps base/fault imports free of the agent + models layer.
        from devops_bench.chaos.agent import (
            ChaosAgent,
            build_system_instruction,
            target_url_from_spec,
        )

        load_event = context.get("chaos_active_event") if context else None
        # The spec's target URL goes into the system prompt here and into the
        # goal prompt via self.goal().
        prompt = build_system_instruction(target_url_from_spec(spec))
        runner = ChaosAgent(chaos_active_event=load_event, system_instruction=prompt)
        final_text = runner.run(self.goal(spec))
        return {"status": "completed", "output": final_text}
"""Tests for the model-agnostic ChaosAgent loop."""

from __future__ import annotations

import threading

from devops_bench.chaos import agent as agent_module
from devops_bench.chaos.agent import RUN_COMMAND_TOOL, ChaosAgent


def _make_client(mocker, *, call_batches, texts):
    """Build a fake LLMClient that yields the given tool-call batches/texts.

    ``call_batches[i]`` and ``texts[i]`` are what the fake reports for the
    i-th model turn; ``generate_content`` yields one response per batch.
    """
    client = mocker.MagicMock()
    client.format_tools.return_value = "FORMATTED_TOOLS"
    client.generate_content = mocker.AsyncMock(
        side_effect=[f"resp{i}" for i in range(len(call_batches))]
    )
    client.get_text_content.side_effect = texts
    client.extract_function_calls.side_effect = call_batches
    return client


def test_agent_selects_model_from_config(mocker):
    """Without an injected client, the ctor builds one from first_env values."""
    fake_client = mocker.MagicMock()
    get_model = mocker.patch.object(agent_module, "get_model", return_value=fake_client)
    # first_env is called twice: provider first, then model name.
    mocker.patch.object(agent_module, "first_env", side_effect=["my-provider", "my-model"])

    chaos_agent = ChaosAgent()

    get_model.assert_called_once_with(provider="my-provider", model_name="my-model")
    assert chaos_agent._client is fake_client


def test_run_executes_tool_then_finishes(mocker):
    """A tool call is dispatched to the handler, then the next turn's text is returned."""
    mock_cmd = mocker.MagicMock(return_value="Stdout:\nok\nStderr:\n")
    client = _make_client(
        mocker,
        call_batches=[
            [{"name": "run_command", "args": {"command": "fortio load x"}, "id": "c1"}],
            [],
        ],
        texts=["", "all done"],
    )

    event = threading.Event()
    chaos_agent = ChaosAgent(chaos_active_event=event, client=client, tool_handler=mock_cmd)
    result = chaos_agent.run("do chaos")

    assert result == "all done"
    mock_cmd.assert_called_once_with("fortio load x", event)
    # Two model turns; tool result fed back between them.
    assert client.generate_content.await_count == 2
    client.format_tools.assert_called_once_with([RUN_COMMAND_TOOL])


def test_run_finishes_immediately_without_tool_calls(mocker):
    """A first turn with no tool calls ends the loop without running the handler."""
    mock_cmd = mocker.MagicMock()
    client = _make_client(mocker, call_batches=[[]], texts=["nothing to do"])

    chaos_agent = ChaosAgent(client=client, tool_handler=mock_cmd)
    result = chaos_agent.run("noop")

    assert result == "nothing to do"
    mock_cmd.assert_not_called()


def test_run_stops_at_turn_limit_retains_last_text(mocker):
    """The loop stops at the turn cap and still returns the last turn's text."""
    mocker.patch.object(agent_module, "_MAX_TURNS", 3)
    # Always returns a tool call so the loop must hit the cap; the model also
    # emits text on every turn, which must be retained even though the final
    # turn still carries a tool call.
    client = mocker.MagicMock()
    client.format_tools.return_value = "T"
    client.generate_content = mocker.AsyncMock(return_value="resp")
    client.get_text_content.return_value = "still working"
    client.extract_function_calls.return_value = [
        {"name": "run_command", "args": {"command": "fortio load x"}, "id": "c"}
    ]

    chaos_agent = ChaosAgent(client=client, tool_handler=mocker.MagicMock(return_value="out"))
    result = chaos_agent.run("loop forever")

    # Final-turn text is preserved despite the accompanying tool call.
    assert result == "still working"
    assert client.generate_content.await_count == 3


def test_unknown_tool_returns_error(mocker):
    """An unrecognised tool name yields an error string."""
    client = mocker.MagicMock()
    chaos_agent = ChaosAgent(client=client)

    result = chaos_agent._execute_tool("mystery", {})

    assert result.startswith("Error: unknown tool")


def test_non_dict_args_returns_error(mocker):
    """Tool args that are not a dict are rejected with a fixed error string."""
    client = mocker.MagicMock()
    chaos_agent = ChaosAgent(client=client)

    for bad_args in (None, "fortio load x", ["fortio"], 42):
        result = chaos_agent._execute_tool("run_command", bad_args)
        assert result == "Error: tool args must be an object"


# --- URL flow (#5) ----------------------------------------------------------


def test_target_url_from_spec_reads_service_url():
    """The spec's ``target.service_url`` is returned when present."""
    spec = {"type": "generate_load", "target": {"service_url": "http://svc:9000"}}
    assert agent_module.target_url_from_spec(spec) == "http://svc:9000"


def test_target_url_from_spec_falls_back_to_default():
    """Absent or malformed targets resolve to the module default URL."""
    # Missing target, malformed target, blank URL, and non-dict all fall back.
    assert agent_module.target_url_from_spec({}) == "http://localhost:8080"
    assert agent_module.target_url_from_spec({"target": "nope"}) == "http://localhost:8080"
    assert (
        agent_module.target_url_from_spec({"target": {"service_url": " "}})
        == "http://localhost:8080"
    )
    assert agent_module.target_url_from_spec(None) == "http://localhost:8080"


def test_build_system_instruction_injects_url():
    """A custom URL appears in the instruction and replaces the default one."""
    instruction = agent_module.build_system_instruction("http://svc:9000")
    assert "http://svc:9000" in instruction
    # The hardcoded default is not present when a custom URL is supplied.
    assert "localhost:8080" not in instruction


def test_default_system_instruction_uses_default_url():
    """The module-level SYSTEM_INSTRUCTION carries the default target URL."""
    assert "http://localhost:8080" in agent_module.SYSTEM_INSTRUCTION


# --- constructor dependency injection (#6) + custom tools/instruction --------


def test_run_uses_injected_system_instruction_and_tools(mocker):
    """Injected tools and system instruction are the ones handed to the client."""
    client = _make_client(mocker, call_batches=[[]], texts=["done"])
    custom_tools = [RUN_COMMAND_TOOL, RUN_COMMAND_TOOL]

    chaos_agent = ChaosAgent(
        client=client,
        system_instruction="CUSTOM SYS",
        tools=custom_tools,
        tool_handler=mocker.MagicMock(),
    )
    chaos_agent.run("go")

    client.format_tools.assert_called_once_with(custom_tools)
    # The injected system instruction is passed through to generate_content.
    assert client.generate_content.await_args.args[2] == "CUSTOM SYS"


# --- handler decoupling (#7) ------------------------------------------------


def test_agent_module_has_no_top_level_fault_import():
    """The agent module exposes no ``run_chaos_command`` attribute."""
    # The orchestrator must not couple to the concrete fault at module load.
    assert not hasattr(agent_module, "run_chaos_command")


def test_default_tool_handler_is_run_chaos_command(mocker):
    """The default handler is the generate_load module's run_chaos_command."""
    # With no injected handler, the ctor lazily binds the real fault handler.
    from devops_bench.chaos.faults.generate_load import run_chaos_command

    chaos_agent = ChaosAgent(client=mocker.MagicMock())
    assert chaos_agent._tool_handler is run_chaos_command
"""Tests for the generate_load fault and its command-execution path."""

from __future__ import annotations

import os
import threading
from types import SimpleNamespace

import pytest

from devops_bench.chaos.base import FAULTS, Fault
from devops_bench.chaos.faults import generate_load
from devops_bench.chaos.faults.generate_load import GenerateLoadFault, run_chaos_command


def test_fault_is_registered():
    """The fault class is retrievable from FAULTS under ``generate_load``."""
    assert FAULTS.get("generate_load") is GenerateLoadFault
    assert issubclass(GenerateLoadFault, Fault)


def test_run_chaos_command_splits_argv_and_returns_output(mocker):
    """The command is split into argv (``~`` expanded) and output is returned."""
    mock_run = mocker.patch.object(
        generate_load,
        "run",
        return_value=SimpleNamespace(stdout="mock stdout", stderr="mock stderr", returncode=0),
    )

    cmd = "~/go/bin/fortio load -qps 100 -t 10s -c 2 http://localhost:8080"
    result = run_chaos_command(cmd)

    # The leading ``~`` must reach ``run`` already expanded.
    fortio = os.path.expanduser("~/go/bin/fortio")
    mock_run.assert_called_once_with(
        [fortio, "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"],
        check=False,
        timeout=40,
    )
    assert "mock stdout" in result
    assert "mock stderr" in result


def test_run_chaos_command_sets_event_on_load_spike(mocker):
    """A command containing the load marker sets the supplied event."""
    mocker.patch.object(
        generate_load,
        "run",
        return_value=SimpleNamespace(stdout="", stderr="", returncode=0),
    )
    event = threading.Event()

    run_chaos_command("~/go/bin/fortio load -qps 100 http://localhost:8080", event)

    assert event.is_set()


def test_run_chaos_command_does_not_set_event_for_non_load(mocker):
    """A command without the load marker leaves the event unset."""
    mocker.patch.object(
        generate_load,
        "run",
        return_value=SimpleNamespace(stdout="", stderr="", returncode=0),
    )
    event = threading.Event()

    run_chaos_command("kubectl get pods", event)

    assert not event.is_set()


def test_run_chaos_command_returns_error_string_on_exception(mocker):
    """An exception from ``run`` is returned as an ``Error:`` string."""
    mocker.patch.object(generate_load, "run", side_effect=RuntimeError("boom"))

    result = run_chaos_command("kubectl get pods")

    assert result.startswith("Error:")
    assert "boom" in result


def test_run_chaos_command_empty_command_guard(mocker):
    """Empty or whitespace-only commands are rejected without calling ``run``."""
    mock_run = mocker.patch.object(generate_load, "run")

    for empty in ("", " ", "\n\t"):
        assert run_chaos_command(empty) == "Error: command string is empty"
    mock_run.assert_not_called()


def test_run_chaos_command_does_not_set_event_on_parse_failure(mocker):
    """A command that fails to tokenize neither runs nor sets the event."""
    mock_run = mocker.patch.object(generate_load, "run")
    event = threading.Event()

    # Unbalanced quote: contains the load marker but fails shlex.split, so the
    # command never executes and the event must not be signaled.
    result = run_chaos_command('fortio load "unterminated', event)

    assert result.startswith("Error:")
    assert not event.is_set()
    mock_run.assert_not_called()


def test_inject_rejects_wrong_type():
    """inject() raises ValueError when the spec's type is not generate_load."""
    fault = GenerateLoadFault()
    with pytest.raises(ValueError):
        fault.inject({"type": "kill_pod"})


def test_goal_uses_spec_target_url():
    """The goal prompt embeds the spec's service_url, not the default."""
    fault = GenerateLoadFault()
    spec = {"type": "generate_load", "target": {"service_url": "http://svc:9000"}}

    goal = fault.goal(spec)

    # The spec's service_url drives the prompt instead of a hardcoded constant.
    assert "http://svc:9000" in goal
    assert "localhost:8080" not in goal


def test_goal_falls_back_to_default_url_when_absent():
    """Without a target in the spec, the goal prompt uses the default URL."""
    fault = GenerateLoadFault()
    goal = fault.goal({"type": "generate_load"})
    assert "http://localhost:8080" in goal


def test_inject_runs_agent_and_signals_event(mocker):
    """Port of the legacy generate_load test: the LLM issues one fortio spike.

    The LLMClient is mocked so no real SDK is touched, and the command path's
    ``run`` is mocked so no real fortio/kubectl executes.
    """
    mock_run = mocker.patch.object(
        generate_load,
        "run",
        return_value=SimpleNamespace(stdout="mock stdout", stderr="mock stderr", returncode=0),
    )

    # NOTE(review): this command targets port 8080 while the spec below uses
    # 8082; the fake model replays the command verbatim, so the test does not
    # check that the spec URL reaches the executed command.
    expected_cmd = "~/go/bin/fortio load -qps 100 -t 10s -c 2 http://localhost:8080"

    # Fake LLM client: first turn asks to run the fortio command, second turn
    # returns the final summary with no further tool calls.
    fake_client = mocker.MagicMock()
    fake_client.format_tools.return_value = "TOOLS"
    fake_client.generate_content = mocker.AsyncMock(side_effect=["resp1", "resp2"])
    fake_client.get_text_content.side_effect = ["", "Disruption complete"]
    fake_client.extract_function_calls.side_effect = [
        [{"name": "run_command", "args": {"command": expected_cmd}, "id": "c1"}],
        [],
    ]
    # inject() builds its own ChaosAgent, so the client is supplied by patching
    # the factory the agent module calls.
    mocker.patch("devops_bench.chaos.agent.get_model", return_value=fake_client)

    event = threading.Event()
    fault = GenerateLoadFault()
    spec = {
        "type": "generate_load",
        "target": {
            "service_url": "http://localhost:8082",
            "qps": 100,
            "duration": "10s",
            "concurrency": 2,
        },
    }

    report = fault.inject(spec, context={"chaos_active_event": event})

    fortio = os.path.expanduser("~/go/bin/fortio")
    mock_run.assert_called_once_with(
        [fortio, "load", "-qps", "100", "-t", "10s", "-c", "2", "http://localhost:8080"],
        check=False,
        timeout=40,
    )
    assert event.is_set()
    assert report == {"status": "completed", "output": "Disruption complete"}
    assert fault.get_agnostic_spec() == spec