Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 56 additions & 62 deletions dist/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,11 @@ class IncidentState(Session):

# Phase 11 (FOC-04): forward-reference imports for the should_gate
# signature only; kept inside ``TYPE_CHECKING`` so the bundle's
# intra-import stripper does not remove a load-bearing import. The
# ``pass`` keeps the block syntactically valid after stripping.
# intra-import stripper sees them. The bundler's
# ``_ORPHANED_TYPE_CHECKING_RE`` rewrite injects a ``pass`` body when
# the imports get stripped (build_single_file.py:292) — but its regex
# requires the ``if TYPE_CHECKING:`` line to have no trailing comment,
# so do not add one here.
# ----- imports for runtime/agents/responsive.py -----
"""Responsive agent kind — the today-default LLM agent.

Expand Down Expand Up @@ -1397,6 +1400,9 @@ async def _poll(self, registry):



# Forward-import: ``runtime.triggers.base`` only defines a dataclass and
# the type appears in a method annotation. Kept inside ``TYPE_CHECKING``
# to avoid a runtime circular import.
# ----- imports for runtime/api.py -----
"""FastAPI app — health, listings, incident, and multi-session endpoints.

Expand Down Expand Up @@ -3590,6 +3596,12 @@ class Base(DeclarativeBase):
pass


# SQL fragment used as the partial-index predicate so soft-deleted
# rows don't bloat the indexes (mirrors the application-layer
# "exclude deleted" filter in SessionStore queries).
_ACTIVE_ROW_SQL = "deleted_at IS NULL"


class IncidentRow(Base):
__tablename__ = "incidents"

Expand Down Expand Up @@ -3641,11 +3653,11 @@ class IncidentRow(Base):

__table_args__ = (
Index("ix_incidents_status_env_active", "status", "environment",
postgresql_where=text("deleted_at IS NULL"),
sqlite_where=text("deleted_at IS NULL")),
postgresql_where=text(_ACTIVE_ROW_SQL),
sqlite_where=text(_ACTIVE_ROW_SQL)),
Index("ix_incidents_created_at_active", "created_at",
postgresql_where=text("deleted_at IS NULL"),
sqlite_where=text("deleted_at IS NULL")),
postgresql_where=text(_ACTIVE_ROW_SQL),
sqlite_where=text(_ACTIVE_ROW_SQL)),
Index("ix_incidents_parent_session_id", "parent_session_id"),
)

