Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions tests/test_envelope_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Coverage tests for ``runtime.graph._try_recover_envelope_from_raw`` (graph.py:583-610).

The recovery helper is invoked by the agent runner when LangGraph's
structured-output pass raises ``OutputParserException`` — it tries
several candidate substrings to dig an :class:`AgentTurnOutput` out of
free-form LLM text. The function is pure and deterministic (the same
input always yields the same result), so a table-driven test suite pins
each branch.
"""
from __future__ import annotations

import json

import pytest

from runtime.agents.turn_output import AgentTurnOutput
from runtime.graph import _try_recover_envelope_from_raw


def _envelope_dict(*, content: str = "ok", confidence: float = 0.85,
rationale: str = "stub", signal: str | None = None) -> dict:
return {
"content": content,
"confidence": confidence,
"confidence_rationale": rationale,
"signal": signal,
}


def _envelope_json(**overrides) -> str:
    """Return the JSON text of ``_envelope_dict(**overrides)``."""
    payload = _envelope_dict(**overrides)
    return json.dumps(payload)


class TestEmptyInput:
    """Empty or whitespace-only text is expected to recover nothing."""

    @pytest.mark.parametrize(
        "raw",
        [
            "",
            " ",
            "\n\n \t\n",
        ],
    )
    def test_empty_or_whitespace_returns_none(self, raw):
        recovered = _try_recover_envelope_from_raw(raw)
        assert recovered is None


class TestPlainJsonInput:
    """Raw text that is nothing but a single envelope JSON object."""

    def test_valid_envelope_json_parses(self):
        text = _envelope_json(content="hello")
        envelope = _try_recover_envelope_from_raw(text)
        assert isinstance(envelope, AgentTurnOutput)
        assert envelope.content == "hello"

    def test_valid_envelope_with_signal(self):
        text = _envelope_json(signal="reconcile")
        envelope = _try_recover_envelope_from_raw(text)
        assert envelope is not None
        assert envelope.signal == "reconcile"


class TestMarkdownFencedJson:
    """Envelope JSON wrapped in triple-backtick fences, tagged or untagged."""

    def test_fenced_with_json_tag(self):
        text = f"```json\n{_envelope_json()}\n```"
        envelope = _try_recover_envelope_from_raw(text)
        assert isinstance(envelope, AgentTurnOutput)

    def test_fenced_without_json_tag(self):
        text = f"```\n{_envelope_json(confidence=0.42)}\n```"
        envelope = _try_recover_envelope_from_raw(text)
        assert envelope is not None
        assert envelope.confidence == 0.42

    def test_fenced_with_surrounding_chatter(self):
        # Prose before and after the fence must not prevent recovery.
        preamble = "Here is my structured response:\n\n"
        fenced_block = f"```json\n{_envelope_json(content='fenced')}\n```\n\n"
        sign_off = "Hope that helps!"
        envelope = _try_recover_envelope_from_raw(preamble + fenced_block + sign_off)
        assert envelope is not None
        assert envelope.content == "fenced"


class TestGreedyBraceMatch:
    """Unfenced envelope JSON embedded in the middle of free-form prose."""

    def test_chatter_then_json_then_chatter(self):
        # There are no fences here, so this is meant to exercise the
        # greedy first-{ ... last-} scan of the helper under test.
        lead_in = f"Sure, here's the answer: {_envelope_json(content='greedy')} "
        trailer = "Let me know if you need more!"
        envelope = _try_recover_envelope_from_raw(lead_in + trailer)
        assert envelope is not None
        assert envelope.content == "greedy"


class TestUnrecoverableInput:
    """Inputs from which no envelope should be recoverable."""

    def test_invalid_json_returns_none(self):
        recovered = _try_recover_envelope_from_raw("{not valid json}")
        assert recovered is None

    def test_no_braces_returns_none(self):
        recovered = _try_recover_envelope_from_raw("Just a plain sentence.")
        assert recovered is None

    def test_json_array_not_dict_returns_none(self):
        # A JSON array of strings is well-formed JSON but contains no
        # brace-delimited object, so there is nothing to treat as an envelope.
        recovered = _try_recover_envelope_from_raw('["a", "b"]')
        assert recovered is None

    def test_valid_dict_missing_required_fields_returns_none(self):
        # Well-formed JSON object carrying none of the envelope's fields;
        # it is expected to fail AgentTurnOutput validation.
        recovered = _try_recover_envelope_from_raw('{"foo": "bar"}')
        assert recovered is None

    def test_dict_with_invalid_field_types_returns_none(self):
        # A confidence of 5.0 is expected to be rejected (the original author
        # notes the schema bounds confidence to 0..1; the schema itself is
        # defined outside this file).
        payload = '{"content": "x", "confidence": 5.0, "confidence_rationale": "y"}'
        recovered = _try_recover_envelope_from_raw(payload)
        assert recovered is None
145 changes: 145 additions & 0 deletions tests/test_handle_agent_failure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Coverage tests for ``runtime.graph._handle_agent_failure`` (graph.py:613-644).

This helper is invoked by the agent runner when an agent body raises a
non-pause exception (anything other than ``GraphInterrupt``). It reloads
the session (absorbing partial tool writes), appends a failure
``AgentRun``, marks the session ``status='error'``, persists, and returns
the state dict the LangGraph node yields.
"""
from __future__ import annotations

import pytest

from runtime.config import EmbeddingConfig, MetadataConfig, ProviderConfig
from runtime.graph import _handle_agent_failure
from runtime.state import AgentRun, Session
from runtime.storage.embeddings import build_embedder
from runtime.storage.engine import build_engine
from runtime.storage.models import Base
from runtime.storage.session_store import SessionStore


@pytest.fixture
def store(tmp_path) -> SessionStore:
    """Provide a ``SessionStore`` on a fresh SQLite file under ``tmp_path``.

    The schema is created via ``Base.metadata.create_all`` and the embedder
    is built from a provider configured with ``kind="stub"``.
    """
    metadata_cfg = MetadataConfig(url=f"sqlite:///{tmp_path}/test.db")
    engine = build_engine(metadata_cfg)
    Base.metadata.create_all(engine)

    embedding_cfg = EmbeddingConfig(provider="s", model="x", dim=1024)
    providers = {"s": ProviderConfig(kind="stub")}
    stub_embedder = build_embedder(embedding_cfg, providers)

    return SessionStore(engine=engine, embedder=stub_embedder)


def _seed_session(store: SessionStore, *, agents_run: list[AgentRun] | None = None) -> Session:
    """Create a baseline session in ``store`` and return it.

    When ``agents_run`` is non-empty, the runs are appended, the session is
    saved, and the freshly reloaded copy is returned instead.
    """
    session = store.create(
        query="probe",
        environment="dev",
        reporter_id="u1",
        reporter_team="t",
    )
    if not agents_run:
        return session

    session.agents_run.extend(agents_run)
    store.save(session)
    return store.load(session.id)


class TestHappyPath:
    """Failure handling when the session already exists in the store."""

    def test_failure_run_appended_and_status_set_to_error(self, store):
        session = _seed_session(store)
        state = _handle_agent_failure(
            skill_name="triage",
            started_at="2026-05-15T00:00:00Z",
            exc=RuntimeError("upstream blew up"),
            inc_id=session.id,
            store=store,
            fallback=session,
        )

        # Shape of the state dict handed back to the caller.
        assert state["last_agent"] == "triage"
        assert state["next_route"] is None
        assert state["error"] == "upstream blew up"
        assert isinstance(state["session"], Session)

        # What actually landed in the store.
        persisted = store.load(session.id)
        assert persisted.status == "error"
        assert len(persisted.agents_run) == 1
        failure_run = persisted.agents_run[0]
        assert failure_run.agent == "triage"
        assert failure_run.summary == "agent failed: upstream blew up"

    def test_appends_to_existing_run_history(self, store):
        earlier_run = AgentRun(
            agent="intake",
            started_at="2026-05-15T00:00:00Z",
            ended_at="2026-05-15T00:00:01Z",
            summary="completed: routed to triage",
        )
        session = _seed_session(store, agents_run=[earlier_run])
        _handle_agent_failure(
            skill_name="triage",
            started_at="2026-05-15T00:00:02Z",
            exc=TimeoutError("provider hung"),
            inc_id=session.id,
            store=store,
            fallback=session,
        )

        persisted = store.load(session.id)
        agent_names = [run.agent for run in persisted.agents_run]
        assert agent_names == ["intake", "triage"]
        assert "agent failed: provider hung" in persisted.agents_run[1].summary

    def test_preserves_partial_tool_writes_via_reload(self, store):
        """A tool write persisted before the agent raised must survive.

        The helper is expected to reload the session before appending its
        failure run, so a stale ``fallback`` must not clobber the write.
        """
        from runtime.state import ToolCall

        session = _seed_session(store)

        # Simulate a tool write that was already persisted.
        persisted_call = ToolCall(
            agent="triage",
            tool="lookup_similar_incidents",
            args={"query": "x"},
            result={"hits": []},
            ts="2026-05-15T00:00:00Z",
        )
        session.tool_calls.append(persisted_call)
        store.save(session)

        # The caller's stale reference knows nothing about that tool call.
        stale_copy = session.model_copy(deep=True)
        stale_copy.tool_calls = []

        _handle_agent_failure(
            skill_name="triage",
            started_at="2026-05-15T00:00:02Z",
            exc=RuntimeError("oops"),
            inc_id=session.id,
            store=store,
            fallback=stale_copy,
        )

        # The stored session still carries the tool call.
        persisted = store.load(session.id)
        assert len(persisted.tool_calls) == 1
        assert persisted.tool_calls[0].tool == "lookup_similar_incidents"


