Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

from __future__ import annotations

__version__ = "0.4.0"
__version__ = "0.5.0"
224 changes: 186 additions & 38 deletions agent_core/adapters/engineering_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

from __future__ import annotations

import re
from collections.abc import Mapping
from typing import Any

from agent_core.contracts.decision import DecisionPacket
from agent_core.contracts.models import CostUsage
from agent_core.contracts.observatory import ObservatoryLink
from agent_core.contracts.task import TaskEnvelope
from agent_core.contracts.tools import ToolResult
from agent_core.contracts.tracing import TraceEvent

GRAPH_ID = "engineering-loop"
_GITHUB_PR_RE = re.compile(r"github\.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)/pull/(?P<number>\d+)")


def task_envelope_from_graph_state(state: dict[str, Any]) -> TaskEnvelope:
Expand Down Expand Up @@ -81,49 +85,193 @@ def tool_result_from_gate(gate: dict[str, Any]) -> ToolResult:
def trace_events_from_loop_trace(
trace: dict[str, Any], *, run_id: str | None = None
) -> list[TraceEvent]:
"""Convert Engineering Loop trace/state dictionaries to observatory TraceEvents.

Besides legacy LLM/gate/backend summaries, this emits one ``loop_node`` event for
each compact graph-node trace item when ``trace_events`` (live state) or ``events``
(``loop_trace.json``) is present. Events carry change/repository/PR correlation so
the observatory can stitch graph actions, GitHub artifacts, and deploy evidence
into a single safe traceback.
"""
events: list[TraceEvent] = []
base_fields = _base_trace_fields(trace, run_id=run_id)
previous_event_id: str | None = None