Expand Down Expand Up @@ -6799,7 +6811,7 @@ def parse_envelope_from_result(
continue
try:
payload = json.loads(content)
except (json.JSONDecodeError, ValueError):
except ValueError: # JSONDecodeError is a ValueError subclass
continue
if not isinstance(payload, dict):
continue
Expand Down Expand Up @@ -8265,12 +8277,8 @@ async def _resume_with_timeout(

# ====== module: runtime/policy.py ======

if TYPE_CHECKING: # pragma: no cover -- type checking only


pass # noqa: PIE790 -- bundle survives even if imports are stripped


if TYPE_CHECKING:
pass
GateReason = Literal[
"auto",
"high_risk_tool",
Expand Down Expand Up @@ -9974,7 +9982,7 @@ def _try_recover_envelope_from_raw(raw: str) -> AgentTurnOutput | None:
for candidate in candidates:
try:
payload = json.loads(candidate)
except (json.JSONDecodeError, ValueError):
except ValueError: # JSONDecodeError is a ValueError subclass
continue
if not isinstance(payload, dict):
continue
Expand Down Expand Up @@ -10997,9 +11005,6 @@ async def make_checkpointer(

if TYPE_CHECKING:
pass



@dataclass(frozen=True)
class TriggerInfo:
"""Provenance attached to every session started via a trigger.
Expand Down Expand Up @@ -11469,9 +11474,6 @@ async def stop(self) -> None:

if TYPE_CHECKING:
pass



_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -11564,7 +11566,7 @@ async def handler(
)
except KeyError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except (ValueError, TypeError, ValidationError) as exc:
except (ValueError, TypeError) as exc: # pydantic ValidationError is a ValueError subclass
_log.warning(
"trigger %r transform/dispatch failed: %s", name, exc
)
Expand All @@ -11577,8 +11579,6 @@ async def handler(

if TYPE_CHECKING:
pass


_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -13540,15 +13540,7 @@ def gc_orphaned_checkpoints(engine: Engine) -> int:
# ====== module: runtime/orchestrator.py ======

if TYPE_CHECKING:
# Avoid a runtime circular import — ``runtime.triggers.base`` only
# defines a dataclass, and the type appears in a method annotation.
pass






from langgraph.errors import GraphInterrupt
from langgraph.types import Command

Expand All @@ -13567,6 +13559,12 @@ def gc_orphaned_checkpoints(engine: Engine) -> int:
_log = logging.getLogger("runtime.orchestrator")


# Marker that ``runtime.graph._handle_agent_failure`` writes onto the
# AgentRun.summary of the failing turn. Read by the retry / extract-error
# helpers below.
_AGENT_FAILURE_MARKER = "agent failed:"


def _assert_envelope_invariant_on_finalize(session: "Session") -> None:
"""Phase 10 (FOC-03) defence-in-depth log sweep.

Expand Down Expand Up @@ -14484,9 +14482,9 @@ def _extract_last_error(inc: "Session") -> Exception | None:
import pydantic as _pydantic
for run in reversed(inc.agents_run):
summary = (run.summary or "")
if not summary.startswith("agent failed:"):
if not summary.startswith(_AGENT_FAILURE_MARKER):
continue
body = summary.removeprefix("agent failed:").strip()
body = summary.removeprefix(_AGENT_FAILURE_MARKER).strip()
if "EnvelopeMissingError" in body:
return _EnvelopeMissingError(
agent=run.agent or "unknown",
Expand Down Expand Up @@ -15064,7 +15062,7 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]:
# successful runs. Retry attempts then append fresh runs.
inc.agents_run = [
r for r in inc.agents_run
if not (r.summary or "").startswith("agent failed:")
if not (r.summary or "").startswith(_AGENT_FAILURE_MARKER)
]
# Bump retry counter for unique LangGraph thread id (the prior
# thread's checkpoint sits at a terminal node and would
Expand Down Expand Up @@ -15207,6 +15205,13 @@ def _event_ts() -> str:
_log = logging.getLogger("runtime.api")


# Wire-format constants (extracted to keep S1192 — duplicated literal
# strings — in check; every SSE endpoint uses _SSE_MEDIA_TYPE, every
# session-not-found path raises with _SESSION_NOT_FOUND_DETAIL).
_SSE_MEDIA_TYPE = "text/event-stream"
_SESSION_NOT_FOUND_DETAIL = "session not found"


# HTTP status -> structured error code. Used by the global exception
# handler to keep React's error UI from having to switch on every
# integer status code.
Expand Down Expand Up @@ -15619,7 +15624,7 @@ async def _events():
):
yield f"data: {json.dumps(ev, default=str)}\n\n"

return StreamingResponse(_events(), media_type="text/event-stream")
return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE)

@fastapi_app.post("/incidents/{incident_id}/resume")
async def resume_incident(incident_id: str, req: ResumeRequest) -> StreamingResponse:
Expand Down Expand Up @@ -15647,15 +15652,14 @@ async def _events():
}
yield f"data: {json.dumps(err, default=str)}\n\n"

return StreamingResponse(_events(), media_type="text/event-stream")
return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE)

# ------------------------------------------------------------------
# Multi-session endpoints
# ------------------------------------------------------------------

