Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions agent_core/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Trace delivery sinks (JSONL file + HTTP collector). stdlib-only."""

from __future__ import annotations

from agent_core.tracing.sink import (
HttpSink,
JsonlFileSink,
MultiSink,
NullSink,
TraceSink,
sink_from_env,
)

__all__ = [
"TraceSink",
"NullSink",
"JsonlFileSink",
"HttpSink",
"MultiSink",
"sink_from_env",
]
108 changes: 108 additions & 0 deletions agent_core/tracing/sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Trace sinks: deliver TraceEvents to a JSONL file and/or an HTTP collector.

stdlib-only (``urllib``) so importing this stays dependency-light. Every sink is
best-effort: ``emit`` never raises and returns True on success, False otherwise.
"""

from __future__ import annotations

import os
import urllib.request
from pathlib import Path
from typing import Protocol, runtime_checkable

from agent_core.contracts.tracing import TraceEvent

_TRUTHY = {"1", "true", "yes", "on"}


@runtime_checkable
class TraceSink(Protocol):
def emit(self, event: TraceEvent) -> bool:
"""Deliver one event; return True on success. Must never raise."""
...


class NullSink:
"""Drops events (used when emission is disabled)."""

def emit(self, event: TraceEvent) -> bool:
return False


class JsonlFileSink:
"""Appends one JSON line per event to a file."""

def __init__(self, path: str | Path) -> None:
self.path = Path(path)

def emit(self, event: TraceEvent) -> bool:
try:
self.path.parent.mkdir(parents=True, exist_ok=True)
with self.path.open("a", encoding="utf-8") as handle:
handle.write(event.model_dump_json() + "\n")
return True
except Exception:
return False


class HttpSink:
"""POSTs each event as JSON to a collector URL (best-effort)."""

def __init__(self, url: str, *, timeout: float = 2.0, token: str | None = None) -> None:
self.url = url
self.timeout = timeout
self.token = token

def emit(self, event: TraceEvent) -> bool:
try:
payload = event.model_dump_json().encode("utf-8")
headers = {"content-type": "application/json"}
if self.token:
headers["authorization"] = f"Bearer {self.token}"
request = urllib.request.Request(
self.url, data=payload, headers=headers, method="POST"
)
with urllib.request.urlopen(request, timeout=self.timeout) as response:
code = response.getcode()
return bool(code is not None and 200 <= code < 300)
except Exception:
return False


class MultiSink:
"""Fans an event out to several sinks; True if any delivery succeeds."""

def __init__(self, sinks: list[TraceSink]) -> None:
self.sinks = sinks

def emit(self, event: TraceEvent) -> bool:
delivered = False
for sink in self.sinks:
if sink.emit(event):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Catch per-sink failures in MultiSink

When MultiSink is used with a custom sink (or a built-in sink with an unexpected bug), an exception from sink.emit(event) propagates and prevents later sinks from receiving the event. That breaks the documented best-effort contract that emit() never raises; for example, MultiSink([raising_sink, JsonlFileSink(...)]) would lose the file delivery instead of returning whether any sink succeeded. Wrap each per-sink emit in try/except and continue fanout.

Useful? React with 👍 / 👎.

delivered = True
return delivered


def sink_from_env(flag_env: str) -> TraceSink:
"""Build a sink from env vars keyed off ``flag_env``.

``{flag_env}`` truthy enables emission. ``{flag_env}_COLLECTOR_URL`` adds an HTTP sink
(optional ``{flag_env}_COLLECTOR_TOKEN``); ``{flag_env}_PATH`` adds a JSONL file sink.
Both configured -> MultiSink. Disabled or neither configured -> NullSink.
"""
if os.environ.get(flag_env, "").strip().lower() not in _TRUTHY:
return NullSink()
sinks: list[TraceSink] = []
url = os.environ.get(f"{flag_env}_COLLECTOR_URL", "").strip()
if url:
token = os.environ.get(f"{flag_env}_COLLECTOR_TOKEN") or None
sinks.append(HttpSink(url, token=token))
path = os.environ.get(f"{flag_env}_PATH", "").strip()
if path:
sinks.append(JsonlFileSink(path))
if not sinks:
return NullSink()
if len(sinks) == 1:
return sinks[0]
return MultiSink(sinks)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "agent-core"
version = "0.1.0"
version = "0.2.0"
description = "Shared typed contracts for the AS215932 Agent Runtime Framework (Phase 1 / §31 safe milestone)."
requires-python = ">=3.11"
dependencies = ["pydantic>=2,<3"]
Expand Down
81 changes: 81 additions & 0 deletions tests/tracing/test_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

import json

from agent_core.contracts.tracing import TraceEvent
from agent_core.tracing import (
HttpSink,
JsonlFileSink,
MultiSink,
NullSink,
sink_from_env,
)


def _event() -> TraceEvent:
return TraceEvent(event_type="model_call", summary="x", run_id="r1")


def test_null_sink() -> None:
assert NullSink().emit(_event()) is False


def test_jsonl_file_sink(tmp_path) -> None:
sink = JsonlFileSink(tmp_path / "t.jsonl")
assert sink.emit(_event()) is True
lines = (tmp_path / "t.jsonl").read_text(encoding="utf-8").strip().splitlines()
assert len(lines) == 1
assert json.loads(lines[0])["event_type"] == "model_call"


def test_http_sink_success(monkeypatch) -> None:
captured: dict[str, object] = {}

class _Resp:
def __enter__(self):
return self

def __exit__(self, *exc):
return False

def getcode(self):
return 204

def _fake_urlopen(request, timeout=None):
captured["url"] = request.full_url
captured["data"] = request.data
return _Resp()

monkeypatch.setattr("urllib.request.urlopen", _fake_urlopen)
assert HttpSink("http://collector/v1/trace").emit(_event()) is True
assert captured["url"] == "http://collector/v1/trace"
assert json.loads(captured["data"])["event_type"] == "model_call"


def test_http_sink_failure(monkeypatch) -> None:
def _boom(request, timeout=None):
raise OSError("collector down")

monkeypatch.setattr("urllib.request.urlopen", _boom)
assert HttpSink("http://collector").emit(_event()) is False


def test_multi_sink(tmp_path) -> None:
file_sink = JsonlFileSink(tmp_path / "m.jsonl")
multi = MultiSink([NullSink(), file_sink])
assert multi.emit(_event()) is True
assert (tmp_path / "m.jsonl").exists()


def test_sink_from_env_disabled(monkeypatch) -> None:
monkeypatch.delenv("HYRULE_X_TRACE", raising=False)
assert isinstance(sink_from_env("HYRULE_X_TRACE"), NullSink)


def test_sink_from_env_file_and_http(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("HYRULE_X_TRACE", "1")
monkeypatch.setenv("HYRULE_X_TRACE_PATH", str(tmp_path / "e.jsonl"))
monkeypatch.setenv("HYRULE_X_TRACE_COLLECTOR_URL", "http://collector/v1/trace")
sink = sink_from_env("HYRULE_X_TRACE")
assert isinstance(sink, MultiSink)
assert len(sink.sinks) == 2
Loading