From e690afd1976490b9583da117bb0db9c047831492 Mon Sep 17 00:00:00 2001 From: Renn F Date: Mon, 22 Jun 2026 03:25:57 +0200 Subject: [PATCH 1/2] feat(conventions): standard schema models + effective-map merge --- pyproject.toml | 2 + .../foundation/policy/conventions/__init__.py | 33 +++++ .../policy/conventions/effective_map.py | 65 +++++++++ .../foundation/policy/conventions/models.py | 129 ++++++++++++++++++ .../conventions/test_conventions_models.py | 99 ++++++++++++++ .../policy/conventions/test_effective_map.py | 89 ++++++++++++ uv.lock | 13 ++ 7 files changed, 430 insertions(+) create mode 100644 roboco/foundation/policy/conventions/__init__.py create mode 100644 roboco/foundation/policy/conventions/effective_map.py create mode 100644 roboco/foundation/policy/conventions/models.py create mode 100644 tests/unit/foundation/policy/conventions/test_conventions_models.py create mode 100644 tests/unit/foundation/policy/conventions/test_effective_map.py diff --git a/pyproject.toml b/pyproject.toml index 75ea01c9..51e852d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ # Direct imports (promoted from transitive) "cryptography", # utils/crypto.py — Fernet-encrypted project git tokens "packaging", # services/toolchain.py — PEP 440 requires-python resolution + "pyyaml", # foundation/policy/conventions — .roboco/conventions.yml parse "claude-agent-sdk>=0.2.105", ] @@ -72,6 +73,7 @@ dev = [ # Type Stubs "types-passlib", "types-python-jose", + "types-PyYAML", # Development "ipython", diff --git a/roboco/foundation/policy/conventions/__init__.py b/roboco/foundation/policy/conventions/__init__.py new file mode 100644 index 00000000..2bda03c3 --- /dev/null +++ b/roboco/foundation/policy/conventions/__init__.py @@ -0,0 +1,33 @@ +"""Architectural-conventions standard: schema models + effective-map merge. + +Pure foundation layer (no IO/DB). The validator CLI (``roboco.conventions``), +``ConventionsService``, and the gateway gates all build on these types. +""" + +from __future__ import annotations + +from .effective_map import effective_map +from .models import ( + BUILTIN_RULES, + ConventionsParseError, + ConventionsStandard, + CustomRule, + DefinitionKind, + Module, + Rule, + RuleLevel, + Waiver, +) + +__all__ = [ + "BUILTIN_RULES", + "ConventionsParseError", + "ConventionsStandard", + "CustomRule", + "DefinitionKind", + "Module", + "Rule", + "RuleLevel", + "Waiver", + "effective_map", +] diff --git a/roboco/foundation/policy/conventions/effective_map.py b/roboco/foundation/policy/conventions/effective_map.py new file mode 100644 index 00000000..a2cef4a1 --- /dev/null +++ b/roboco/foundation/policy/conventions/effective_map.py @@ -0,0 +1,65 @@ +"""Effective-map merge: auto-derived defaults overlaid by the committed file. + +Every consumer (validator, ambient injection, baseline constraints) reads the +*effective* map, so behaviour is identical whether the file is present, +absent, or partial. Precedence, per field: + +- ``rules``: ``BUILTIN_RULES`` < derived < file (per key). +- ``modules``: derived, with file modules overriding by ``path`` (and new + paths appended in file order). +- ``custom`` / ``waivers`` / ``version``: the file's when a file is present, + else the derived value (the file is the curated replacement). +- ``languages``: union (derived order, then file-only extras). + +Pure: no IO, no DB. +""" + +from __future__ import annotations + +from .models import BUILTIN_RULES, ConventionsStandard, Module, Rule + + +def _merge_rules( + derived: ConventionsStandard, file: ConventionsStandard | None +) -> dict[str, Rule]: + merged: dict[str, Rule] = { + name: Rule(name=name, level=level) for name, level in BUILTIN_RULES.items() + } + merged.update(derived.rules) + if file is not None: + merged.update(file.rules) + return merged + + +def _merge_modules( + derived: ConventionsStandard, file: ConventionsStandard | None +) -> list[Module]: + modules = {m.path: m for m in derived.modules} + for m in file.modules if file is not None else []: + modules[m.path] = m + return list(modules.values()) + + +def _union_languages( + derived: ConventionsStandard, file: ConventionsStandard | None +) -> list[str]: + languages = list(derived.languages) + for lang in file.languages if file is not None else []: + if lang not in languages: + languages.append(lang) + return languages + + +def effective_map( + derived: ConventionsStandard, file: ConventionsStandard | None +) -> ConventionsStandard: + """Merge auto-derived defaults with the committed file into one standard.""" + curated = file if file is not None else derived + return ConventionsStandard( + version=curated.version, + languages=_union_languages(derived, file), + modules=_merge_modules(derived, file), + rules=_merge_rules(derived, file), + custom=curated.custom, + waivers=curated.waivers, + ) diff --git a/roboco/foundation/policy/conventions/models.py b/roboco/foundation/policy/conventions/models.py new file mode 100644 index 00000000..5d295bfa --- /dev/null +++ b/roboco/foundation/policy/conventions/models.py @@ -0,0 +1,129 @@ +"""Architectural-conventions standard — schema models + YAML parse. + +The standard is the repo-canonical ``.roboco/conventions.yml``: a per-project +architecture map (which definition *kinds* belong in which modules), a +toggleable rule set, custom regex rules, and waivers. These models are pure +(no IO, no DB) — the validator, the service, and the effective-map merge all +build on them. ``parse_yaml`` is the single entry point from raw file text to +a validated ``ConventionsStandard`` (or a ``ConventionsParseError``). +""" + +from __future__ import annotations + +from typing import Any, Literal + +import yaml +from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator + +RuleLevel = Literal["warn", "block"] +DefinitionKind = Literal[ + "model", "route", "helper", "business_logic", "component", "other" +] + + +class ConventionsParseError(ValueError): + """Raised when ``.roboco/conventions.yml`` is malformed or invalid.""" + + def __init__(self, reason: str) -> None: + super().__init__(reason) + self.reason = reason + + +# The org-default rule set: applied to every project's effective map before the +# committed file or auto-derived rules overlay it. Keep in sync with the +# validator's rule emitters and the panel's rule list. +BUILTIN_RULES: dict[str, RuleLevel] = { + "no_models_in_routers": "block", + "no_helpers_in_routers": "block", + "no_lint_suppressions": "block", + "no_inline_comments": "warn", +} + + +class _Base(BaseModel): + """Shared config: ignore unknown keys for forward-compatibility.""" + + model_config = ConfigDict(extra="ignore") + + +class Module(_Base): + """One module boundary: a path prefix, its purpose, forbidden def kinds.""" + + path: str + purpose: str + forbidden: list[DefinitionKind] = Field(default_factory=list) + + +class Rule(_Base): + """A toggleable rule — its name and the level it fires at.""" + + name: str + level: RuleLevel + + +class CustomRule(_Base): + """A project-specific regex rule, optionally scoped to languages.""" + + id: str + pattern: str + message: str + level: RuleLevel + languages: list[str] = Field(default_factory=list) + + +class Waiver(_Base): + """An accountable escape hatch: a (path, rule) the gate must not flag.""" + + path: str + rule: str + reason: str + + +class ConventionsStandard(_Base): + """The parsed standard (raw file *or* the merged effective map).""" + + version: int = 1 + languages: list[str] = Field(default_factory=list) + modules: list[Module] = Field(default_factory=list) + rules: dict[str, Rule] = Field(default_factory=dict) + custom: list[CustomRule] = Field(default_factory=list) + waivers: list[Waiver] = Field(default_factory=list) + + @field_validator("rules", mode="before") + @classmethod + def _name_rules_from_keys(cls, v: Any) -> Any: + """Inject the mapping key as each rule's ``name``. + + The YAML keys rules by name with a ``{level: ...}`` value; the model + carries the name on the rule itself. Accept either shape so a ``Rule`` + constructed directly also passes through unchanged. + """ + if not isinstance(v, dict): + return v + out: dict[str, Any] = {} + for name, spec in v.items(): + if isinstance(spec, dict): + out[name] = {"name": name, **spec} + else: + out[name] = spec + return out + + @classmethod + def parse_yaml(cls, text: str) -> ConventionsStandard: + """Parse ``.roboco/conventions.yml`` text into a validated standard. + + Raises ``ConventionsParseError`` on malformed YAML, a non-mapping + top level, or any schema violation (e.g. an unknown rule level). + """ + try: + data = yaml.safe_load(text) + except yaml.YAMLError as exc: + raise ConventionsParseError(f"malformed YAML: {exc}") from exc + if data is None: + return cls() + if not isinstance(data, dict): + raise ConventionsParseError("top-level conventions must be a mapping") + try: + return cls.model_validate(data) + except ValidationError as exc: + raise ConventionsParseError(str(exc)) from exc diff --git a/tests/unit/foundation/policy/conventions/test_conventions_models.py b/tests/unit/foundation/policy/conventions/test_conventions_models.py new file mode 100644 index 00000000..4bfef2b0 --- /dev/null +++ b/tests/unit/foundation/policy/conventions/test_conventions_models.py @@ -0,0 +1,99 @@ +"""Schema-model + YAML-parse tests for the architectural-conventions standard.""" + +from __future__ import annotations + +import pytest +from roboco.foundation.policy.conventions.models import ( + BUILTIN_RULES, + ConventionsParseError, + ConventionsStandard, + CustomRule, + Module, + Rule, + Waiver, +) + +_VALID_YAML = """ +version: 1 +languages: [python, typescript] +modules: + - path: app/routers + purpose: HTTP routes + forbidden: [model, helper] + - path: app/models + purpose: Pydantic / ORM models +rules: + no_models_in_routers: { level: block } + no_inline_comments: { level: warn } +custom: + - id: no-print + pattern: '\\bprint\\(' + message: use the logger + level: warn + languages: [python] +waivers: + - path: app/routers/legacy.py + rule: no_models_in_routers + reason: extraction tracked separately +""" + + +def test_valid_yaml_parses_to_standard() -> None: + std = ConventionsStandard.parse_yaml(_VALID_YAML) + assert std.version == 1 + assert std.languages == ["python", "typescript"] + assert std.modules[0].path == "app/routers" + assert std.modules[0].forbidden == ["model", "helper"] + assert std.rules["no_models_in_routers"].level == "block" + assert std.rules["no_models_in_routers"].name == "no_models_in_routers" + assert std.custom[0].id == "no-print" + assert std.custom[0].languages == ["python"] + assert std.waivers[0].rule == "no_models_in_routers" + + +def test_empty_yaml_yields_default_standard() -> None: + std = ConventionsStandard.parse_yaml("") + assert std == ConventionsStandard() + assert std.version == 1 + + +def test_unknown_rule_level_raises_parse_error() -> None: + with pytest.raises(ConventionsParseError): + ConventionsStandard.parse_yaml( + "rules:\n no_models_in_routers: { level: explode }\n" + ) + + +def test_malformed_yaml_raises_parse_error() -> None: + with pytest.raises(ConventionsParseError): + ConventionsStandard.parse_yaml("modules: [unterminated\n") + + +def test_non_mapping_top_level_raises_parse_error() -> None: + with pytest.raises(ConventionsParseError): + ConventionsStandard.parse_yaml("- just\n- a\n- list\n") + + +def test_unknown_definition_kind_in_forbidden_raises() -> None: + with pytest.raises(ConventionsParseError): + ConventionsStandard.parse_yaml( + "modules:\n - path: x\n purpose: y\n forbidden: [wizard]\n" + ) + + +def test_builtin_rules_cover_the_org_defaults() -> None: + assert BUILTIN_RULES["no_models_in_routers"] == "block" + assert BUILTIN_RULES["no_helpers_in_routers"] == "block" + assert BUILTIN_RULES["no_lint_suppressions"] == "block" + assert BUILTIN_RULES["no_inline_comments"] == "warn" + + +def test_models_construct_directly() -> None: + mod = Module(path="app/services", purpose="logic", forbidden=["route"]) + assert mod.forbidden == ["route"] + rule = Rule(name="no_print", level="warn") + assert rule.level == "warn" + custom = CustomRule(id="x", pattern="y", message="z", level="block") + assert custom.languages == [] + waiver = Waiver(path="a.py", rule="no_models_in_routers", reason="r") + assert waiver.path == "a.py" diff --git a/tests/unit/foundation/policy/conventions/test_effective_map.py b/tests/unit/foundation/policy/conventions/test_effective_map.py new file mode 100644 index 00000000..b27369be --- /dev/null +++ b/tests/unit/foundation/policy/conventions/test_effective_map.py @@ -0,0 +1,89 @@ +"""Effective-map merge tests: auto-derived defaults overlaid by the file.""" + +from __future__ import annotations + +from roboco.foundation.policy.conventions.effective_map import effective_map +from roboco.foundation.policy.conventions.models import ( + ConventionsStandard, + CustomRule, + Module, + Rule, + Waiver, +) + + +def test_effective_map_applies_builtin_rules_when_file_absent() -> None: + eff = effective_map(ConventionsStandard(), None) + assert eff.rules["no_models_in_routers"].level == "block" + assert eff.rules["no_inline_comments"].level == "warn" + + +def test_file_module_overrides_derived_by_path() -> None: + derived = ConventionsStandard( + modules=[Module(path="app/routers", purpose="routes")] + ) + file = ConventionsStandard( + modules=[Module(path="app/routers", purpose="routes", forbidden=["model"])] + ) + eff = effective_map(derived, file) + assert len(eff.modules) == 1 + assert eff.modules[0].forbidden == ["model"] + + +def test_file_module_appends_new_path() -> None: + derived = ConventionsStandard( + modules=[Module(path="app/routers", purpose="routes")] + ) + file = ConventionsStandard(modules=[Module(path="app/models", purpose="models")]) + eff = effective_map(derived, file) + assert [m.path for m in eff.modules] == ["app/routers", "app/models"] + + +def test_file_rule_overrides_builtin_level() -> None: + file = ConventionsStandard( + rules={"no_inline_comments": Rule(name="no_inline_comments", level="block")} + ) + eff = effective_map(ConventionsStandard(), file) + assert eff.rules["no_inline_comments"].level == "block" + + +def test_derived_rule_overrides_builtin_then_file_overrides_derived() -> None: + derived = ConventionsStandard( + rules={"no_inline_comments": Rule(name="no_inline_comments", level="block")} + ) + eff_no_file = effective_map(derived, None) + assert eff_no_file.rules["no_inline_comments"].level == "block" + file = ConventionsStandard( + rules={"no_inline_comments": Rule(name="no_inline_comments", level="warn")} + ) + eff = effective_map(derived, file) + assert eff.rules["no_inline_comments"].level == "warn" + + +def test_languages_are_unioned() -> None: + derived = ConventionsStandard(languages=["python"]) + file = ConventionsStandard(languages=["python", "typescript"]) + eff = effective_map(derived, file) + assert eff.languages == ["python", "typescript"] + + +def test_file_custom_and_waivers_replace_derived() -> None: + derived = ConventionsStandard( + custom=[CustomRule(id="d", pattern="d", message="d", level="warn")], + waivers=[Waiver(path="d.py", rule="no_models_in_routers", reason="d")], + ) + file = ConventionsStandard( + custom=[CustomRule(id="f", pattern="f", message="f", level="block")], + waivers=[Waiver(path="f.py", rule="no_helpers_in_routers", reason="f")], + ) + eff = effective_map(derived, file) + assert [c.id for c in eff.custom] == ["f"] + assert [w.path for w in eff.waivers] == ["f.py"] + + +def test_file_none_keeps_derived_custom_and_waivers() -> None: + derived = ConventionsStandard( + custom=[CustomRule(id="d", pattern="d", message="d", level="warn")], + ) + eff = effective_map(derived, None) + assert [c.id for c in eff.custom] == ["d"] diff --git a/uv.lock b/uv.lock index 636431c4..76aa9d72 100644 --- a/uv.lock +++ b/uv.lock @@ -3003,6 +3003,7 @@ dependencies = [ { name = "python-jose", extra = ["cryptography"] }, { name = "python-multipart" }, { name = "python-toon" }, + { name = "pyyaml" }, { name = "redis" }, { name = "sqlalchemy", extra = ["asyncio"] }, { name = "sse-starlette" }, @@ -3035,6 +3036,7 @@ dev = [ { name = "ruff" }, { name = "types-passlib" }, { name = "types-python-jose" }, + { name = "types-pyyaml" }, { name = "vulture" }, { name = "xenon" }, ] @@ -3086,6 +3088,7 @@ requires-dist = [ { name = "python-jose", extras = ["cryptography"] }, { name = "python-multipart" }, { name = "python-toon" }, + { name = "pyyaml" }, { name = "radon", marker = "extra == 'dev'" }, { name = "redis" }, { name = "rich", marker = "extra == 'dev'" }, @@ -3098,6 +3101,7 @@ requires-dist = [ { name = "tomli-w" }, { name = "types-passlib", marker = "extra == 'dev'" }, { name = "types-python-jose", marker = "extra == 'dev'" }, + { name = "types-pyyaml", marker = "extra == 'dev'" }, { name = "uvicorn", extras = ["standard"] }, { name = "vulture", marker = "extra == 'dev'" }, { name = "websockets" }, @@ -3721,6 +3725,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d3/83/df2b34e64f0a674935d718471cf10fb392a7e5bdb0e9e7c739885b62d274/types_python_jose-3.5.0.20260408-py3-none-any.whl", hash = "sha256:968d8a8eac1ff9da249d6335a2bb9f82288d59ba23afe91fcc2662eb9f485e2a", size = 14694, upload-time = "2026-04-08T04:34:09.747Z" }, ] +[[package]] +name = "types-pyyaml" +version = "6.0.12.20260518" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b8/83/4a1afc3fbfcf5b8d46fc390cd95ed6b0dc9010a265f4e9f46314efffa37a/types_pyyaml-6.0.12.20260518.tar.gz", hash = "sha256:d917f83fb38462550338c1297faedd860b3ec83912b96b1e3d73255f7473e466", size = 17850, upload-time = "2026-05-18T06:01:58.675Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/06/a2/c01db32be2ae7d6a1689972f3c492b149ee4e164b12fdfd9f64b50888215/types_pyyaml-6.0.12.20260518-py3-none-any.whl", hash = "sha256:d2150f75a231c9fe9c7463bd29487d93e60bac90400287351384bc2284eba7cd", size = 20312, upload-time = "2026-05-18T06:01:57.368Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0" From 6ff80f853621da074f1945e3c037b3fdb2f3b443 Mon Sep 17 00:00:00 2001 From: Renn F Date: Mon, 22 Jun 2026 03:45:22 +0200 Subject: [PATCH 2/2] feat(orchestrator): park provider on persistent server overload (529/500) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A 429 rate limit already parks a provider — queue its spawns, probe until it recovers — but a persistent 529/500/503 overload had no such break: the run died and the orchestrator crash-retried straight back into the overload, burning tokens in a respawn loop. Generalize the park to provider-unavailability. On a non-graceful Anthropic agent exit, match the API's overload markers (overloaded_error / internal_server_error / "API Error: 5xx") against the dead container's own output and park the provider with kind="overloaded"; the existing spawn gate already queues any parked provider, and the probe-resume loop revives the task when it recovers. Grok keeps its exit-75 path; both now route through one _park_provider_unavailable helper. Markers are kept specific so an agent that merely writes about HTTP 500/529 can't trip the break. Fix the recovery probe to require a 2xx: it treated any non-429 as recovered, so a probe that itself got a 529 would have resumed agents straight back into the overload — wrong for the new path and for a 429 that lifts into a 5xx. Gated by ROBOCO_OVERLOAD_BREAK_ENABLED (default on; off => crash-retry). --- roboco/config.py | 10 + roboco/runtime/orchestrator.py | 170 ++++++++++++--- roboco/services/gateway/rate_limit_tracker.py | 8 +- tests/unit/runtime/test_grok_rate_limit.py | 10 +- .../runtime/test_provider_overload_break.py | 194 ++++++++++++++++++ tests/unit/runtime/test_rate_limit_probe.py | 18 +- .../unit/services/test_rate_limit_tracker.py | 15 ++ 7 files changed, 396 insertions(+), 29 deletions(-) create mode 100644 tests/unit/runtime/test_provider_overload_break.py diff --git a/roboco/config.py b/roboco/config.py index 11f3beda..cc1731fb 100644 --- a/roboco/config.py +++ b/roboco/config.py @@ -191,6 +191,16 @@ def rag_store_url(self) -> str: ), ) + overload_break_enabled: bool = Field( + default=True, + description=( + "Park a provider on a persistent server overload (HTTP 529 / 500 / " + "503 from the model API) the same way a 429 rate limit is parked: " + "queue that provider's spawns and probe until it recovers, instead " + "of crash-retrying into the overload. Off => crash-retry behavior." + ), + ) + # ========================================================================== # Web Research (pluggable external search/fetch for Board + PM roles) # ========================================================================== diff --git a/roboco/runtime/orchestrator.py b/roboco/runtime/orchestrator.py index bd8ad378..62476d01 100644 --- a/roboco/runtime/orchestrator.py +++ b/roboco/runtime/orchestrator.py @@ -79,15 +79,37 @@ # _sweep_budget_exceeded) to build the SDK health/usage URL. SDK_PORT: int = 9000 -# Rate-limit recovery probe: a free, unmetered liveness call confirms a -# provider has stopped rate-limiting us before parked agents are resumed. -# Listing models / tags costs no tokens; a non-429 response means lifted. +# Provider-recovery probe: a free, unmetered liveness call confirms a parked +# provider is accepting requests again before parked agents are resumed. +# Listing models / tags costs no tokens; only a 2xx response means recovered +# (a 429 rate limit OR a 5xx overload both keep the provider parked). _ANTHROPIC_PROBE_BASE = "https://api.anthropic.com" _PROBE_TIMEOUT_SECONDS = 10.0 _HTTP_TOO_MANY_REQUESTS = 429 +_HTTP_OK = 200 +_HTTP_MULTIPLE_CHOICES = 300 # first non-2xx status; 2xx == [_HTTP_OK, this) # Consecutive failed recovery probes before the CEO is notified once per episode. _CEO_NOTIFY_THRESHOLD = 10 +# Persistent server-overload parking (HTTP 529 / 500 / 503). The model API's +# SDK already retries transient overloads in-process; only a persistent one +# survives to kill the run. When it does, park the provider like a 429 instead +# of crash-retrying into the overload. These markers are matched (lowercased, +# substring) against the tail of the dead container's own output, so they are +# kept specific to how the API surfaces an overload — bare "500"/"529" would +# false-match an agent that merely writes about HTTP status codes. +_OVERLOAD_RETRY_AFTER_S = 45.0 +_ANTHROPIC_OVERLOAD_MARKERS: tuple[str, ...] = ( + "overloaded_error", + "internal_server_error", + "api error: 529", + "api error: 500", + "api error: 503", + "error 529", + "error 500", + "error 503", +) + # The intake (prompter) agent: a single seeded, board-adjacent interviewer. # Unlike delivery agents it is never dispatched and runs ONE persistent # container at a time (single CEO → one live chat). See the INTAKE section @@ -4950,6 +4972,29 @@ async def _handle_stopped_container( await self._park_grok_rate_limited(agent_id, instance) return graceful = exit_code == 0 + # Server-overload parking: a persistent 529/500/503 from the model API + # kills the run (the SDK already retries transient ones). Detect the + # overload marker in the dead container's output and park the provider — + # the same break as a 429 — instead of crash-retrying into the overload. + if not graceful: + overloaded_provider = await self._provider_overload_park_target( + agent_id, instance + ) + if overloaded_provider is not None: + logger.warning( + "Provider overload detected in agent output; parking provider", + agent_id=agent_id, + provider=overloaded_provider, + task_id=instance.current_task_id, + ) + await self._park_provider_unavailable( + agent_id, + instance, + provider=overloaded_provider, + retry_after=_OVERLOAD_RETRY_AFTER_S, + kind="overloaded", + ) + return if graceful: logger.info( "Agent container exited gracefully", @@ -5527,34 +5572,107 @@ def _is_grok_rate_limit_exit(instance: Any, exit_code: int | None) -> bool: and instance.config.provider_type == ModelProvider.GROK.value ) - async def _park_grok_rate_limited(self, agent_id: str, instance: Any) -> None: - """Park a grok agent whose run hit an xAI 429 (entrypoint exit 75). + @staticmethod + async def _tail_container_logs(container_name: str, lines: int = 80) -> str: + """Return the last ``lines`` of a container's combined output, '' on error. - Finalize the session for usage capture, mark the instance OFFLINE - WITHOUT counting a crash (so it isn't escalated as stranded), and - activate the grok rate-limit tracker so the spawn guard suppresses - re-spawns until the probe-resume loop clears it after the retry window. - The task stays claimed/in_progress and is retried when the limit lifts. + The container is still present at exit (agents run detached, not + ``--rm``), so ``docker logs`` can read what the dead run printed. """ + try: + proc = await asyncio.create_subprocess_exec( + "docker", + "logs", + "--tail", + str(lines), + container_name, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + out, _ = await proc.communicate() + except Exception: + return "" + return out.decode(errors="replace") + + async def _provider_overload_park_target( + self, agent_id: str, instance: Any + ) -> str | None: + """Provider to park if this dead run hit a persistent overload, else None. + + Only the Anthropic path is matched: grok has its own exit-75 detector, + and other providers surface overloads differently. Returns None when + the feature is disabled, the agent isn't Anthropic, or the output holds + no overload marker. Gated so a misfire can be turned off without a + redeploy of the detection logic. + """ + if not settings.overload_break_enabled: + return None from roboco.models.base import ModelProvider - await self._finalize_spawn_session(agent_id, exit_reason="rate_limited") + provider_type = instance.config.provider_type if instance.config else None + if provider_type not in (None, ModelProvider.ANTHROPIC.value): + return None + tail = await self._tail_container_logs(f"roboco-agent-{agent_id}") + lowered = tail.lower() + if any(marker in lowered for marker in _ANTHROPIC_OVERLOAD_MARKERS): + return ModelProvider.ANTHROPIC.value + return None + + async def _park_provider_unavailable( + self, + agent_id: str, + instance: Any, + *, + provider: str, + retry_after: float, + kind: str, + ) -> None: + """Park an agent whose run ended because its provider is unavailable. + + Covers both a 429 rate limit and a persistent 5xx overload. Finalize + the session for usage capture, mark the instance OFFLINE WITHOUT + counting a crash (so it isn't escalated as stranded), and activate the + provider's tracker so the spawn guard suppresses re-spawns until the + probe-resume loop clears it. The task stays claimed/in_progress and is + retried when the provider recovers. + """ + await self._finalize_spawn_session(agent_id, exit_reason=kind) instance.state = AgentState.OFFLINE instance.container_id = None - instance.error_count = 0 # a 429 is not a crash — don't escalate as stranded + instance.error_count = 0 # provider unavailability is not a crash try: - await self._make_tracker(ModelProvider.GROK.value).activate( - retry_after=_GROK_RATE_LIMIT_RETRY_AFTER_S, + await self._make_tracker(provider).activate( + retry_after=retry_after, affected_agents=[agent_id], + kind=kind, ) except Exception as exc: - logger.warning("failed to park grok rate-limit state", error=str(exc)) + logger.warning( + "failed to park provider-unavailable state", + provider=provider, + kind=kind, + error=str(exc), + ) logger.warning( - "Grok provider rate-limited; parked (task retried when the limit lifts)", + "Provider unavailable; parked (task retried when it recovers)", + provider=provider, + kind=kind, agent_id=agent_id, task_id=instance.current_task_id, ) + async def _park_grok_rate_limited(self, agent_id: str, instance: Any) -> None: + """Park a grok agent whose run hit an xAI 429 (entrypoint exit 75).""" + from roboco.models.base import ModelProvider + + await self._park_provider_unavailable( + agent_id, + instance, + provider=ModelProvider.GROK.value, + retry_after=_GROK_RATE_LIMIT_RETRY_AFTER_S, + kind="rate_limited", + ) + @staticmethod def _too_early_to_probe(state: dict[str, Any]) -> bool: """True while the estimated lift time (activated_at + retry_after) is future. @@ -5668,14 +5786,16 @@ def _probe_target(provider: str) -> tuple[str | None, dict[str, str]]: return None, {} async def _do_probe(self, provider: str) -> bool: - """Return True if ``provider`` is accepting requests again (not 429). + """Return True if ``provider`` is accepting requests again. Makes a free, unmetered liveness call — Anthropic ``GET /v1/models`` - or Ollama ``GET /api/tags`` — and treats any non-429 response as the - rate limit having lifted. A 429 keeps the provider parked; a network - error stays parked too (retry next sweep). When the provider can't be - probed (no key / unknown), fall back to time-expiry optimism: the - caller only reaches this after ``estimated_lift_at`` has passed. + or Ollama ``GET /api/tags`` — and treats only a 2xx response as + recovered. Any error status keeps the provider parked: a 429 (still + rate-limited) **and** a 5xx (still overloaded) alike — resuming on a + non-2xx would march parked agents straight back into the failure. A + network error stays parked too (retry next sweep). When the provider + can't be probed (no key / unknown), fall back to time-expiry optimism: + the caller only reaches this after ``estimated_lift_at`` has passed. Injectable boundary — tests monkeypatch this to force outcomes. """ @@ -5687,10 +5807,12 @@ async def _do_probe(self, provider: str) -> bool: resp = await client.get(url, headers=headers) except httpx.HTTPError as exc: logger.debug( - "Rate-limit probe request failed", provider=provider, error=str(exc) + "Provider-recovery probe request failed", + provider=provider, + error=str(exc), ) return False # unreachable — stay parked, retry on the next sweep - return resp.status_code != _HTTP_TOO_MANY_REQUESTS + return _HTTP_OK <= resp.status_code < _HTTP_MULTIPLE_CHOICES async def _notify_rate_limit_ceo( self, diff --git a/roboco/services/gateway/rate_limit_tracker.py b/roboco/services/gateway/rate_limit_tracker.py index 85b69c29..44384b92 100644 --- a/roboco/services/gateway/rate_limit_tracker.py +++ b/roboco/services/gateway/rate_limit_tracker.py @@ -68,18 +68,24 @@ async def activate( self, retry_after: float | None = None, affected_agents: list[str] | None = None, + kind: str = "rate_limited", ) -> None: - """Mark the provider as rate-limited. + """Mark the provider as unavailable so new spawns are queued. Args: retry_after: Seconds until the provider should accept new requests, or ``None`` if unknown. affected_agents: Agent slugs that were active when the limit was hit (informational; stored in state). + kind: Why the provider is parked — ``"rate_limited"`` + (a 429) or ``"overloaded"`` (a persistent 5xx). + Both gate spawns identically; the kind is stored + so the panel / notifications can distinguish them. """ r = await self._conn() state: dict[str, Any] = { "rate_limited": True, + "kind": kind, "activated_at": datetime.now(UTC).isoformat(), "retry_after": retry_after, "affected_agents": affected_agents or [], diff --git a/tests/unit/runtime/test_grok_rate_limit.py b/tests/unit/runtime/test_grok_rate_limit.py index 663fd184..5fd5c711 100644 --- a/tests/unit/runtime/test_grok_rate_limit.py +++ b/tests/unit/runtime/test_grok_rate_limit.py @@ -35,10 +35,17 @@ def __init__(self, *, limited: bool = False) -> None: async def is_rate_limited(self) -> bool: return self._limited - async def activate(self, *, retry_after: float, affected_agents: list[str]) -> None: + async def activate( + self, + *, + retry_after: float, + affected_agents: list[str], + kind: str = "rate_limited", + ) -> None: self.activated_with = { "retry_after": retry_after, "affected_agents": affected_agents, + "kind": kind, } @@ -115,6 +122,7 @@ async def test_park_grok_rate_limited_activates_and_offlines( assert tracker.activated_with == { "retry_after": pytest.approx(60.0), "affected_agents": ["be-dev-1"], + "kind": "rate_limited", } diff --git a/tests/unit/runtime/test_provider_overload_break.py b/tests/unit/runtime/test_provider_overload_break.py new file mode 100644 index 00000000..f4a7191f --- /dev/null +++ b/tests/unit/runtime/test_provider_overload_break.py @@ -0,0 +1,194 @@ +"""Server-overload parking: break the 529/500 -> crash -> respawn cost loop. + +A persistent overload (HTTP 529 / 500 / 503) from the model API kills the run; +the orchestrator parks the provider — the same break as a 429 rate limit — +instead of crash-retrying straight back into the overload. The probe-resume +loop revives the task when the provider recovers. These tests exercise the +decision points deterministically (logs + tracker + finalize stubbed). +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest +from roboco.config import settings +from roboco.models.runtime import AgentInstance +from roboco.runtime.orchestrator import ( + _OVERLOAD_RETRY_AFTER_S, + AgentOrchestrator, + AgentState, +) + +_OVERLOAD_LOG = ( + 'API Error: 529 {"type":"error","error":{"type":"overloaded_error",' + '"message":"Overloaded"}}' +) +_CLEAN_LOG = "be-dev-1 finished editing src/app.py; all checks passed" + + +def _instance(provider_type: str | None = "anthropic") -> AgentInstance: + cfg = type( + "C", + (), + {"provider_type": provider_type, "model": "claude-x", "git_context": None}, + )() + inst = AgentInstance(agent_id="be-dev-1", state=AgentState.ACTIVE, config=cfg) + inst.current_task_id = "task-1" + inst.container_id = "cid" + return inst + + +@pytest.fixture +def orch() -> AgentOrchestrator: + return AgentOrchestrator.__new__(AgentOrchestrator) + + +class _FakeTracker: + def __init__(self) -> None: + self.activated_with: dict[str, object] | None = None + + async def activate( + self, *, retry_after: float, affected_agents: list[str], kind: str + ) -> None: + self.activated_with = { + "retry_after": retry_after, + "affected_agents": affected_agents, + "kind": kind, + } + + +# --------------------------------------------------------------------------- +# _provider_overload_park_target — the detection decision +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_detects_overload_marker_for_anthropic( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(settings, "overload_break_enabled", True) + monkeypatch.setattr( + orch, "_tail_container_logs", AsyncMock(return_value=_OVERLOAD_LOG) + ) + assert ( + await orch._provider_overload_park_target("be-dev-1", _instance()) + == "anthropic" + ) + + +@pytest.mark.asyncio +async def test_clean_output_is_not_overload( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(settings, "overload_break_enabled", True) + monkeypatch.setattr( + orch, "_tail_container_logs", AsyncMock(return_value=_CLEAN_LOG) + ) + assert await orch._provider_overload_park_target("be-dev-1", _instance()) is None + + +@pytest.mark.asyncio +async def test_disabled_flag_never_parks( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(settings, "overload_break_enabled", False) + tail = AsyncMock(return_value=_OVERLOAD_LOG) + monkeypatch.setattr(orch, "_tail_container_logs", tail) + assert await orch._provider_overload_park_target("be-dev-1", _instance()) is None + tail.assert_not_awaited() # short-circuits before reading logs + + +@pytest.mark.asyncio +async def test_grok_provider_is_skipped( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + """Grok has its own exit-75 detector; the log-marker path ignores it.""" + monkeypatch.setattr(settings, "overload_break_enabled", True) + monkeypatch.setattr( + orch, "_tail_container_logs", AsyncMock(return_value=_OVERLOAD_LOG) + ) + assert ( + await orch._provider_overload_park_target("gk-dev-1", _instance("grok")) is None + ) + + +# --------------------------------------------------------------------------- +# _park_provider_unavailable — the park action +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_park_offlines_and_activates_with_kind( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + inst = _instance() + inst.error_count = 2 # prior crashes — parking must NOT count one + tracker = _FakeTracker() + monkeypatch.setattr(orch, "_make_tracker", lambda _p: tracker) + monkeypatch.setattr(orch, "_finalize_spawn_session", AsyncMock()) + + await orch._park_provider_unavailable( + "be-dev-1", inst, provider="anthropic", retry_after=45.0, kind="overloaded" + ) + + assert inst.state == AgentState.OFFLINE + assert inst.container_id is None + assert inst.error_count == 0 + assert tracker.activated_with == { + "retry_after": pytest.approx(45.0), + "affected_agents": ["be-dev-1"], + "kind": "overloaded", + } + + +# --------------------------------------------------------------------------- +# _handle_stopped_container — overload short-circuits the crash-retry path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_stopped_container_parks_on_overload( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + inst = _instance() + park = AsyncMock() + spawn = AsyncMock() + monkeypatch.setattr(orch, "_is_grok_rate_limit_exit", lambda _i, _e: False) + monkeypatch.setattr( + orch, "_provider_overload_park_target", AsyncMock(return_value="anthropic") + ) + monkeypatch.setattr(orch, "_park_provider_unavailable", park) + monkeypatch.setattr(orch, "_finalize_spawn_session", AsyncMock()) + monkeypatch.setattr(orch, "spawn_agent", spawn) + + await orch._handle_stopped_container("be-dev-1", inst, exit_code=1) + + park.assert_awaited_once_with( + "be-dev-1", + inst, + provider="anthropic", + retry_after=_OVERLOAD_RETRY_AFTER_S, + kind="overloaded", + ) + spawn.assert_not_awaited() # the crash-retry path is short-circuited + + +@pytest.mark.asyncio +async def test_stopped_container_crash_retries_when_not_overload( + orch: AgentOrchestrator, monkeypatch: pytest.MonkeyPatch +) -> None: + inst = _instance() + inst.error_count = 0 + spawn = AsyncMock() + monkeypatch.setattr(orch, "_is_grok_rate_limit_exit", lambda _i, _e: False) + monkeypatch.setattr( + orch, "_provider_overload_park_target", AsyncMock(return_value=None) + ) + monkeypatch.setattr(orch, "_finalize_spawn_session", AsyncMock()) + monkeypatch.setattr(orch, "spawn_agent", spawn) + + await orch._handle_stopped_container("be-dev-1", inst, exit_code=1) + + # Not an overload → the normal crash-retry path runs. + spawn.assert_awaited_once() diff --git a/tests/unit/runtime/test_rate_limit_probe.py b/tests/unit/runtime/test_rate_limit_probe.py index 6fb796fe..71d53d42 100644 --- a/tests/unit/runtime/test_rate_limit_probe.py +++ b/tests/unit/runtime/test_rate_limit_probe.py @@ -2,9 +2,10 @@ ``_do_probe`` replaced a time-based stub that always returned True. It now makes a free, unmetered call (Anthropic ``GET /v1/models`` / Ollama -``GET /api/tags``) and treats any non-429 response as the rate limit having -lifted. These tests pin that contract: target resolution per provider, the -429-vs-not decision, network-error → stay-parked, and the un-probeable +``GET /api/tags``) and treats only a 2xx response as the provider having +recovered — a 429 (rate limit) and a 5xx (overload) both keep it parked. +These tests pin that contract: target resolution per provider, the +2xx-vs-error decision, network-error → stay-parked, and the un-probeable fallback to time-expiry optimism. """ @@ -111,6 +112,17 @@ async def test_probe_anthropic_429_stays_limited(orch: AgentOrchestrator) -> Non assert await orch._do_probe("anthropic") is False +@pytest.mark.usefixtures("with_anthropic_key") +@pytest.mark.parametrize("status", [500, 503, 529]) +async def test_probe_anthropic_5xx_stays_parked( + orch: AgentOrchestrator, status: int +) -> None: + """A 5xx (overload) keeps the provider parked — resuming would re-overload it.""" + fake = _fake_async_client(status_code=status) + with patch("roboco.runtime.orchestrator.httpx.AsyncClient", fake): + assert await orch._do_probe("anthropic") is False + + @pytest.mark.usefixtures("with_anthropic_key") async def test_probe_network_error_stays_parked(orch: AgentOrchestrator) -> None: fake = _fake_async_client(raise_exc=httpx.ConnectError("boom")) diff --git a/tests/unit/services/test_rate_limit_tracker.py b/tests/unit/services/test_rate_limit_tracker.py index 83531cbf..330b0bb8 100644 --- a/tests/unit/services/test_rate_limit_tracker.py +++ b/tests/unit/services/test_rate_limit_tracker.py @@ -108,6 +108,21 @@ async def test_activate_initialises_probe_failures_zero(self) -> None: state = await tracker.get_state() assert state["probe_failures"] == 0 + async def test_activate_defaults_kind_to_rate_limited(self) -> None: + mock = _make_redis_mock() + tracker = _make_tracker(redis_mock=mock) + await tracker.activate() + state = await tracker.get_state() + assert state["kind"] == "rate_limited" + + async def test_activate_stores_overloaded_kind(self) -> None: + mock = _make_redis_mock() + tracker = _make_tracker(redis_mock=mock) + await tracker.activate(kind="overloaded") + state = await tracker.get_state() + assert state["kind"] == "overloaded" + assert await tracker.is_rate_limited() is True # gates spawns either way + async def test_clear_removes_state(self) -> None: mock = _make_redis_mock() tracker = _make_tracker(redis_mock=mock)