@fastapi_app.post(
"/sessions",
response_model=SessionStartResponse,
status_code=201,
)
async def start_session_endpoint(
Expand Down Expand Up @@ -15685,7 +15689,7 @@ class is matched by name so this handler does not depend on a
raise
return SessionStartResponse(session_id=sid)

@fastapi_app.get("/sessions", response_model=list[SessionStatus])
@fastapi_app.get("/sessions")
async def list_sessions_endpoint(request: Request) -> list[SessionStatus]:
"""Snapshot of in-flight sessions (running / awaiting_input / error)."""
svc = request.app.state.service
Expand All @@ -15695,10 +15699,7 @@ async def list_sessions_endpoint(request: Request) -> list[SessionStatus]:
# HITL approval endpoints (risk-rated tool gateway)
# ------------------------------------------------------------------

@fastapi_app.get(
"/sessions/{session_id}/approvals",
response_model=list[PendingApproval],
)
@fastapi_app.get("/sessions/{session_id}/approvals")
async def list_pending_approvals(
session_id: str, request: Request
) -> list[PendingApproval]:
Expand All @@ -15713,13 +15714,13 @@ async def list_pending_approvals(
orch = request.app.state.orchestrator
try:
inc = orch.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError) as e:
except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass
# ``ValueError`` covers the SessionStore id-format guard
# (``Invalid incident id ...``) which we treat as a 404
# at the API boundary — the client passed an id that
# cannot exist, semantically equivalent to "not found".
raise HTTPException(
status_code=404, detail="session not found"
status_code=404, detail=_SESSION_NOT_FOUND_DETAIL
) from e
# Defensive: ``svc`` is unused here today — the read goes through
# the orchestrator's store. We keep the reference so a future
Expand Down Expand Up @@ -15761,9 +15762,9 @@ async def submit_approval_decision(
orch = request.app.state.orchestrator
try:
orch.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError) as e:
except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass
raise HTTPException(
status_code=404, detail="session not found"
status_code=404, detail=_SESSION_NOT_FOUND_DETAIL
) from e

decision_payload = {
Expand Down Expand Up @@ -15874,9 +15875,9 @@ async def get_session_detail(session_id: str, request: Request) -> dict:
orch = request.app.state.orchestrator
try:
return orch.get_session(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError) as e:
except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass
raise HTTPException(
status_code=404, detail="session not found",
status_code=404, detail=_SESSION_NOT_FOUND_DETAIL,
) from e

@fastapi_app.post("/sessions/{session_id}/resume")
Expand Down Expand Up @@ -15911,7 +15912,7 @@ async def _events():
}
yield f"data: {json.dumps(err, default=str)}\n\n"

return StreamingResponse(_events(), media_type="text/event-stream")
return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE)

@fastapi_app.post("/sessions/{session_id}/retry")
async def retry_session_sse(
Expand All @@ -15934,12 +15935,9 @@ async def _events():
}
yield f"data: {json.dumps(err, default=str)}\n\n"

return StreamingResponse(_events(), media_type="text/event-stream")
return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE)

@fastapi_app.get(
"/sessions/{session_id}/retry/preview",
response_model=RetryDecisionPreview,
)
@fastapi_app.get("/sessions/{session_id}/retry/preview")
async def preview_retry(
session_id: str, request: Request,
) -> RetryDecisionPreview:
Expand All @@ -15949,19 +15947,16 @@ async def preview_retry(
orch = request.app.state.orchestrator
try:
decision = orch.preview_retry_decision(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError) as e:
except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass
raise HTTPException(
status_code=404, detail="session not found",
status_code=404, detail=_SESSION_NOT_FOUND_DETAIL,
) from e
return RetryDecisionPreview(
retry=bool(decision.retry),
reason=str(decision.reason),
)

@fastapi_app.get(
"/sessions/{session_id}/lessons",
response_model=list[LessonResponse],
)
@fastapi_app.get("/sessions/{session_id}/lessons")
async def list_session_lessons(
session_id: str, request: Request,
) -> list[LessonResponse]:
Expand Down Expand Up @@ -16057,7 +16052,7 @@ async def _stream():
last_seq = ev.seq
yield f"data: {envelope.model_dump_json()}\n\n"

return StreamingResponse(_stream(), media_type="text/event-stream")
return StreamingResponse(_stream(), media_type=_SSE_MEDIA_TYPE)

@fastapi_app.websocket("/ws/sessions/{session_id}/events")
async def ws_events(websocket: WebSocket, session_id: str) -> None:
Expand Down Expand Up @@ -16165,7 +16160,6 @@ def register_dedup_routes(

@app.post(
"/sessions/{session_id}/un-duplicate",
response_model=UnDuplicateResponse,
status_code=200,
tags=["dedup"],
)
Expand Down
Loading
Loading