class TestFallbackPath:
    """Failure handling when the session id is not present in the store."""

    def test_uses_fallback_when_session_missing_from_store(self, store):
        """The caller-supplied ``fallback`` is used and persisted.

        The session is never saved before the call, so the helper cannot
        load it (per the original author's note, ``store.load`` raises
        ``FileNotFoundError`` in that case) and must fall back to the
        in-memory object it was given.
        """
        # One id shared by the fallback object, the helper call and the
        # post-condition lookup, so the three cannot drift apart.
        ghost_id = "INC-20260515-999"
        # ``Session`` comes from the module-level import; no local re-import.
        ghost = Session(
            id=ghost_id,
            status="in_progress",
            created_at="2026-05-15T00:00:00Z",
            updated_at="2026-05-15T00:00:00Z",
        )
        result = _handle_agent_failure(
            skill_name="intake",
            started_at="2026-05-15T00:00:00Z",
            exc=RuntimeError("dropped on the floor"),
            inc_id=ghost_id,
            store=store,
            fallback=ghost,
        )

        # The returned session is marked as failed ...
        assert result["session"].status == "error"

        # ... and the session is now loadable with the failure run attached.
        loaded = store.load(ghost_id)
        assert loaded.id == ghost_id
        assert loaded.status == "error"
        assert len(loaded.agents_run) == 1
        assert loaded.agents_run[0].agent == "intake"
72 changes: 72 additions & 0 deletions tests/test_llm_stub_structured_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Coverage tests for ``StubChatModel.with_structured_output`` (llm.py:141-160, 171-177).

The structured-output runnable was previously only exercised indirectly
via ``langchain.agents.create_agent``. These tests pin the direct
contract: the stub returns a Runnable-like that yields a valid schema
instance per ``invoke`` / ``ainvoke``, populated from the canned text and
``stub_envelope_*`` parameters.

The permissive ``model_validate`` fallback (lines 161-169) is genuinely
defensive: pydantic v2's ``model_validate`` internally calls ``__init__``
too, so any schema whose constructor raises also fails the fallback.
The fallback exists for hypothetical schemas with custom
``__pydantic_validator__`` overrides, which the framework doesn't ship
and tests can't construct without monkey-patching pydantic internals.
"""
from __future__ import annotations

import pytest

from runtime.agents.turn_output import AgentTurnOutput
from runtime.llm import StubChatModel


def _stub(
    *,
    confidence: float = 0.85,
    rationale: str = "stub rationale",
    signal: str | None = None,
    role: str = "intake",
    canned: dict[str, str] | None = None,
) -> StubChatModel:
    """Build a ``StubChatModel`` with test-friendly defaults.

    When ``canned`` is ``None`` the model gets a single canned response,
    ``"stub body text"``, keyed by ``role``. An explicitly passed mapping
    (including an empty one) is forwarded unchanged.
    """
    if canned is None:
        responses = {role: "stub body text"}
    else:
        responses = canned
    return StubChatModel(
        role=role,
        canned_responses=responses,
        stub_envelope_confidence=confidence,
        stub_envelope_rationale=rationale,
        stub_envelope_signal=signal,
    )


class TestStubStructuredOutputHappyPath:
    """Direct checks of the runnable returned by ``with_structured_output``."""

    def test_invoke_returns_schema_instance(self):
        model = _stub()
        runnable = model.with_structured_output(AgentTurnOutput)
        result = runnable.invoke("any input")
        assert isinstance(result, AgentTurnOutput)
        assert result.content == "stub body text"
        assert result.confidence == 0.85
        assert result.confidence_rationale == "stub rationale"
        assert result.signal is None

    @pytest.mark.asyncio
    async def test_ainvoke_returns_schema_instance(self):
        model = _stub(confidence=0.42, rationale="hedge", signal="retry")
        runnable = model.with_structured_output(AgentTurnOutput)
        result = await runnable.ainvoke("any input")
        assert isinstance(result, AgentTurnOutput)
        assert result.confidence == 0.42
        assert result.confidence_rationale == "hedge"
        assert result.signal == "retry"

    def test_canned_response_missing_uses_default_marker(self):
        # No canned entry exists for the role, so the content is expected
        # to start with the stub's "[stub:<role>]" marker.
        model = _stub(role="ghost", canned={})
        runnable = model.with_structured_output(AgentTurnOutput)
        result = runnable.invoke("x")
        assert result.content.startswith("[stub:ghost]")

    def test_include_raw_kwarg_is_accepted(self):
        # Per the original note, langchain passes include_raw=True/False at
        # the call site; the stub accepts it and the content is unchanged.
        runnable = _stub().with_structured_output(AgentTurnOutput, include_raw=True)
        result = runnable.invoke("x")
        assert result.content == "stub body text"

    def test_extra_kwargs_are_swallowed(self):
        # Unrecognised keyword arguments must not affect the produced envelope.
        runnable = _stub().with_structured_output(
            AgentTurnOutput,
            method="json_mode",
            strict=True,
        )
        result = runnable.invoke("x")
        assert result.confidence == 0.85


Loading