From 105572f370da372b93b0acc0c676afe7a941c2f4 Mon Sep 17 00:00:00 2001 From: David Hyrule Date: Mon, 29 Jun 2026 22:22:32 +0200 Subject: [PATCH] feat: enrich producer trace correlations --- agent_core/__init__.py | 2 +- agent_core/adapters/engineering_loop.py | 224 +++++++++++++++++---- agent_core/adapters/knowledge.py | 74 ++++++- pyproject.toml | 2 +- tests/adapters/test_engineering_adapter.py | 29 +++ tests/adapters/test_knowledge_adapter.py | 9 + 6 files changed, 299 insertions(+), 41 deletions(-) diff --git a/agent_core/__init__.py b/agent_core/__init__.py index ffd607b..99fc561 100644 --- a/agent_core/__init__.py +++ b/agent_core/__init__.py @@ -2,4 +2,4 @@ from __future__ import annotations -__version__ = "0.4.0" +__version__ = "0.5.0" diff --git a/agent_core/adapters/engineering_loop.py b/agent_core/adapters/engineering_loop.py index f3f1ec4..908ecfd 100644 --- a/agent_core/adapters/engineering_loop.py +++ b/agent_core/adapters/engineering_loop.py @@ -6,15 +6,19 @@ from __future__ import annotations +import re +from collections.abc import Mapping from typing import Any from agent_core.contracts.decision import DecisionPacket from agent_core.contracts.models import CostUsage +from agent_core.contracts.observatory import ObservatoryLink from agent_core.contracts.task import TaskEnvelope from agent_core.contracts.tools import ToolResult from agent_core.contracts.tracing import TraceEvent GRAPH_ID = "engineering-loop" +_GITHUB_PR_RE = re.compile(r"github\.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)/pull/(?P<number>\d+)") def task_envelope_from_graph_state(state: dict[str, Any]) -> TaskEnvelope: @@ -81,49 +85,193 @@ def tool_result_from_gate(gate: dict[str, Any]) -> ToolResult: def trace_events_from_loop_trace( trace: dict[str, Any], *, run_id: str | None = None ) -> list[TraceEvent]: + """Convert Engineering Loop trace/state dictionaries to observatory TraceEvents. 
+ + Besides legacy LLM/gate/backend summaries, this emits one ``loop_node`` event for + each compact graph-node trace item when ``trace_events`` (live state) or ``events`` + (``loop_trace.json``) is present. Events carry change/repository/PR correlation so + the observatory can stitch graph actions, GitHub artifacts, and deploy evidence + into a single safe traceback. + """ events: list[TraceEvent] = [] + base_fields = _base_trace_fields(trace, run_id=run_id) + previous_event_id: str | None = None + for item in trace.get("llm_outputs", []) or []: - events.append( - TraceEvent.model_validate( - { - "event_type": "model_call", - "node_id": item.get("role"), - "agent_role": item.get("role"), - "summary": f"role review approved={item.get('approved')}", - "payload": item, - "run_id": run_id, - "graph_id": GRAPH_ID, - } - ) + event = TraceEvent.model_validate( + { + **base_fields, + "event_type": "model_call", + "node_id": item.get("role"), + "agent_role": item.get("role"), + "summary": f"role review approved={item.get('approved')}", + "payload": item, + } ) + events.append(event) + previous_event_id = event.event_id for gate in trace.get("gate_results", []) or []: - events.append( - TraceEvent.model_validate( - { - "event_type": "tool_call", - "node_id": "gate_execution", - "summary": str(gate.get("command")), - "payload": gate, - "run_id": run_id, - "graph_id": GRAPH_ID, - } - ) + event = TraceEvent.model_validate( + { + **base_fields, + "event_type": "tool_call", + "node_id": "gate_execution", + "summary": str(gate.get("command")), + "payload": gate, + "parent_event_id": previous_event_id, + } ) + events.append(event) + previous_event_id = event.event_id for backend_result in trace.get("backend_results", []) or []: - events.append( - TraceEvent.model_validate( - { - "event_type": "backend_execution", - "node_id": "delegate_implementation", - "summary": ( - f"backend={backend_result.get('backend')} " - f"status={backend_result.get('status')}" - ), - "payload": 
backend_result, - "cost": cost_usage_from_backend_result(backend_result).model_dump(), - "run_id": run_id, - "graph_id": GRAPH_ID, - } - ) + event = TraceEvent.model_validate( + { + **base_fields, + "repository": _repo_from_backend(backend_result) or base_fields.get("repository"), + "event_type": "backend_execution", + "node_id": "delegate_implementation", + "summary": ( + f"backend={backend_result.get('backend')} " + f"status={backend_result.get('status')}" + ), + "payload": backend_result, + "cost": cost_usage_from_backend_result(backend_result).model_dump(), + "parent_event_id": previous_event_id, + } + ) + events.append(event) + previous_event_id = event.event_id + + node_trace = trace.get("trace_events") or trace.get("events") or [] + for item in node_trace: + if not isinstance(item, Mapping): + continue + event = TraceEvent.model_validate( + { + **base_fields, + "event_type": "loop_node", + "node_id": _string_or_none(item.get("node")), + "agent_role": _string_or_none(item.get("role")), + "summary": _node_summary(item), + "payload": _bounded_node_payload(item), + "parent_event_id": previous_event_id, + } ) + events.append(event) + previous_event_id = event.event_id + return events + + +def _base_trace_fields(trace: Mapping[str, Any], *, run_id: str | None) -> dict[str, Any]: + raw_change = trace.get("change") + change: Mapping[str, Any] = raw_change if isinstance(raw_change, Mapping) else {} + change_id = _string_or_none(trace.get("change_id") or change.get("change_id") or run_id) + pr_url = _string_or_none( + trace.get("pr_url") or trace.get("github_pr_url") or _nested(trace, "github", "url") + ) + fields: dict[str, Any] = { + "run_id": run_id, + "graph_id": GRAPH_ID, + "change_id": change_id, + "repository": _repository_from_trace(trace, pr_url=pr_url), + "pr_number": _int_or_none( + trace.get("pr_number") + or _nested(trace, "github", "number") + or _pr_number_from_url(pr_url) + ), + "commit_sha": _string_or_none( + trace.get("commit_sha") + or 
trace.get("remote_commit") + or trace.get("commit") + or _nested(trace, "github", "commit") + ), + "workflow_run_id": _string_or_none( + trace.get("workflow_run_id") or trace.get("github_run_id") + ), + "links": _links(trace, pr_url=pr_url), + } + return fields + + +def _repository_from_trace(trace: Mapping[str, Any], *, pr_url: str | None) -> str | None: + direct = _string_or_none(trace.get("repository") or trace.get("feature_target_repo")) + if direct: + return direct + match = _GITHUB_PR_RE.search(pr_url or "") + if match: + return f"{match.group('owner')}/{match.group('repo')}" + names = trace.get("promotion_repo_names") + if isinstance(names, list) and len(names) == 1: + return _string_or_none(names[0]) + repos = trace.get("promotion_repositories") + if isinstance(repos, Mapping) and len(repos) == 1: + return _string_or_none(next(iter(repos))) + return None + + +def _repo_from_backend(backend_result: Mapping[str, Any]) -> str | None: + return _string_or_none(backend_result.get("repository") or backend_result.get("repo")) + + +def _links(trace: Mapping[str, Any], *, pr_url: str | None) -> list[ObservatoryLink]: + links: list[ObservatoryLink] = [] + if pr_url: + links.append(ObservatoryLink(kind="github_pr", label="Engineering Loop PR", url=pr_url)) + workflow_url = _string_or_none(trace.get("workflow_run_url") or trace.get("github_run_url")) + if workflow_url and workflow_url != pr_url: + links.append(ObservatoryLink(kind="workflow_run", label="Workflow run", url=workflow_url)) + return links + + +def _node_summary(item: Mapping[str, Any]) -> str: + node = _string_or_none(item.get("node")) or "node" + output = item.get("output") + if isinstance(output, Mapping): + status = ( + output.get("status") or output.get("promotion_status") or output.get("signoff_status") + ) + if status: + return f"{node} status={status}" + if output.get("validation_errors"): + errors = output.get("validation_errors") + count = len(errors) if isinstance(errors, list) else 1 + return 
f"{node} validation_errors={count}" + return f"{node} completed" + + +def _bounded_node_payload(item: Mapping[str, Any]) -> dict[str, Any]: + return { + "node": item.get("node"), + "role": item.get("role"), + "timestamp": item.get("timestamp"), + "input_keys": item.get("input_keys", []), + "output": item.get("output", {}), + "state_before": item.get("state_before", {}), + } + + +def _nested(mapping: Mapping[str, Any], key: str, child: str) -> Any: + value = mapping.get(key) + return value.get(child) if isinstance(value, Mapping) else None + + +def _pr_number_from_url(url: str | None) -> int | None: + match = _GITHUB_PR_RE.search(url or "") + return int(match.group("number")) if match else None + + +def _int_or_none(value: Any) -> int | None: + if value is None or value == "": + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _string_or_none(value: Any) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None diff --git a/agent_core/adapters/knowledge.py b/agent_core/adapters/knowledge.py index 899580c..d15b87f 100644 --- a/agent_core/adapters/knowledge.py +++ b/agent_core/adapters/knowledge.py @@ -6,9 +6,11 @@ from __future__ import annotations import hashlib +from collections.abc import Mapping from typing import Any from agent_core.contracts.evidence import EvidencePacket +from agent_core.contracts.observatory import ObservatoryLink from agent_core.contracts.task import TaskEnvelope from agent_core.contracts.tracing import TraceEvent @@ -73,18 +75,88 @@ def trace_event_from_context_pack( pack: dict[str, Any], *, run_id: str | None = None ) -> TraceEvent: ref_count = len(pack.get("included_refs", []) or []) + first_source = _first_source_ref(pack) return TraceEvent.model_validate( { "event_type": "knowledge_context_pack", "node_id": "context_pack", + "agent_role": _string_or_none(pack.get("role")) or "knowledge_retriever", "summary": f"context pack {pack.get('id')} 
({ref_count} refs)", "payload": { "id": pack.get("id"), + "task_id": pack.get("task_id"), + "role": pack.get("role"), "retrieval_version": pack.get("retrieval_version"), "policy_version": pack.get("policy_version"), "policy_decision": pack.get("policy_decision"), + "authority_max": evidence_from_context_pack(pack).authority_max, + "included_ref_count": ref_count, }, - "run_id": run_id, + "run_id": run_id + or _string_or_none(pack.get("run_id") or pack.get("knowledge_snapshot")), "graph_id": GRAPH_ID, + "trace_id": _string_or_none(pack.get("id")), + "case_id": _string_or_none(pack.get("case_id")), + "handoff_id": _string_or_none(pack.get("handoff_id")), + "objective_id": _string_or_none(pack.get("objective_id")), + "change_id": _string_or_none(pack.get("change_id") or pack.get("task_id")), + "repository": first_source[0], + "commit_sha": first_source[1], + "links": _links_for_context_pack(pack), } ) + + +def _links_for_context_pack(pack: Mapping[str, Any]) -> list[ObservatoryLink]: + links = [ + ObservatoryLink( + kind="knowledge", + label="Context pack", + ref_id=_string_or_none(pack.get("id")) or "", + metadata={"knowledge_snapshot": pack.get("knowledge_snapshot")}, + ) + ] + for ref in pack.get("included_refs", []) or []: + if not isinstance(ref, Mapping): + continue + raw_ref = _string_or_none(ref.get("ref") or ref.get("doc_id") or ref.get("doc_path")) + if raw_ref: + links.append( + ObservatoryLink( + kind="knowledge", + label=str(ref.get("authority") or ref.get("kind") or "knowledge ref"), + ref_id=raw_ref, + metadata={ + "authority": ref.get("authority"), + "kind": ref.get("kind"), + "review_status": ref.get("review_status"), + "commit_sha": ref.get("commit_sha"), + }, + ) + ) + return links + + +def _first_source_ref(pack: Mapping[str, Any]) -> tuple[str | None, str | None]: + fallback: tuple[str | None, str | None] = (None, None) + for ref in pack.get("included_refs", []) or []: + if not isinstance(ref, Mapping): + continue + raw_ref = 
_string_or_none(ref.get("ref") or ref.get("doc_id") or ref.get("doc_path")) + if not raw_ref: + continue + repository = raw_ref.split(":", 1)[0] if ":" in raw_ref else None + commit_sha = _string_or_none(ref.get("commit_sha")) + if repository or commit_sha: + if commit_sha: + return repository, commit_sha + if fallback == (None, None): + fallback = (repository, commit_sha) + return fallback + + +def _string_or_none(value: Any) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None diff --git a/pyproject.toml b/pyproject.toml index 8f3eb48..8eb037d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "agent-core" -version = "0.4.0" +version = "0.5.0" description = "Shared typed contracts for the AS215932 Agent Runtime Framework (Phase 1 / §31 safe milestone)." requires-python = ">=3.11" dependencies = ["pydantic>=2,<3"] diff --git a/tests/adapters/test_engineering_adapter.py b/tests/adapters/test_engineering_adapter.py index 88512ac..edd8481 100644 --- a/tests/adapters/test_engineering_adapter.py +++ b/tests/adapters/test_engineering_adapter.py @@ -26,11 +26,40 @@ def test_decision(load_fixture: Callable[[str], Any]) -> None: def test_trace_events(load_fixture: Callable[[str], Any]) -> None: trace = load_fixture("loop_trace.sample.json") + trace.update( + { + "change_id": "chg-2026-0628-001", + "pr_url": "https://github.com/AS215932/network-operations/pull/318", + "commit_sha": "abc123", + "workflow_run_id": "28392093138", + "trace_events": [ + { + "node": "hydrate_context", + "timestamp": "2026-06-29T00:00:00Z", + "output": {"status": "passed"}, + }, + { + "node": "gate_execution", + "timestamp": "2026-06-29T00:01:00Z", + "output": {"validation_errors": []}, + }, + ], + } + ) events = eng.trace_events_from_loop_trace(trace, run_id="run-1") types = [e.event_type for e in events] assert "model_call" in types assert "backend_execution" in types + assert 
types.count("loop_node") == 2 backend = next(e for e in events if e.event_type == "backend_execution") assert backend.cost is not None assert backend.cost.input_tokens == 1820 + assert backend.change_id == "chg-2026-0628-001" + assert backend.repository == "network-operations" + assert backend.pr_number == 318 + assert backend.commit_sha == "abc123" + assert backend.workflow_run_id == "28392093138" + loop_node = next(e for e in events if e.event_type == "loop_node") + assert loop_node.parent_event_id + assert loop_node.links[0].kind == "github_pr" assert all(e.run_id == "run-1" for e in events) diff --git a/tests/adapters/test_knowledge_adapter.py b/tests/adapters/test_knowledge_adapter.py index 28f1b81..30d94e9 100644 --- a/tests/adapters/test_knowledge_adapter.py +++ b/tests/adapters/test_knowledge_adapter.py @@ -16,9 +16,18 @@ def test_evidence_from_context_pack(load_fixture: Callable[[str], Any]) -> None: def test_trace_event_from_context_pack(load_fixture: Callable[[str], Any]) -> None: pack = load_fixture("context_pack.sample.json") + pack.update({"case_id": "case_1", "handoff_id": "handoff_1", "objective_id": "objective_1"}) event = kn.trace_event_from_context_pack(pack, run_id="run-2") assert event.event_type == "knowledge_context_pack" assert event.run_id == "run-2" + assert event.trace_id == pack["id"] + assert event.case_id == "case_1" + assert event.handoff_id == "handoff_1" + assert event.objective_id == "objective_1" + assert event.change_id == pack["task_id"] + assert event.repository == "network-operations" + assert event.commit_sha == "deadbeef" + assert event.links def test_task_envelope_deterministic() -> None: