diff --git a/config/config.yaml b/config/config.yaml index 000a2ce..a259888 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,7 +7,7 @@ storage: collection_name: "incidents" distance_strategy: cosine llm: - default: gpt_oss + default: workhorse providers: ollama_cloud: kind: ollama @@ -28,7 +28,7 @@ llm: models: workhorse: provider: openrouter - model: openai/gpt-4o-mini + model: inclusionai/ring-2.6-1t:free temperature: 0.0 cheap: provider: ollama_cloud diff --git a/dist/app.py b/dist/app.py index 816fc0c..9d378d5 100644 --- a/dist/app.py +++ b/dist/app.py @@ -9573,6 +9573,26 @@ def finish(self, *, summary: str) -> None: "connection reset", ) +# 429 markers are kept separate from ``_TRANSIENT_MARKERS`` so the +# retry loop can apply a longer backoff. Free / shared upstream models +# (e.g. OpenRouter ``…:free`` tier) throttle on short windows that +# need 30-60s recovery — the 5xx default backoff (1.5s/3s/4.5s) is +# too aggressive and exhausts retries before the window clears. +# ``" 429"`` (with leading space) and ``"429 "`` cover bare-number +# error strings (e.g. "Provider returned 429"), with the spaces +# guarding against false positives on other 4-digit numbers like 1429. +_RATE_LIMIT_MARKERS = ( + "status code: 429", + "error code: 429", + " 429", + "429 ", + "ratelimiterror", + "rate limited", + "rate-limited", + "rate limit", + "too many requests", +) + async def _drive_agent_with_resume( *, @@ -9671,11 +9691,22 @@ async def _drive_agent_with_resume( async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, base_delay: float = 1.5, + rate_limit_base_delay: float = 7.5, config: dict | None = None): """Wrap a LangGraph agent invocation with retry on transient cloud errors. - Retries on common Ollama Cloud / streaming hiccups (500, status -1, etc.). - Non-transient exceptions (4xx, validation, etc.) propagate immediately. + Two backoff regimes: + + * 5xx / connection-reset / streaming hiccups → ``base_delay`` + (1.5s/3s/4.5s for 3 attempts) — these usually clear within a + few seconds. + * 429 rate-limit responses → ``rate_limit_base_delay`` (7.5s/15s + /22.5s) — free / shared upstream tiers (e.g. OpenRouter + ``…:free`` models) throttle on short windows that need + 30-60s to clear. + + Non-transient exceptions (4xx other than 429, validation, schema + drift, etc.) propagate immediately. """ last_exc: Exception | None = None for attempt in range(max_attempts): @@ -9705,11 +9736,13 @@ async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, raise except Exception as exc: # noqa: BLE001 msg = str(exc).lower() - transient = any(m in msg for m in _TRANSIENT_MARKERS) - if not transient or attempt == max_attempts - 1: + is_5xx = any(m in msg for m in _TRANSIENT_MARKERS) + is_429 = any(m in msg for m in _RATE_LIMIT_MARKERS) + if not (is_5xx or is_429) or attempt == max_attempts - 1: raise last_exc = exc - await asyncio.sleep(base_delay * (attempt + 1)) + delay = (rate_limit_base_delay if is_429 else base_delay) + await asyncio.sleep(delay * (attempt + 1)) raise last_exc or RuntimeError("retry exhausted with no attempts") # pragma: no cover diff --git a/dist/apps/code-review.py b/dist/apps/code-review.py index 8d6e5f3..343544f 100644 --- a/dist/apps/code-review.py +++ b/dist/apps/code-review.py @@ -9626,6 +9626,26 @@ def finish(self, *, summary: str) -> None: "connection reset", ) +# 429 markers are kept separate from ``_TRANSIENT_MARKERS`` so the +# retry loop can apply a longer backoff. Free / shared upstream models +# (e.g. OpenRouter ``…:free`` tier) throttle on short windows that +# need 30-60s recovery — the 5xx default backoff (1.5s/3s/4.5s) is +# too aggressive and exhausts retries before the window clears. +# ``" 429"`` (with leading space) and ``"429 "`` cover bare-number +# error strings (e.g. "Provider returned 429"), with the spaces +# guarding against false positives on other 4-digit numbers like 1429. +_RATE_LIMIT_MARKERS = ( + "status code: 429", + "error code: 429", + " 429", + "429 ", + "ratelimiterror", + "rate limited", + "rate-limited", + "rate limit", + "too many requests", +) + async def _drive_agent_with_resume( *, @@ -9724,11 +9744,22 @@ async def _drive_agent_with_resume( async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, base_delay: float = 1.5, + rate_limit_base_delay: float = 7.5, config: dict | None = None): """Wrap a LangGraph agent invocation with retry on transient cloud errors. - Retries on common Ollama Cloud / streaming hiccups (500, status -1, etc.). - Non-transient exceptions (4xx, validation, etc.) propagate immediately. + Two backoff regimes: + + * 5xx / connection-reset / streaming hiccups → ``base_delay`` + (1.5s/3s/4.5s for 3 attempts) — these usually clear within a + few seconds. + * 429 rate-limit responses → ``rate_limit_base_delay`` (7.5s/15s + /22.5s) — free / shared upstream tiers (e.g. OpenRouter + ``…:free`` models) throttle on short windows that need + 30-60s to clear. + + Non-transient exceptions (4xx other than 429, validation, schema + drift, etc.) propagate immediately. """ last_exc: Exception | None = None for attempt in range(max_attempts): @@ -9758,11 +9789,13 @@ async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, raise except Exception as exc: # noqa: BLE001 msg = str(exc).lower() - transient = any(m in msg for m in _TRANSIENT_MARKERS) - if not transient or attempt == max_attempts - 1: + is_5xx = any(m in msg for m in _TRANSIENT_MARKERS) + is_429 = any(m in msg for m in _RATE_LIMIT_MARKERS) + if not (is_5xx or is_429) or attempt == max_attempts - 1: raise last_exc = exc - await asyncio.sleep(base_delay * (attempt + 1)) + delay = (rate_limit_base_delay if is_429 else base_delay) + await asyncio.sleep(delay * (attempt + 1)) raise last_exc or RuntimeError("retry exhausted with no attempts") # pragma: no cover diff --git a/dist/apps/incident-management.py b/dist/apps/incident-management.py index 53e0b12..083c713 100644 --- a/dist/apps/incident-management.py +++ b/dist/apps/incident-management.py @@ -9638,6 +9638,26 @@ def finish(self, *, summary: str) -> None: "connection reset", ) +# 429 markers are kept separate from ``_TRANSIENT_MARKERS`` so the +# retry loop can apply a longer backoff. Free / shared upstream models +# (e.g. OpenRouter ``…:free`` tier) throttle on short windows that +# need 30-60s recovery — the 5xx default backoff (1.5s/3s/4.5s) is +# too aggressive and exhausts retries before the window clears. +# ``" 429"`` (with leading space) and ``"429 "`` cover bare-number +# error strings (e.g. "Provider returned 429"), with the spaces +# guarding against false positives on other 4-digit numbers like 1429. +_RATE_LIMIT_MARKERS = ( + "status code: 429", + "error code: 429", + " 429", + "429 ", + "ratelimiterror", + "rate limited", + "rate-limited", + "rate limit", + "too many requests", +) + async def _drive_agent_with_resume( *, @@ -9736,11 +9756,22 @@ async def _drive_agent_with_resume( async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, base_delay: float = 1.5, + rate_limit_base_delay: float = 7.5, config: dict | None = None): """Wrap a LangGraph agent invocation with retry on transient cloud errors. - Retries on common Ollama Cloud / streaming hiccups (500, status -1, etc.). - Non-transient exceptions (4xx, validation, etc.) propagate immediately. + Two backoff regimes: + + * 5xx / connection-reset / streaming hiccups → ``base_delay`` + (1.5s/3s/4.5s for 3 attempts) — these usually clear within a + few seconds. + * 429 rate-limit responses → ``rate_limit_base_delay`` (7.5s/15s + /22.5s) — free / shared upstream tiers (e.g. OpenRouter + ``…:free`` models) throttle on short windows that need + 30-60s to clear. + + Non-transient exceptions (4xx other than 429, validation, schema + drift, etc.) propagate immediately. """ last_exc: Exception | None = None for attempt in range(max_attempts): @@ -9770,11 +9801,13 @@ async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, raise except Exception as exc: # noqa: BLE001 msg = str(exc).lower() - transient = any(m in msg for m in _TRANSIENT_MARKERS) - if not transient or attempt == max_attempts - 1: + is_5xx = any(m in msg for m in _TRANSIENT_MARKERS) + is_429 = any(m in msg for m in _RATE_LIMIT_MARKERS) + if not (is_5xx or is_429) or attempt == max_attempts - 1: raise last_exc = exc - await asyncio.sleep(base_delay * (attempt + 1)) + delay = (rate_limit_base_delay if is_429 else base_delay) + await asyncio.sleep(delay * (attempt + 1)) raise last_exc or RuntimeError("retry exhausted with no attempts") # pragma: no cover diff --git a/src/runtime/graph.py b/src/runtime/graph.py index 65f5550..7d15e7c 100644 --- a/src/runtime/graph.py +++ b/src/runtime/graph.py @@ -198,6 +198,26 @@ def finish(self, *, summary: str) -> None: "connection reset", ) +# 429 markers are kept separate from ``_TRANSIENT_MARKERS`` so the +# retry loop can apply a longer backoff. Free / shared upstream models +# (e.g. OpenRouter ``…:free`` tier) throttle on short windows that +# need 30-60s recovery — the 5xx default backoff (1.5s/3s/4.5s) is +# too aggressive and exhausts retries before the window clears. +# ``" 429"`` (with leading space) and ``"429 "`` cover bare-number +# error strings (e.g. "Provider returned 429"), with the spaces +# guarding against false positives on other 4-digit numbers like 1429. +_RATE_LIMIT_MARKERS = ( + "status code: 429", + "error code: 429", + " 429", + "429 ", + "ratelimiterror", + "rate limited", + "rate-limited", + "rate limit", + "too many requests", +) + async def _drive_agent_with_resume( *, @@ -296,11 +316,22 @@ async def _drive_agent_with_resume( async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, base_delay: float = 1.5, + rate_limit_base_delay: float = 7.5, config: dict | None = None): """Wrap a LangGraph agent invocation with retry on transient cloud errors. - Retries on common Ollama Cloud / streaming hiccups (500, status -1, etc.). - Non-transient exceptions (4xx, validation, etc.) propagate immediately. + Two backoff regimes: + + * 5xx / connection-reset / streaming hiccups → ``base_delay`` + (1.5s/3s/4.5s for 3 attempts) — these usually clear within a + few seconds. + * 429 rate-limit responses → ``rate_limit_base_delay`` (7.5s/15s + /22.5s) — free / shared upstream tiers (e.g. OpenRouter + ``…:free`` models) throttle on short windows that need + 30-60s to clear. + + Non-transient exceptions (4xx other than 429, validation, schema + drift, etc.) propagate immediately. """ last_exc: Exception | None = None for attempt in range(max_attempts): @@ -330,11 +361,13 @@ async def _ainvoke_with_retry(executor, input_, *, max_attempts: int = 3, raise except Exception as exc: # noqa: BLE001 msg = str(exc).lower() - transient = any(m in msg for m in _TRANSIENT_MARKERS) - if not transient or attempt == max_attempts - 1: + is_5xx = any(m in msg for m in _TRANSIENT_MARKERS) + is_429 = any(m in msg for m in _RATE_LIMIT_MARKERS) + if not (is_5xx or is_429) or attempt == max_attempts - 1: raise last_exc = exc - await asyncio.sleep(base_delay * (attempt + 1)) + delay = (rate_limit_base_delay if is_429 else base_delay) + await asyncio.sleep(delay * (attempt + 1)) raise last_exc or RuntimeError("retry exhausted with no attempts") # pragma: no cover diff --git a/tests/test_ainvoke_retry_429.py b/tests/test_ainvoke_retry_429.py new file mode 100644 index 0000000..155482e --- /dev/null +++ b/tests/test_ainvoke_retry_429.py @@ -0,0 +1,153 @@ +"""Pin the two backoff regimes in ``runtime.graph._ainvoke_with_retry``. + +* 5xx / streaming hiccups retry on a short-window backoff (``base_delay``). +* 429 rate-limit responses retry on a longer-window backoff + (``rate_limit_base_delay``) — free / shared upstream tiers (e.g. + OpenRouter ``…:free`` models) throttle on windows that need + 30-60s to clear; the 5xx default exhausts retries before the + window opens again. + +A single non-transient error (e.g. 401 / 422) propagates immediately +without retry so quota / schema / auth issues fail fast. +""" +from __future__ import annotations + +import pytest + +from runtime.graph import _ainvoke_with_retry + + +class _RecordingExecutor: + """A fake agent executor whose ``ainvoke`` raises a configurable + sequence of exceptions; the final entry can be a return value.""" + + def __init__(self, sequence): + self._sequence = list(sequence) + self.calls = 0 + + async def ainvoke(self, _input, **_kwargs): + self.calls += 1 + item = self._sequence.pop(0) + if isinstance(item, BaseException): + raise item + return item + + +@pytest.mark.asyncio +async def test_retries_on_5xx_and_returns_eventually(monkeypatch): + """5xx burst clears within the short-window backoff.""" + sleeps: list[float] = [] + + async def _fake_sleep(s): + sleeps.append(s) + + monkeypatch.setattr("runtime.graph.asyncio.sleep", _fake_sleep) + + exec_ = _RecordingExecutor([ + RuntimeError("Internal server error 500 from upstream"), + {"messages": []}, + ]) + result = await _ainvoke_with_retry(exec_, {"messages": []}) + assert result == {"messages": []} + assert exec_.calls == 2 + assert sleeps == [1.5] # base_delay * (attempt+1) on the first retry + + +@pytest.mark.asyncio +async def test_retries_on_429_with_longer_backoff(monkeypatch): + """429 retries fire on the rate_limit_base_delay (7.5s+) window + instead of the 1.5s default — free upstream tiers need 30-60s + to clear and the 5xx default would exhaust 3 attempts in 9s. + """ + sleeps: list[float] = [] + + async def _fake_sleep(s): + sleeps.append(s) + + monkeypatch.setattr("runtime.graph.asyncio.sleep", _fake_sleep) + + exec_ = _RecordingExecutor([ + RuntimeError("Error code: 429 - rate limit exceeded"), + RuntimeError("Error code: 429 - still rate-limited"), + {"messages": []}, + ]) + result = await _ainvoke_with_retry(exec_, {"messages": []}) + assert result == {"messages": []} + assert exec_.calls == 3 + # 7.5s on attempt 1, 15s on attempt 2 — confirms the rate-limit + # branch fired with the longer backoff per attempt. + assert sleeps == [7.5, 15.0] + + +@pytest.mark.asyncio +async def test_429_phrasings_all_match(monkeypatch): + """Markers cover the variants real providers emit. Each phrase + on its own should hit the 429 retry branch (assertable via the + longer backoff).""" + monkeypatch.setattr( + "runtime.graph.asyncio.sleep", + lambda _s: _noop(), # type: ignore[arg-type] + ) + + async def _noop(): + return None + + for phrase in ( + "RateLimitError: too many requests", + "Provider returned 429", + "Status code: 429", + "rate limited upstream", + "rate-limited", + ): + exec_ = _RecordingExecutor([ + RuntimeError(phrase), + {"messages": []}, + ]) + result = await _ainvoke_with_retry(exec_, {"messages": []}) + assert result == {"messages": []}, f"failed to retry on {phrase!r}" + assert exec_.calls == 2, f"unexpected call count for {phrase!r}" + + +@pytest.mark.asyncio +async def test_non_transient_error_propagates_without_retry(monkeypatch): + """4xx (other than 429) and validation errors must fail fast — + retrying a 401/422 wastes time and masks the real problem.""" + sleeps: list[float] = [] + + async def _fake_sleep(s): + sleeps.append(s) + + monkeypatch.setattr("runtime.graph.asyncio.sleep", _fake_sleep) + + exec_ = _RecordingExecutor([ + RuntimeError("Error code: 401 - unauthorized"), + ]) + with pytest.raises(RuntimeError, match="401"): + await _ainvoke_with_retry(exec_, {"messages": []}) + assert exec_.calls == 1 + assert sleeps == [] # no retry → no sleep + + +@pytest.mark.asyncio +async def test_429_exhausts_max_attempts_then_raises(monkeypatch): + """If the rate-limit window doesn't clear within max_attempts + retries, the last 429 propagates — bounded so we don't loop + forever on a real quota exhaustion.""" + sleeps: list[float] = [] + + async def _fake_sleep(s): + sleeps.append(s) + + monkeypatch.setattr("runtime.graph.asyncio.sleep", _fake_sleep) + + exec_ = _RecordingExecutor([ + RuntimeError("Error code: 429 - rate limit"), + RuntimeError("Error code: 429 - rate limit"), + RuntimeError("Error code: 429 - rate limit"), + ]) + with pytest.raises(RuntimeError, match="429"): + await _ainvoke_with_retry(exec_, {"messages": []}) + assert exec_.calls == 3 + # Only TWO sleeps: backoff happens BEFORE retries 2 and 3, and + # the loop refuses to sleep before raising on the final attempt. + assert sleeps == [7.5, 15.0] diff --git a/tests/test_integration_driver_s1.py b/tests/test_integration_driver_s1.py index 65445ce..8f977c3 100644 --- a/tests/test_integration_driver_s1.py +++ b/tests/test_integration_driver_s1.py @@ -50,17 +50,9 @@ _OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY") _OLLAMA_KEY = os.environ.get("OLLAMA_API_KEY") _OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL") - - -pytestmark = pytest.mark.skipif( - not (_OPENROUTER_KEY and _OLLAMA_KEY and _OLLAMA_BASE_URL), - reason=( - "Phase 15 integration driver S1 requires live LLM access. " - "Set OPENROUTER_API_KEY + OLLAMA_API_KEY + OLLAMA_BASE_URL to " - "exercise. See .planning/phases/15-real-llm-tool-loop-termination/" - "15-VERIFICATION.md for the manual run procedure." - ), -) +_AZURE_KEY = os.environ.get("AZURE_OPENAI_KEY") +_AZURE_ENDPOINT = os.environ.get("AZURE_ENDPOINT") +_AZURE_DEPLOYMENT = os.environ.get("AZURE_DEPLOYMENT", "gpt-4o") def _make_repo(tmp_path: Path) -> SessionStore: @@ -74,40 +66,89 @@ def _make_repo(tmp_path: Path) -> SessionStore: def _build_llm_cfg() -> LLMConfig: - """Two providers + two named models — what ``get_llm`` consumes.""" - return LLMConfig( - default="workhorse", - providers={ - "openrouter": ProviderConfig( - kind="openai_compat", - base_url="https://openrouter.ai/api/v1", - api_key=_OPENROUTER_KEY, - ), - "ollama": ProviderConfig( - kind="ollama", - base_url=_OLLAMA_BASE_URL, - api_key=_OLLAMA_KEY, - ), - }, - models={ - "workhorse": ModelConfig( - provider="openrouter", model="openai/gpt-4o-mini", - ), - "local": ModelConfig(provider="ollama", model="gpt-oss:20b"), - }, - ) + """Three providers + three named models — what ``get_llm`` consumes. + + OpenRouter, Ollama Cloud, and Azure OpenAI are each declared. The + parametrize arms below skip per-leg if the corresponding key is + missing, so a partial-key environment exercises whichever + providers it can reach. + """ + providers: dict = {} + models: dict = {} + if _OPENROUTER_KEY: + providers["openrouter"] = ProviderConfig( + kind="openai_compat", + base_url="https://openrouter.ai/api/v1", + api_key=_OPENROUTER_KEY, + ) + models["workhorse"] = ModelConfig( + provider="openrouter", model="openai/gpt-4o-mini", + ) + if _OLLAMA_KEY and _OLLAMA_BASE_URL: + providers["ollama"] = ProviderConfig( + kind="ollama", + base_url=_OLLAMA_BASE_URL, + api_key=_OLLAMA_KEY, + ) + models["local"] = ModelConfig(provider="ollama", model="gpt-oss:20b") + if _AZURE_KEY and _AZURE_ENDPOINT: + providers["azure"] = ProviderConfig( + kind="azure_openai", + endpoint=_AZURE_ENDPOINT, + api_version="2024-08-01-preview", + api_key=_AZURE_KEY, + ) + models["azure"] = ModelConfig( + provider="azure", model="gpt-4o", deployment=_AZURE_DEPLOYMENT, + ) + if not models: + # Fallback so the LLMConfig validator (which requires the + # default to exist in models) passes when the test is being + # collected but every provider is keyless. The actual test + # invocations are then skipped per-arm. + from runtime.config import ProviderConfig as _PC, ModelConfig as _MC + providers["stub"] = _PC(kind="stub") + models["stub_default"] = _MC(provider="stub", model="stub-1") + default = "stub_default" + else: + # Pick whichever model is available, in priority order. + default = next( + (m for m in ("local", "workhorse", "azure") if m in models), + next(iter(models)), + ) + return LLMConfig(default=default, providers=providers, models=models) + + +_PER_MODEL_SKIP_REASON = { + "workhorse": ( + "OPENROUTER_API_KEY not set — set it to exercise the OpenAI-compatible leg", + lambda: not _OPENROUTER_KEY, + ), + "local": ( + "OLLAMA_API_KEY + OLLAMA_BASE_URL not set — set both to exercise the Ollama leg", + lambda: not (_OLLAMA_KEY and _OLLAMA_BASE_URL), + ), + "azure": ( + "AZURE_OPENAI_KEY + AZURE_ENDPOINT not set — set both to exercise the Azure leg", + lambda: not (_AZURE_KEY and _AZURE_ENDPOINT), + ), +} @pytest.mark.asyncio -@pytest.mark.parametrize("model_name", ["workhorse", "local"]) +@pytest.mark.parametrize("model_name", ["workhorse", "local", "azure"]) async def test_integration_driver_s1_terminal_state(tmp_path, model_name): - """S1: agent_node reaches a terminal state across providers. + """S1: agent_node reaches a terminal state across live providers. - This is the live-LLM analogue of the stub-mode termination tests. - A failure here means the migration regressed for at least one - provider; rerun with ``--log-cli-level=DEBUG`` to capture the - full message sequence for diagnosis. + Parametrize arms (``workhorse`` / ``local`` / ``azure``) cover the + three production provider kinds. Each arm skips independently if + its keys are absent — partial-key environments still exercise + whichever providers they can reach. """ + skip_reason, predicate = _PER_MODEL_SKIP_REASON[model_name] + if predicate(): + pytest.skip(skip_reason) + cfg = _build_llm_cfg() llm = get_llm(cfg, model_name) @@ -122,9 +163,27 @@ async def test_integration_driver_s1_terminal_state(tmp_path, model_name): name="responder", description="Brief responder skill for integration test.", routes=[RouteRule(when="default", next="__end__")], + # Phase 22 contract: every reply MUST end with the markdown + # envelope so the framework's parse_envelope_from_result Path 4 + # can lift confidence + signal. Without this prompt the live + # LLM emits plain prose, hits no tool / no envelope, and Path + # 7 (terminal) raises EnvelopeMissingError. Mirror of the + # contract documented in + # ``examples/incident_management/skills/*/system.md``. system_prompt=( "You are a concise assistant. Respond to the user's prompt " - "in one sentence. Do not invoke any tools." + "in one sentence. Do not invoke any tools.\n\n" + "## Output contract — REQUIRED\n" + "Every final reply MUST end with these three sections, in" + " order, each preceded by a level-2 markdown header:\n" + " ## Response\n" + " \n" + " ## Confidence\n" + " <0.0-1.0 float> -- \n" + " ## Signal\n" + " none\n" + "**CRITICAL — final-reply rule:** the markdown envelope is" + " mandatory; the framework hard-fails if it is missing." ), ) node = make_agent_node(