for item in trace.get("llm_outputs", []) or []:
events.append(
TraceEvent.model_validate(
{
"event_type": "model_call",
"node_id": item.get("role"),
"agent_role": item.get("role"),
"summary": f"role review approved={item.get('approved')}",
"payload": item,
"run_id": run_id,
"graph_id": GRAPH_ID,
}
)
event = TraceEvent.model_validate(
{
**base_fields,
"event_type": "model_call",
"node_id": item.get("role"),
"agent_role": item.get("role"),
"summary": f"role review approved={item.get('approved')}",
"payload": item,
}
)
events.append(event)
previous_event_id = event.event_id
for gate in trace.get("gate_results", []) or []:
events.append(
TraceEvent.model_validate(
{
"event_type": "tool_call",
"node_id": "gate_execution",
"summary": str(gate.get("command")),
"payload": gate,
"run_id": run_id,
"graph_id": GRAPH_ID,
}
)
event = TraceEvent.model_validate(
{
**base_fields,
"event_type": "tool_call",
"node_id": "gate_execution",
"summary": str(gate.get("command")),
"payload": gate,
"parent_event_id": previous_event_id,
}
)
events.append(event)
previous_event_id = event.event_id
for backend_result in trace.get("backend_results", []) or []:
events.append(
TraceEvent.model_validate(
{
"event_type": "backend_execution",
"node_id": "delegate_implementation",
"summary": (
f"backend={backend_result.get('backend')} "
f"status={backend_result.get('status')}"
),
"payload": backend_result,
"cost": cost_usage_from_backend_result(backend_result).model_dump(),
"run_id": run_id,
"graph_id": GRAPH_ID,
}
)
event = TraceEvent.model_validate(
{
**base_fields,
"repository": _repo_from_backend(backend_result) or base_fields.get("repository"),
"event_type": "backend_execution",
"node_id": "delegate_implementation",
"summary": (
f"backend={backend_result.get('backend')} "
f"status={backend_result.get('status')}"
),
"payload": backend_result,
"cost": cost_usage_from_backend_result(backend_result).model_dump(),
"parent_event_id": previous_event_id,
}
)
events.append(event)
previous_event_id = event.event_id

node_trace = trace.get("trace_events") or trace.get("events") or []
for item in node_trace:
if not isinstance(item, Mapping):
continue
event = TraceEvent.model_validate(
{
**base_fields,
"event_type": "loop_node",
"node_id": _string_or_none(item.get("node")),
"agent_role": _string_or_none(item.get("role")),
"summary": _node_summary(item),
"payload": _bounded_node_payload(item),
"parent_event_id": previous_event_id,
}
)
events.append(event)
previous_event_id = event.event_id

return events


def _base_trace_fields(trace: Mapping[str, Any], *, run_id: str | None) -> dict[str, Any]:
raw_change = trace.get("change")
change: Mapping[str, Any] = raw_change if isinstance(raw_change, Mapping) else {}
change_id = _string_or_none(trace.get("change_id") or change.get("change_id") or run_id)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Don't synthesize change_id from run_id

When a legacy loop_trace.json has no change/change_id but the caller passes run_id, this fallback stores the run ID in change_id. The collector surfaces change_ids in run summaries and filters /v1/actions by exact change_id (agent_core/collector/app.py:210-212 and agent_core/collector/app.py:249), so runs with no actual change are advertised as if they changed run-... and can match change filters incorrectly. Leave change_id null unless the trace provides a real change identifier.

Useful? React with 👍 / 👎.

pr_url = _string_or_none(
trace.get("pr_url") or trace.get("github_pr_url") or _nested(trace, "github", "url")
)
fields: dict[str, Any] = {
"run_id": run_id,
"graph_id": GRAPH_ID,
"change_id": change_id,
"repository": _repository_from_trace(trace, pr_url=pr_url),
"pr_number": _int_or_none(
trace.get("pr_number")
or _nested(trace, "github", "number")
or _pr_number_from_url(pr_url)
),
"commit_sha": _string_or_none(
trace.get("commit_sha")
or trace.get("remote_commit")
or trace.get("commit")
or _nested(trace, "github", "commit")
),
"workflow_run_id": _string_or_none(
trace.get("workflow_run_id") or trace.get("github_run_id")
),
"links": _links(trace, pr_url=pr_url),
}
return fields


def _repository_from_trace(trace: Mapping[str, Any], *, pr_url: str | None) -> str | None:
direct = _string_or_none(trace.get("repository") or trace.get("feature_target_repo"))
if direct:
return direct
match = _GITHUB_PR_RE.search(pr_url or "")
if match:
return f"{match.group('owner')}/{match.group('repo')}"
names = trace.get("promotion_repo_names")
if isinstance(names, list) and len(names) == 1:
return _string_or_none(names[0])
repos = trace.get("promotion_repositories")
if isinstance(repos, Mapping) and len(repos) == 1:
return _string_or_none(next(iter(repos)))
return None


def _repo_from_backend(backend_result: Mapping[str, Any]) -> str | None:
return _string_or_none(backend_result.get("repository") or backend_result.get("repo"))


def _links(trace: Mapping[str, Any], *, pr_url: str | None) -> list[ObservatoryLink]:
links: list[ObservatoryLink] = []
if pr_url:
links.append(ObservatoryLink(kind="github_pr", label="Engineering Loop PR", url=pr_url))
workflow_url = _string_or_none(trace.get("workflow_run_url") or trace.get("github_run_url"))
if workflow_url and workflow_url != pr_url:
links.append(ObservatoryLink(kind="workflow_run", label="Workflow run", url=workflow_url))
return links


def _node_summary(item: Mapping[str, Any]) -> str:
node = _string_or_none(item.get("node")) or "node"
output = item.get("output")
if isinstance(output, Mapping):
status = (
output.get("status") or output.get("promotion_status") or output.get("signoff_status")
)
if status:
return f"{node} status={status}"
if output.get("validation_errors"):
errors = output.get("validation_errors")
count = len(errors) if isinstance(errors, list) else 1
return f"{node} validation_errors={count}"
return f"{node} completed"


def _bounded_node_payload(item: Mapping[str, Any]) -> dict[str, Any]:
return {
"node": item.get("node"),
"role": item.get("role"),
"timestamp": item.get("timestamp"),
"input_keys": item.get("input_keys", []),
"output": item.get("output", {}),
"state_before": item.get("state_before", {}),
}


def _nested(mapping: Mapping[str, Any], key: str, child: str) -> Any:
value = mapping.get(key)
return value.get(child) if isinstance(value, Mapping) else None


def _pr_number_from_url(url: str | None) -> int | None:
match = _GITHUB_PR_RE.search(url or "")
return int(match.group("number")) if match else None


def _int_or_none(value: Any) -> int | None:
if value is None or value == "":
return None
try:
return int(value)
except (TypeError, ValueError):
return None


def _string_or_none(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
74 changes: 73 additions & 1 deletion agent_core/adapters/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from __future__ import annotations

import hashlib
from collections.abc import Mapping
from typing import Any

from agent_core.contracts.evidence import EvidencePacket
from agent_core.contracts.observatory import ObservatoryLink
from agent_core.contracts.task import TaskEnvelope
from agent_core.contracts.tracing import TraceEvent

Expand Down Expand Up @@ -73,18 +75,88 @@ def trace_event_from_context_pack(
pack: dict[str, Any], *, run_id: str | None = None
) -> TraceEvent:
ref_count = len(pack.get("included_refs", []) or [])
first_source = _first_source_ref(pack)
return TraceEvent.model_validate(
{
"event_type": "knowledge_context_pack",
"node_id": "context_pack",
"agent_role": _string_or_none(pack.get("role")) or "knowledge_retriever",
"summary": f"context pack {pack.get('id')} ({ref_count} refs)",
"payload": {
"id": pack.get("id"),
"task_id": pack.get("task_id"),
"role": pack.get("role"),
"retrieval_version": pack.get("retrieval_version"),
"policy_version": pack.get("policy_version"),
"policy_decision": pack.get("policy_decision"),
"authority_max": evidence_from_context_pack(pack).authority_max,
"included_ref_count": ref_count,
},
"run_id": run_id,
"run_id": run_id
or _string_or_none(pack.get("run_id") or pack.get("knowledge_snapshot")),
Comment on lines +95 to +96

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Don't group unrelated packs by snapshot ID

When a context pack is emitted without an explicit run_id, this uses knowledge_snapshot as the run ID. The collector groups /v1/runs solely by run_id (agent_core/collector/app.py:342-347), so every pack produced from the same export snapshot is collapsed into one run even when they are different retrieval tasks. Leave run_id unset or use a per-pack/per-execution identifier instead of the snapshot name.

Useful? React with 👍 / 👎.

"graph_id": GRAPH_ID,
"trace_id": _string_or_none(pack.get("id")),
"case_id": _string_or_none(pack.get("case_id")),
"handoff_id": _string_or_none(pack.get("handoff_id")),
"objective_id": _string_or_none(pack.get("objective_id")),
"change_id": _string_or_none(pack.get("change_id") or pack.get("task_id")),
"repository": first_source[0],
"commit_sha": first_source[1],
"links": _links_for_context_pack(pack),
}
)


def _links_for_context_pack(pack: Mapping[str, Any]) -> list[ObservatoryLink]:
links = [
ObservatoryLink(
kind="knowledge",
label="Context pack",
ref_id=_string_or_none(pack.get("id")) or "",
metadata={"knowledge_snapshot": pack.get("knowledge_snapshot")},
)
]
for ref in pack.get("included_refs", []) or []:
if not isinstance(ref, Mapping):
continue
raw_ref = _string_or_none(ref.get("ref") or ref.get("doc_id") or ref.get("doc_path"))
if raw_ref:
links.append(
ObservatoryLink(
kind="knowledge",
label=str(ref.get("authority") or ref.get("kind") or "knowledge ref"),
ref_id=raw_ref,
metadata={
"authority": ref.get("authority"),
"kind": ref.get("kind"),
"review_status": ref.get("review_status"),
"commit_sha": ref.get("commit_sha"),
},
)
)
return links


def _first_source_ref(pack: Mapping[str, Any]) -> tuple[str | None, str | None]:
fallback: tuple[str | None, str | None] = (None, None)
for ref in pack.get("included_refs", []) or []:
if not isinstance(ref, Mapping):
continue
raw_ref = _string_or_none(ref.get("ref") or ref.get("doc_id") or ref.get("doc_path"))
if not raw_ref:
continue
repository = raw_ref.split(":", 1)[0] if ":" in raw_ref else None
commit_sha = _string_or_none(ref.get("commit_sha"))
if repository or commit_sha:
if commit_sha:
return repository, commit_sha
if fallback == (None, None):
fallback = (repository, commit_sha)
return fallback


def _string_or_none(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "agent-core"
version = "0.4.0"
version = "0.5.0"
description = "Shared typed contracts for the AS215932 Agent Runtime Framework (Phase 1 / §31 safe milestone)."
requires-python = ">=3.11"
dependencies = ["pydantic>=2,<3"]
Expand Down
Loading
Loading