diff --git a/dist/app.py b/dist/app.py index 088db4b..295fca2 100644 --- a/dist/app.py +++ b/dist/app.py @@ -690,8 +690,11 @@ class IncidentState(Session): # Phase 11 (FOC-04): forward-reference imports for the should_gate # signature only; kept inside ``TYPE_CHECKING`` so the bundle's -# intra-import stripper does not remove a load-bearing import. The -# ``pass`` keeps the block syntactically valid after stripping. +# intra-import stripper sees them. The bundler's +# ``_ORPHANED_TYPE_CHECKING_RE`` rewrite injects a ``pass`` body when +# the imports get stripped (build_single_file.py:292) — but its regex +# requires the ``if TYPE_CHECKING:`` line to have no trailing comment, +# so do not add one here. # ----- imports for runtime/agents/responsive.py ----- """Responsive agent kind — the today-default LLM agent. @@ -1397,6 +1400,9 @@ async def _poll(self, registry): +# Forward-import: ``runtime.triggers.base`` only defines a dataclass and +# the type appears in a method annotation. Kept inside ``TYPE_CHECKING`` +# to avoid a runtime circular import. # ----- imports for runtime/api.py ----- """FastAPI app — health, listings, incident, and multi-session endpoints. @@ -3590,6 +3596,12 @@ class Base(DeclarativeBase): pass +# SQL fragment used as the partial-index predicate so soft-deleted +# rows don't bloat the indexes (mirrors the application-layer +# "exclude deleted" filter in SessionStore queries). +_ACTIVE_ROW_SQL = "deleted_at IS NULL" + + class IncidentRow(Base): __tablename__ = "incidents" @@ -3641,11 +3653,11 @@ class IncidentRow(Base): __table_args__ = ( Index("ix_incidents_status_env_active", "status", "environment", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_created_at_active", "created_at", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_parent_session_id", "parent_session_id"), ) @@ -6799,7 +6811,7 @@ def parse_envelope_from_result( continue try: payload = json.loads(content) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -8265,12 +8277,8 @@ async def _resume_with_timeout( # ====== module: runtime/policy.py ====== -if TYPE_CHECKING: # pragma: no cover -- type checking only - - - pass # noqa: PIE790 -- bundle survives even if imports are stripped - - +if TYPE_CHECKING: + pass GateReason = Literal[ "auto", "high_risk_tool", @@ -9974,7 +9982,7 @@ def _try_recover_envelope_from_raw(raw: str) -> AgentTurnOutput | None: for candidate in candidates: try: payload = json.loads(candidate) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -10997,9 +11005,6 @@ async def make_checkpointer( if TYPE_CHECKING: pass - - - @dataclass(frozen=True) class TriggerInfo: """Provenance attached to every session started via a trigger. @@ -11469,9 +11474,6 @@ async def stop(self) -> None: if TYPE_CHECKING: pass - - - _log = logging.getLogger(__name__) @@ -11564,7 +11566,7 @@ async def handler( ) except KeyError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc - except (ValueError, TypeError, ValidationError) as exc: + except (ValueError, TypeError) as exc: # pydantic ValidationError is a ValueError subclass _log.warning( "trigger %r transform/dispatch failed: %s", name, exc ) @@ -11577,8 +11579,6 @@ async def handler( if TYPE_CHECKING: pass - - _log = logging.getLogger(__name__) @@ -13540,15 +13540,7 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: # ====== module: runtime/orchestrator.py ====== if TYPE_CHECKING: - # Avoid a runtime circular import — ``runtime.triggers.base`` only - # defines a dataclass, and the type appears in a method annotation. pass - - - - - - from langgraph.errors import GraphInterrupt from langgraph.types import Command @@ -13567,6 +13559,12 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: _log = logging.getLogger("runtime.orchestrator") +# Marker that ``runtime.graph._handle_agent_failure`` writes onto the +# AgentRun.summary of the failing turn. Read by the retry / extract-error +# helpers below. +_AGENT_FAILURE_MARKER = "agent failed:" + + def _assert_envelope_invariant_on_finalize(session: "Session") -> None: """Phase 10 (FOC-03) defence-in-depth log sweep. @@ -14484,9 +14482,9 @@ def _extract_last_error(inc: "Session") -> Exception | None: import pydantic as _pydantic for run in reversed(inc.agents_run): summary = (run.summary or "") - if not summary.startswith("agent failed:"): + if not summary.startswith(_AGENT_FAILURE_MARKER): continue - body = summary.removeprefix("agent failed:").strip() + body = summary.removeprefix(_AGENT_FAILURE_MARKER).strip() if "EnvelopeMissingError" in body: return _EnvelopeMissingError( agent=run.agent or "unknown", @@ -15064,7 +15062,7 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: # successful runs. Retry attempts then append fresh runs. inc.agents_run = [ r for r in inc.agents_run - if not (r.summary or "").startswith("agent failed:") + if not (r.summary or "").startswith(_AGENT_FAILURE_MARKER) ] # Bump retry counter for unique LangGraph thread id (the prior # thread's checkpoint sits at a terminal node and would @@ -15207,6 +15205,13 @@ def _event_ts() -> str: _log = logging.getLogger("runtime.api") +# Wire-format constants (extracted to keep S1192 — duplicated literal +# strings — in check; every SSE endpoint uses _SSE_MEDIA_TYPE, every +# session-not-found path raises with _SESSION_NOT_FOUND_DETAIL). +_SSE_MEDIA_TYPE = "text/event-stream" +_SESSION_NOT_FOUND_DETAIL = "session not found" + + # HTTP status -> structured error code. Used by the global exception # handler to keep React's error UI from having to switch on every # integer status code. @@ -15619,7 +15624,7 @@ async def _events(): ): yield f"data: {json.dumps(ev, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/incidents/{incident_id}/resume") async def resume_incident(incident_id: str, req: ResumeRequest) -> StreamingResponse: @@ -15647,7 +15652,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) # ------------------------------------------------------------------ # Multi-session endpoints @@ -15655,7 +15660,6 @@ async def _events(): @fastapi_app.post( "/sessions", - response_model=SessionStartResponse, status_code=201, ) async def start_session_endpoint( @@ -15685,7 +15689,7 @@ class is matched by name so this handler does not depend on a raise return SessionStartResponse(session_id=sid) - @fastapi_app.get("/sessions", response_model=list[SessionStatus]) + @fastapi_app.get("/sessions") async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: """Snapshot of in-flight sessions (running / awaiting_input / error).""" svc = request.app.state.service @@ -15695,10 +15699,7 @@ async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: # HITL approval endpoints (risk-rated tool gateway) # ------------------------------------------------------------------ - @fastapi_app.get( - "/sessions/{session_id}/approvals", - response_model=list[PendingApproval], - ) + @fastapi_app.get("/sessions/{session_id}/approvals") async def list_pending_approvals( session_id: str, request: Request ) -> list[PendingApproval]: @@ -15713,13 +15714,13 @@ async def list_pending_approvals( orch = request.app.state.orchestrator try: inc = orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass # ``ValueError`` covers the SessionStore id-format guard # (``Invalid incident id ...``) which we treat as a 404 # at the API boundary — the client passed an id that # cannot exist, semantically equivalent to "not found". raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e # Defensive: ``svc`` is unused here today — the read goes through # the orchestrator's store. We keep the reference so a future @@ -15761,9 +15762,9 @@ async def submit_approval_decision( orch = request.app.state.orchestrator try: orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e decision_payload = { @@ -15874,9 +15875,9 @@ async def get_session_detail(session_id: str, request: Request) -> dict: orch = request.app.state.orchestrator try: return orch.get_session(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e @fastapi_app.post("/sessions/{session_id}/resume") @@ -15911,7 +15912,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/sessions/{session_id}/retry") async def retry_session_sse( @@ -15934,12 +15935,9 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) - @fastapi_app.get( - "/sessions/{session_id}/retry/preview", - response_model=RetryDecisionPreview, - ) + @fastapi_app.get("/sessions/{session_id}/retry/preview") async def preview_retry( session_id: str, request: Request, ) -> RetryDecisionPreview: @@ -15949,19 +15947,16 @@ async def preview_retry( orch = request.app.state.orchestrator try: decision = orch.preview_retry_decision(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e return RetryDecisionPreview( retry=bool(decision.retry), reason=str(decision.reason), ) - @fastapi_app.get( - "/sessions/{session_id}/lessons", - response_model=list[LessonResponse], - ) + @fastapi_app.get("/sessions/{session_id}/lessons") async def list_session_lessons( session_id: str, request: Request, ) -> list[LessonResponse]: @@ -16057,7 +16052,7 @@ async def _stream(): last_seq = ev.seq yield f"data: {envelope.model_dump_json()}\n\n" - return StreamingResponse(_stream(), media_type="text/event-stream") + return StreamingResponse(_stream(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.websocket("/ws/sessions/{session_id}/events") async def ws_events(websocket: WebSocket, session_id: str) -> None: @@ -16165,7 +16160,6 @@ def register_dedup_routes( @app.post( "/sessions/{session_id}/un-duplicate", - response_model=UnDuplicateResponse, status_code=200, tags=["dedup"], ) diff --git a/dist/apps/code-review.py b/dist/apps/code-review.py index 40defdf..2629fee 100644 --- a/dist/apps/code-review.py +++ b/dist/apps/code-review.py @@ -690,8 +690,11 @@ class IncidentState(Session): # Phase 11 (FOC-04): forward-reference imports for the should_gate # signature only; kept inside ``TYPE_CHECKING`` so the bundle's -# intra-import stripper does not remove a load-bearing import. The -# ``pass`` keeps the block syntactically valid after stripping. +# intra-import stripper sees them. The bundler's +# ``_ORPHANED_TYPE_CHECKING_RE`` rewrite injects a ``pass`` body when +# the imports get stripped (build_single_file.py:292) — but its regex +# requires the ``if TYPE_CHECKING:`` line to have no trailing comment, +# so do not add one here. # ----- imports for runtime/agents/responsive.py ----- """Responsive agent kind — the today-default LLM agent. @@ -1397,6 +1400,9 @@ async def _poll(self, registry): +# Forward-import: ``runtime.triggers.base`` only defines a dataclass and +# the type appears in a method annotation. Kept inside ``TYPE_CHECKING`` +# to avoid a runtime circular import. # ----- imports for runtime/api.py ----- """FastAPI app — health, listings, incident, and multi-session endpoints. @@ -3643,6 +3649,12 @@ class Base(DeclarativeBase): pass +# SQL fragment used as the partial-index predicate so soft-deleted +# rows don't bloat the indexes (mirrors the application-layer +# "exclude deleted" filter in SessionStore queries). +_ACTIVE_ROW_SQL = "deleted_at IS NULL" + + class IncidentRow(Base): __tablename__ = "incidents" @@ -3694,11 +3706,11 @@ class IncidentRow(Base): __table_args__ = ( Index("ix_incidents_status_env_active", "status", "environment", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_created_at_active", "created_at", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_parent_session_id", "parent_session_id"), ) @@ -6852,7 +6864,7 @@ def parse_envelope_from_result( continue try: payload = json.loads(content) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -8318,12 +8330,8 @@ async def _resume_with_timeout( # ====== module: runtime/policy.py ====== -if TYPE_CHECKING: # pragma: no cover -- type checking only - - - pass # noqa: PIE790 -- bundle survives even if imports are stripped - - +if TYPE_CHECKING: + pass GateReason = Literal[ "auto", "high_risk_tool", @@ -10027,7 +10035,7 @@ def _try_recover_envelope_from_raw(raw: str) -> AgentTurnOutput | None: for candidate in candidates: try: payload = json.loads(candidate) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -11050,9 +11058,6 @@ async def make_checkpointer( if TYPE_CHECKING: pass - - - @dataclass(frozen=True) class TriggerInfo: """Provenance attached to every session started via a trigger. @@ -11522,9 +11527,6 @@ async def stop(self) -> None: if TYPE_CHECKING: pass - - - _log = logging.getLogger(__name__) @@ -11617,7 +11619,7 @@ async def handler( ) except KeyError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc - except (ValueError, TypeError, ValidationError) as exc: + except (ValueError, TypeError) as exc: # pydantic ValidationError is a ValueError subclass _log.warning( "trigger %r transform/dispatch failed: %s", name, exc ) @@ -11630,8 +11632,6 @@ async def handler( if TYPE_CHECKING: pass - - _log = logging.getLogger(__name__) @@ -13593,15 +13593,7 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: # ====== module: runtime/orchestrator.py ====== if TYPE_CHECKING: - # Avoid a runtime circular import — ``runtime.triggers.base`` only - # defines a dataclass, and the type appears in a method annotation. pass - - - - - - from langgraph.errors import GraphInterrupt from langgraph.types import Command @@ -13620,6 +13612,12 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: _log = logging.getLogger("runtime.orchestrator") +# Marker that ``runtime.graph._handle_agent_failure`` writes onto the +# AgentRun.summary of the failing turn. Read by the retry / extract-error +# helpers below. +_AGENT_FAILURE_MARKER = "agent failed:" + + def _assert_envelope_invariant_on_finalize(session: "Session") -> None: """Phase 10 (FOC-03) defence-in-depth log sweep. @@ -14537,9 +14535,9 @@ def _extract_last_error(inc: "Session") -> Exception | None: import pydantic as _pydantic for run in reversed(inc.agents_run): summary = (run.summary or "") - if not summary.startswith("agent failed:"): + if not summary.startswith(_AGENT_FAILURE_MARKER): continue - body = summary.removeprefix("agent failed:").strip() + body = summary.removeprefix(_AGENT_FAILURE_MARKER).strip() if "EnvelopeMissingError" in body: return _EnvelopeMissingError( agent=run.agent or "unknown", @@ -15117,7 +15115,7 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: # successful runs. Retry attempts then append fresh runs. inc.agents_run = [ r for r in inc.agents_run - if not (r.summary or "").startswith("agent failed:") + if not (r.summary or "").startswith(_AGENT_FAILURE_MARKER) ] # Bump retry counter for unique LangGraph thread id (the prior # thread's checkpoint sits at a terminal node and would @@ -15260,6 +15258,13 @@ def _event_ts() -> str: _log = logging.getLogger("runtime.api") +# Wire-format constants (extracted to keep S1192 — duplicated literal +# strings — in check; every SSE endpoint uses _SSE_MEDIA_TYPE, every +# session-not-found path raises with _SESSION_NOT_FOUND_DETAIL). +_SSE_MEDIA_TYPE = "text/event-stream" +_SESSION_NOT_FOUND_DETAIL = "session not found" + + # HTTP status -> structured error code. Used by the global exception # handler to keep React's error UI from having to switch on every # integer status code. @@ -15672,7 +15677,7 @@ async def _events(): ): yield f"data: {json.dumps(ev, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/incidents/{incident_id}/resume") async def resume_incident(incident_id: str, req: ResumeRequest) -> StreamingResponse: @@ -15700,7 +15705,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) # ------------------------------------------------------------------ # Multi-session endpoints @@ -15708,7 +15713,6 @@ async def _events(): @fastapi_app.post( "/sessions", - response_model=SessionStartResponse, status_code=201, ) async def start_session_endpoint( @@ -15738,7 +15742,7 @@ class is matched by name so this handler does not depend on a raise return SessionStartResponse(session_id=sid) - @fastapi_app.get("/sessions", response_model=list[SessionStatus]) + @fastapi_app.get("/sessions") async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: """Snapshot of in-flight sessions (running / awaiting_input / error).""" svc = request.app.state.service @@ -15748,10 +15752,7 @@ async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: # HITL approval endpoints (risk-rated tool gateway) # ------------------------------------------------------------------ - @fastapi_app.get( - "/sessions/{session_id}/approvals", - response_model=list[PendingApproval], - ) + @fastapi_app.get("/sessions/{session_id}/approvals") async def list_pending_approvals( session_id: str, request: Request ) -> list[PendingApproval]: @@ -15766,13 +15767,13 @@ async def list_pending_approvals( orch = request.app.state.orchestrator try: inc = orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass # ``ValueError`` covers the SessionStore id-format guard # (``Invalid incident id ...``) which we treat as a 404 # at the API boundary — the client passed an id that # cannot exist, semantically equivalent to "not found". raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e # Defensive: ``svc`` is unused here today — the read goes through # the orchestrator's store. We keep the reference so a future @@ -15814,9 +15815,9 @@ async def submit_approval_decision( orch = request.app.state.orchestrator try: orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e decision_payload = { @@ -15927,9 +15928,9 @@ async def get_session_detail(session_id: str, request: Request) -> dict: orch = request.app.state.orchestrator try: return orch.get_session(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e @fastapi_app.post("/sessions/{session_id}/resume") @@ -15964,7 +15965,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/sessions/{session_id}/retry") async def retry_session_sse( @@ -15987,12 +15988,9 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) - @fastapi_app.get( - "/sessions/{session_id}/retry/preview", - response_model=RetryDecisionPreview, - ) + @fastapi_app.get("/sessions/{session_id}/retry/preview") async def preview_retry( session_id: str, request: Request, ) -> RetryDecisionPreview: @@ -16002,19 +16000,16 @@ async def preview_retry( orch = request.app.state.orchestrator try: decision = orch.preview_retry_decision(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e return RetryDecisionPreview( retry=bool(decision.retry), reason=str(decision.reason), ) - @fastapi_app.get( - "/sessions/{session_id}/lessons", - response_model=list[LessonResponse], - ) + @fastapi_app.get("/sessions/{session_id}/lessons") async def list_session_lessons( session_id: str, request: Request, ) -> list[LessonResponse]: @@ -16110,7 +16105,7 @@ async def _stream(): last_seq = ev.seq yield f"data: {envelope.model_dump_json()}\n\n" - return StreamingResponse(_stream(), media_type="text/event-stream") + return StreamingResponse(_stream(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.websocket("/ws/sessions/{session_id}/events") async def ws_events(websocket: WebSocket, session_id: str) -> None: @@ -16218,7 +16213,6 @@ def register_dedup_routes( @app.post( "/sessions/{session_id}/un-duplicate", - response_model=UnDuplicateResponse, status_code=200, tags=["dedup"], ) diff --git a/dist/apps/incident-management.py b/dist/apps/incident-management.py index 563f9b4..b3725dc 100644 --- a/dist/apps/incident-management.py +++ b/dist/apps/incident-management.py @@ -690,8 +690,11 @@ class IncidentState(Session): # Phase 11 (FOC-04): forward-reference imports for the should_gate # signature only; kept inside ``TYPE_CHECKING`` so the bundle's -# intra-import stripper does not remove a load-bearing import. The -# ``pass`` keeps the block syntactically valid after stripping. +# intra-import stripper sees them. The bundler's +# ``_ORPHANED_TYPE_CHECKING_RE`` rewrite injects a ``pass`` body when +# the imports get stripped (build_single_file.py:292) — but its regex +# requires the ``if TYPE_CHECKING:`` line to have no trailing comment, +# so do not add one here. # ----- imports for runtime/agents/responsive.py ----- """Responsive agent kind — the today-default LLM agent. @@ -1397,6 +1400,9 @@ async def _poll(self, registry): +# Forward-import: ``runtime.triggers.base`` only defines a dataclass and +# the type appears in a method annotation. Kept inside ``TYPE_CHECKING`` +# to avoid a runtime circular import. # ----- imports for runtime/api.py ----- """FastAPI app — health, listings, incident, and multi-session endpoints. @@ -3655,6 +3661,12 @@ class Base(DeclarativeBase): pass +# SQL fragment used as the partial-index predicate so soft-deleted +# rows don't bloat the indexes (mirrors the application-layer +# "exclude deleted" filter in SessionStore queries). +_ACTIVE_ROW_SQL = "deleted_at IS NULL" + + class IncidentRow(Base): __tablename__ = "incidents" @@ -3706,11 +3718,11 @@ class IncidentRow(Base): __table_args__ = ( Index("ix_incidents_status_env_active", "status", "environment", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_created_at_active", "created_at", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_parent_session_id", "parent_session_id"), ) @@ -6864,7 +6876,7 @@ def parse_envelope_from_result( continue try: payload = json.loads(content) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -8330,12 +8342,8 @@ async def _resume_with_timeout( # ====== module: runtime/policy.py ====== -if TYPE_CHECKING: # pragma: no cover -- type checking only - - - pass # noqa: PIE790 -- bundle survives even if imports are stripped - - +if TYPE_CHECKING: + pass GateReason = Literal[ "auto", "high_risk_tool", @@ -10039,7 +10047,7 @@ def _try_recover_envelope_from_raw(raw: str) -> AgentTurnOutput | None: for candidate in candidates: try: payload = json.loads(candidate) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue @@ -11062,9 +11070,6 @@ async def make_checkpointer( if TYPE_CHECKING: pass - - - @dataclass(frozen=True) class TriggerInfo: """Provenance attached to every session started via a trigger. @@ -11534,9 +11539,6 @@ async def stop(self) -> None: if TYPE_CHECKING: pass - - - _log = logging.getLogger(__name__) @@ -11629,7 +11631,7 @@ async def handler( ) except KeyError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc - except (ValueError, TypeError, ValidationError) as exc: + except (ValueError, TypeError) as exc: # pydantic ValidationError is a ValueError subclass _log.warning( "trigger %r transform/dispatch failed: %s", name, exc ) @@ -11642,8 +11644,6 @@ async def handler( if TYPE_CHECKING: pass - - _log = logging.getLogger(__name__) @@ -13605,15 +13605,7 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: # ====== module: runtime/orchestrator.py ====== if TYPE_CHECKING: - # Avoid a runtime circular import — ``runtime.triggers.base`` only - # defines a dataclass, and the type appears in a method annotation. pass - - - - - - from langgraph.errors import GraphInterrupt from langgraph.types import Command @@ -13632,6 +13624,12 @@ def gc_orphaned_checkpoints(engine: Engine) -> int: _log = logging.getLogger("runtime.orchestrator") +# Marker that ``runtime.graph._handle_agent_failure`` writes onto the +# AgentRun.summary of the failing turn. Read by the retry / extract-error +# helpers below. +_AGENT_FAILURE_MARKER = "agent failed:" + + def _assert_envelope_invariant_on_finalize(session: "Session") -> None: """Phase 10 (FOC-03) defence-in-depth log sweep. @@ -14549,9 +14547,9 @@ def _extract_last_error(inc: "Session") -> Exception | None: import pydantic as _pydantic for run in reversed(inc.agents_run): summary = (run.summary or "") - if not summary.startswith("agent failed:"): + if not summary.startswith(_AGENT_FAILURE_MARKER): continue - body = summary.removeprefix("agent failed:").strip() + body = summary.removeprefix(_AGENT_FAILURE_MARKER).strip() if "EnvelopeMissingError" in body: return _EnvelopeMissingError( agent=run.agent or "unknown", @@ -15129,7 +15127,7 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: # successful runs. Retry attempts then append fresh runs. inc.agents_run = [ r for r in inc.agents_run - if not (r.summary or "").startswith("agent failed:") + if not (r.summary or "").startswith(_AGENT_FAILURE_MARKER) ] # Bump retry counter for unique LangGraph thread id (the prior # thread's checkpoint sits at a terminal node and would @@ -15272,6 +15270,13 @@ def _event_ts() -> str: _log = logging.getLogger("runtime.api") +# Wire-format constants (extracted to keep S1192 — duplicated literal +# strings — in check; every SSE endpoint uses _SSE_MEDIA_TYPE, every +# session-not-found path raises with _SESSION_NOT_FOUND_DETAIL). +_SSE_MEDIA_TYPE = "text/event-stream" +_SESSION_NOT_FOUND_DETAIL = "session not found" + + # HTTP status -> structured error code. Used by the global exception # handler to keep React's error UI from having to switch on every # integer status code. @@ -15684,7 +15689,7 @@ async def _events(): ): yield f"data: {json.dumps(ev, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/incidents/{incident_id}/resume") async def resume_incident(incident_id: str, req: ResumeRequest) -> StreamingResponse: @@ -15712,7 +15717,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) # ------------------------------------------------------------------ # Multi-session endpoints @@ -15720,7 +15725,6 @@ async def _events(): @fastapi_app.post( "/sessions", - response_model=SessionStartResponse, status_code=201, ) async def start_session_endpoint( @@ -15750,7 +15754,7 @@ class is matched by name so this handler does not depend on a raise return SessionStartResponse(session_id=sid) - @fastapi_app.get("/sessions", response_model=list[SessionStatus]) + @fastapi_app.get("/sessions") async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: """Snapshot of in-flight sessions (running / awaiting_input / error).""" svc = request.app.state.service @@ -15760,10 +15764,7 @@ async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: # HITL approval endpoints (risk-rated tool gateway) # ------------------------------------------------------------------ - @fastapi_app.get( - "/sessions/{session_id}/approvals", - response_model=list[PendingApproval], - ) + @fastapi_app.get("/sessions/{session_id}/approvals") async def list_pending_approvals( session_id: str, request: Request ) -> list[PendingApproval]: @@ -15778,13 +15779,13 @@ async def list_pending_approvals( orch = request.app.state.orchestrator try: inc = orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass # ``ValueError`` covers the SessionStore id-format guard # (``Invalid incident id ...``) which we treat as a 404 # at the API boundary — the client passed an id that # cannot exist, semantically equivalent to "not found". raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e # Defensive: ``svc`` is unused here today — the read goes through # the orchestrator's store. We keep the reference so a future @@ -15826,9 +15827,9 @@ async def submit_approval_decision( orch = request.app.state.orchestrator try: orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e decision_payload = { @@ -15939,9 +15940,9 @@ async def get_session_detail(session_id: str, request: Request) -> dict: orch = request.app.state.orchestrator try: return orch.get_session(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e @fastapi_app.post("/sessions/{session_id}/resume") @@ -15976,7 +15977,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/sessions/{session_id}/retry") async def retry_session_sse( @@ -15999,12 +16000,9 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) - @fastapi_app.get( - "/sessions/{session_id}/retry/preview", - response_model=RetryDecisionPreview, - ) + @fastapi_app.get("/sessions/{session_id}/retry/preview") async def preview_retry( session_id: str, request: Request, ) -> RetryDecisionPreview: @@ -16014,19 +16012,16 @@ async def preview_retry( orch = request.app.state.orchestrator try: decision = orch.preview_retry_decision(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e return RetryDecisionPreview( retry=bool(decision.retry), reason=str(decision.reason), ) - @fastapi_app.get( - "/sessions/{session_id}/lessons", - response_model=list[LessonResponse], - ) + @fastapi_app.get("/sessions/{session_id}/lessons") async def list_session_lessons( session_id: str, request: Request, ) -> list[LessonResponse]: @@ -16122,7 +16117,7 @@ async def _stream(): last_seq = ev.seq yield f"data: {envelope.model_dump_json()}\n\n" - return StreamingResponse(_stream(), media_type="text/event-stream") + return StreamingResponse(_stream(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.websocket("/ws/sessions/{session_id}/events") async def ws_events(websocket: WebSocket, session_id: str) -> None: @@ -16230,7 +16225,6 @@ def register_dedup_routes( @app.post( "/sessions/{session_id}/un-duplicate", - response_model=UnDuplicateResponse, status_code=200, tags=["dedup"], ) diff --git a/sonar-project.properties b/sonar-project.properties index 53a01b4..e4616b3 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -44,13 +44,35 @@ sonar.cpd.exclusions=src/runtime/tools/gateway.py,src/runtime/agents/responsive. # * ``src/runtime/__init__.py`` — re-export surface only. sonar.coverage.exclusions=src/runtime/__init__.py,src/runtime/ui.py,src/runtime/__main__.py,src/runtime/checkpointer_postgres.py,src/runtime/triggers/transports/plugin.py,examples/**,ui/** -# Suppress python:S7503 (async-without-await) for framework-driven async signatures. -# LangGraph nodes and FastMCP tool handlers MUST be `async def` even when their -# bodies are synchronous — removing async breaks the framework contract. -sonar.issue.ignore.multicriteria=e1,e2,e3 +# Suppress python:S7503 (async-without-await) for framework-driven async +# signatures. Each suppressed file has one or more `async def` whose body +# is synchronous because the surrounding framework requires async: +# * mcp_servers/** + examples/**/mcp_server.py — FastMCP tool handlers +# * graph.py + agents/supervisor.py — LangGraph node contract +# * llm.py — langchain BaseChatModel.ainvoke / structured-runnable contract +# * api.py — FastAPI handler / Depends callable contract +# * service.py — service.submit_async() expects an awaitable +# * tools/approval_watchdog.py + learning/scheduler.py — APScheduler +# async-callback contract +# * triggers/auth.py — FastAPI Depends() bearer dependency contract +sonar.issue.ignore.multicriteria=e1,e2,e3,e4,e5,e6,e7,e8,e9,e10 sonar.issue.ignore.multicriteria.e1.ruleKey=python:S7503 sonar.issue.ignore.multicriteria.e1.resourceKey=src/runtime/mcp_servers/**/*.py sonar.issue.ignore.multicriteria.e2.ruleKey=python:S7503 sonar.issue.ignore.multicriteria.e2.resourceKey=src/runtime/graph.py sonar.issue.ignore.multicriteria.e3.ruleKey=python:S7503 sonar.issue.ignore.multicriteria.e3.resourceKey=examples/**/mcp_server.py +sonar.issue.ignore.multicriteria.e4.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e4.resourceKey=src/runtime/learning/scheduler.py +sonar.issue.ignore.multicriteria.e5.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e5.resourceKey=src/runtime/llm.py +sonar.issue.ignore.multicriteria.e6.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e6.resourceKey=src/runtime/agents/supervisor.py +sonar.issue.ignore.multicriteria.e7.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e7.resourceKey=src/runtime/api.py +sonar.issue.ignore.multicriteria.e8.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e8.resourceKey=src/runtime/service.py +sonar.issue.ignore.multicriteria.e9.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e9.resourceKey=src/runtime/tools/approval_watchdog.py +sonar.issue.ignore.multicriteria.e10.ruleKey=python:S7503 +sonar.issue.ignore.multicriteria.e10.resourceKey=src/runtime/triggers/auth.py diff --git a/src/runtime/agents/turn_output.py b/src/runtime/agents/turn_output.py index 2b75708..f409208 100644 --- a/src/runtime/agents/turn_output.py +++ b/src/runtime/agents/turn_output.py @@ -299,7 +299,7 @@ def parse_envelope_from_result( continue try: payload = json.loads(content) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue diff --git a/src/runtime/api.py b/src/runtime/api.py index 5bf8077..f47ed77 100644 --- a/src/runtime/api.py +++ b/src/runtime/api.py @@ -39,6 +39,13 @@ _log = logging.getLogger("runtime.api") +# Wire-format constants (extracted to keep S1192 — duplicated literal +# strings — in check; every SSE endpoint uses _SSE_MEDIA_TYPE, every +# session-not-found path raises with _SESSION_NOT_FOUND_DETAIL). +_SSE_MEDIA_TYPE = "text/event-stream" +_SESSION_NOT_FOUND_DETAIL = "session not found" + + # HTTP status -> structured error code. Used by the global exception # handler to keep React's error UI from having to switch on every # integer status code. @@ -451,7 +458,7 @@ async def _events(): ): yield f"data: {json.dumps(ev, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/incidents/{incident_id}/resume") async def resume_incident(incident_id: str, req: ResumeRequest) -> StreamingResponse: @@ -479,7 +486,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) # ------------------------------------------------------------------ # Multi-session endpoints @@ -487,7 +494,6 @@ async def _events(): @fastapi_app.post( "/sessions", - response_model=SessionStartResponse, status_code=201, ) async def start_session_endpoint( @@ -517,7 +523,7 @@ class is matched by name so this handler does not depend on a raise return SessionStartResponse(session_id=sid) - @fastapi_app.get("/sessions", response_model=list[SessionStatus]) + @fastapi_app.get("/sessions") async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: """Snapshot of in-flight sessions (running / awaiting_input / error).""" svc = request.app.state.service @@ -527,10 +533,7 @@ async def list_sessions_endpoint(request: Request) -> list[SessionStatus]: # HITL approval endpoints (risk-rated tool gateway) # ------------------------------------------------------------------ - @fastapi_app.get( - "/sessions/{session_id}/approvals", - response_model=list[PendingApproval], - ) + @fastapi_app.get("/sessions/{session_id}/approvals") async def list_pending_approvals( session_id: str, request: Request ) -> list[PendingApproval]: @@ -545,13 +548,13 @@ async def list_pending_approvals( orch = request.app.state.orchestrator try: inc = orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass # ``ValueError`` covers the SessionStore id-format guard # (``Invalid incident id ...``) which we treat as a 404 # at the API boundary — the client passed an id that # cannot exist, semantically equivalent to "not found". raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e # Defensive: ``svc`` is unused here today — the read goes through # the orchestrator's store. We keep the reference so a future @@ -593,9 +596,9 @@ async def submit_approval_decision( orch = request.app.state.orchestrator try: orch.store.load(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found" + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL ) from e decision_payload = { @@ -706,9 +709,9 @@ async def get_session_detail(session_id: str, request: Request) -> dict: orch = request.app.state.orchestrator try: return orch.get_session(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e @fastapi_app.post("/sessions/{session_id}/resume") @@ -743,7 +746,7 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.post("/sessions/{session_id}/retry") async def retry_session_sse( @@ -766,12 +769,9 @@ async def _events(): } yield f"data: {json.dumps(err, default=str)}\n\n" - return StreamingResponse(_events(), media_type="text/event-stream") + return StreamingResponse(_events(), media_type=_SSE_MEDIA_TYPE) - @fastapi_app.get( - "/sessions/{session_id}/retry/preview", - response_model=RetryDecisionPreview, - ) + @fastapi_app.get("/sessions/{session_id}/retry/preview") async def preview_retry( session_id: str, request: Request, ) -> RetryDecisionPreview: @@ -781,19 +781,16 @@ async def preview_retry( orch = request.app.state.orchestrator try: decision = orch.preview_retry_decision(session_id) - except (FileNotFoundError, ValueError, KeyError, LookupError) as e: + except (FileNotFoundError, ValueError, LookupError) as e: # KeyError is a LookupError subclass raise HTTPException( - status_code=404, detail="session not found", + status_code=404, detail=_SESSION_NOT_FOUND_DETAIL, ) from e return RetryDecisionPreview( retry=bool(decision.retry), reason=str(decision.reason), ) - @fastapi_app.get( - "/sessions/{session_id}/lessons", - response_model=list[LessonResponse], - ) + @fastapi_app.get("/sessions/{session_id}/lessons") async def list_session_lessons( session_id: str, request: Request, ) -> list[LessonResponse]: @@ -890,7 +887,7 @@ async def _stream(): last_seq = ev.seq yield f"data: {envelope.model_dump_json()}\n\n" - return StreamingResponse(_stream(), media_type="text/event-stream") + return StreamingResponse(_stream(), media_type=_SSE_MEDIA_TYPE) @fastapi_app.websocket("/ws/sessions/{session_id}/events") async def ws_events(websocket: WebSocket, session_id: str) -> None: diff --git a/src/runtime/api_dedup.py b/src/runtime/api_dedup.py index 92c73a3..041c297 100644 --- a/src/runtime/api_dedup.py +++ b/src/runtime/api_dedup.py @@ -63,7 +63,6 @@ def register_dedup_routes( @app.post( "/sessions/{session_id}/un-duplicate", - response_model=UnDuplicateResponse, status_code=200, tags=["dedup"], ) diff --git a/src/runtime/graph.py b/src/runtime/graph.py index 7d15e7c..77f8771 100644 --- a/src/runtime/graph.py +++ b/src/runtime/graph.py @@ -599,7 +599,7 @@ def _try_recover_envelope_from_raw(raw: str) -> AgentTurnOutput | None: for candidate in candidates: try: payload = json.loads(candidate) - except (json.JSONDecodeError, ValueError): + except ValueError: # JSONDecodeError is a ValueError subclass continue if not isinstance(payload, dict): continue diff --git a/src/runtime/orchestrator.py b/src/runtime/orchestrator.py index b72bcf5..abff038 100644 --- a/src/runtime/orchestrator.py +++ b/src/runtime/orchestrator.py @@ -20,10 +20,10 @@ resolve_framework_app_config, ) +# Forward-import: ``runtime.triggers.base`` only defines a dataclass and +# the type appears in a method annotation. Kept inside ``TYPE_CHECKING`` +# to avoid a runtime circular import. if TYPE_CHECKING: - # Avoid a runtime circular import — ``runtime.triggers.base`` only - # defines a dataclass, and the type appears in a method annotation. - pass from runtime.triggers.base import TriggerInfo # noqa: F401 from runtime.dedup import DedupConfig, DedupPipeline, DedupResult from runtime.intake import IntakeContext @@ -49,6 +49,12 @@ _log = logging.getLogger("runtime.orchestrator") +# Marker that ``runtime.graph._handle_agent_failure`` writes onto the +# AgentRun.summary of the failing turn. Read by the retry / extract-error +# helpers below. +_AGENT_FAILURE_MARKER = "agent failed:" + + def _assert_envelope_invariant_on_finalize(session: "Session") -> None: """Phase 10 (FOC-03) defence-in-depth log sweep. @@ -971,9 +977,9 @@ def _extract_last_error(inc: "Session") -> Exception | None: import pydantic as _pydantic for run in reversed(inc.agents_run): summary = (run.summary or "") - if not summary.startswith("agent failed:"): + if not summary.startswith(_AGENT_FAILURE_MARKER): continue - body = summary.removeprefix("agent failed:").strip() + body = summary.removeprefix(_AGENT_FAILURE_MARKER).strip() if "EnvelopeMissingError" in body: return _EnvelopeMissingError( agent=run.agent or "unknown", @@ -1551,7 +1557,7 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: # successful runs. Retry attempts then append fresh runs. inc.agents_run = [ r for r in inc.agents_run - if not (r.summary or "").startswith("agent failed:") + if not (r.summary or "").startswith(_AGENT_FAILURE_MARKER) ] # Bump retry counter for unique LangGraph thread id (the prior # thread's checkpoint sits at a terminal node and would diff --git a/src/runtime/policy.py b/src/runtime/policy.py index bc991c4..52aa399 100644 --- a/src/runtime/policy.py +++ b/src/runtime/policy.py @@ -49,12 +49,14 @@ # Phase 11 (FOC-04): forward-reference imports for the should_gate # signature only; kept inside ``TYPE_CHECKING`` so the bundle's -# intra-import stripper does not remove a load-bearing import. The -# ``pass`` keeps the block syntactically valid after stripping. -if TYPE_CHECKING: # pragma: no cover -- type checking only +# intra-import stripper sees them. The bundler's +# ``_ORPHANED_TYPE_CHECKING_RE`` rewrite injects a ``pass`` body when +# the imports get stripped (build_single_file.py:292) — but its regex +# requires the ``if TYPE_CHECKING:`` line to have no trailing comment, +# so do not add one here. +if TYPE_CHECKING: from runtime.config import OrchestratorConfig # noqa: F401 from runtime.state import ToolCall # noqa: F401 - pass # noqa: PIE790 -- bundle survives even if imports are stripped GateReason = Literal[ diff --git a/src/runtime/storage/models.py b/src/runtime/storage/models.py index dd82570..2eeccd7 100644 --- a/src/runtime/storage/models.py +++ b/src/runtime/storage/models.py @@ -14,6 +14,12 @@ class Base(DeclarativeBase): pass +# SQL fragment used as the partial-index predicate so soft-deleted +# rows don't bloat the indexes (mirrors the application-layer +# "exclude deleted" filter in SessionStore queries). +_ACTIVE_ROW_SQL = "deleted_at IS NULL" + + class IncidentRow(Base): __tablename__ = "incidents" @@ -65,11 +71,11 @@ class IncidentRow(Base): __table_args__ = ( Index("ix_incidents_status_env_active", "status", "environment", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_created_at_active", "created_at", - postgresql_where=text("deleted_at IS NULL"), - sqlite_where=text("deleted_at IS NULL")), + postgresql_where=text(_ACTIVE_ROW_SQL), + sqlite_where=text(_ACTIVE_ROW_SQL)), Index("ix_incidents_parent_session_id", "parent_session_id"), ) diff --git a/src/runtime/triggers/base.py b/src/runtime/triggers/base.py index 93bad58..81c29b2 100644 --- a/src/runtime/triggers/base.py +++ b/src/runtime/triggers/base.py @@ -17,7 +17,6 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - pass from runtime.triggers.registry import TriggerRegistry # noqa: F401 diff --git a/src/runtime/triggers/transports/schedule.py b/src/runtime/triggers/transports/schedule.py index fda70b9..fd485eb 100644 --- a/src/runtime/triggers/transports/schedule.py +++ b/src/runtime/triggers/transports/schedule.py @@ -24,7 +24,6 @@ from runtime.triggers.config import ScheduleTriggerConfig if TYPE_CHECKING: - pass from runtime.triggers.registry import TriggerRegistry # noqa: F401 _log = logging.getLogger(__name__) diff --git a/src/runtime/triggers/transports/webhook.py b/src/runtime/triggers/transports/webhook.py index a2f6a6c..96322da 100644 --- a/src/runtime/triggers/transports/webhook.py +++ b/src/runtime/triggers/transports/webhook.py @@ -30,7 +30,6 @@ from runtime.triggers.config import WebhookTriggerConfig if TYPE_CHECKING: - pass from runtime.triggers.idempotency import IdempotencyStore # noqa: F401 from runtime.triggers.registry import ( # noqa: F401 TriggerRegistry, TriggerSpec, @@ -128,7 +127,7 @@ async def handler( ) except KeyError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc - except (ValueError, TypeError, ValidationError) as exc: + except (ValueError, TypeError) as exc: # pydantic ValidationError is a ValueError subclass _log.warning( "trigger %r transform/dispatch failed: %s", name, exc )