diff --git a/agent_core/tracing/__init__.py b/agent_core/tracing/__init__.py
new file mode 100644
index 0000000..f3e27ef
--- /dev/null
+++ b/agent_core/tracing/__init__.py
@@ -0,0 +1,21 @@
+"""Trace delivery sinks (JSONL file + HTTP collector). stdlib-only."""
+
+from __future__ import annotations
+
+from agent_core.tracing.sink import (
+    HttpSink,
+    JsonlFileSink,
+    MultiSink,
+    NullSink,
+    TraceSink,
+    sink_from_env,
+)
+
+__all__ = [
+    "TraceSink",
+    "NullSink",
+    "JsonlFileSink",
+    "HttpSink",
+    "MultiSink",
+    "sink_from_env",
+]
diff --git a/agent_core/tracing/sink.py b/agent_core/tracing/sink.py
new file mode 100644
index 0000000..15772fa
--- /dev/null
+++ b/agent_core/tracing/sink.py
@@ -0,0 +1,127 @@
+"""Trace sinks: deliver TraceEvents to a JSONL file and/or an HTTP collector.
+
+stdlib-only (``urllib``) so importing this stays dependency-light. Every sink is
+best-effort: ``emit`` never raises and returns True on success, False otherwise.
+"""
+
+from __future__ import annotations
+
+import os
+import urllib.request
+from pathlib import Path
+from typing import Protocol, runtime_checkable
+
+from agent_core.contracts.tracing import TraceEvent
+
+# Lower-cased flag values that switch emission on.
+_TRUTHY = {"1", "true", "yes", "on"}
+
+
+@runtime_checkable
+class TraceSink(Protocol):
+    """Structural interface every trace sink implements."""
+
+    def emit(self, event: TraceEvent) -> bool:
+        """Deliver one event; return True on success. Must never raise."""
+        ...
+
+
+class NullSink:
+    """Drops events (used when emission is disabled)."""
+
+    def emit(self, event: TraceEvent) -> bool:
+        """Discard ``event``; always False because nothing is delivered."""
+        return False
+
+
+class JsonlFileSink:
+    """Appends one JSON line per event to a file."""
+
+    def __init__(self, path: str | Path) -> None:
+        self.path = Path(path)
+
+    def emit(self, event: TraceEvent) -> bool:
+        """Append ``event`` as one JSON line; return False if anything fails."""
+        try:
+            # Create missing parent directories so the first write can succeed.
+            self.path.parent.mkdir(parents=True, exist_ok=True)
+            with self.path.open("a", encoding="utf-8") as handle:
+                handle.write(event.model_dump_json() + "\n")
+            return True
+        except Exception:
+            # Best-effort by contract: report failure instead of raising.
+            return False
+
+
+class HttpSink:
+    """POSTs each event as JSON to a collector URL (best-effort)."""
+
+    def __init__(self, url: str, *, timeout: float = 2.0, token: str | None = None) -> None:
+        self.url = url
+        self.timeout = timeout
+        self.token = token
+
+    def emit(self, event: TraceEvent) -> bool:
+        """POST ``event``; True only when the collector answers with a 2xx status."""
+        try:
+            payload = event.model_dump_json().encode("utf-8")
+            headers = {"content-type": "application/json"}
+            if self.token:
+                headers["authorization"] = f"Bearer {self.token}"
+            request = urllib.request.Request(
+                self.url, data=payload, headers=headers, method="POST"
+            )
+            with urllib.request.urlopen(request, timeout=self.timeout) as response:
+                code = response.getcode()
+            return bool(code is not None and 200 <= code < 300)
+        except Exception:
+            # Network errors, timeouts and the HTTPError urllib raises for 4xx/5xx.
+            return False
+
+
+class MultiSink:
+    """Fans an event out to several sinks; True if any delivery succeeds."""
+
+    def __init__(self, sinks: list[TraceSink]) -> None:
+        # Copy so later mutation of the caller's list cannot change the fan-out.
+        self.sinks = list(sinks)
+
+    def emit(self, event: TraceEvent) -> bool:
+        """Offer ``event`` to every sink, even after one has failed or raised."""
+        delivered = False
+        for sink in self.sinks:
+            try:
+                if sink.emit(event):
+                    delivered = True
+            except Exception:
+                # A sink that breaks the never-raise contract must not stop the
+                # remaining deliveries or escape from this method.
+                continue
+        return delivered
+
+
+def sink_from_env(flag_env: str) -> TraceSink:
+    """Build a sink from env vars keyed off ``flag_env``.
+
+    ``{flag_env}`` truthy enables emission. ``{flag_env}_COLLECTOR_URL`` adds an HTTP sink
+    (optional ``{flag_env}_COLLECTOR_TOKEN``); ``{flag_env}_PATH`` adds a JSONL file sink.
+    Both configured -> MultiSink. Disabled or neither configured -> NullSink.
+    Values are whitespace-stripped, so a blank value counts as unset.
+    """
+    if os.environ.get(flag_env, "").strip().lower() not in _TRUTHY:
+        return NullSink()
+    sinks: list[TraceSink] = []
+    url = os.environ.get(f"{flag_env}_COLLECTOR_URL", "").strip()
+    if url:
+        # Strip like the URL and path: a trailing newline (common when the token is
+        # read from a file) is an illegal header value and would fail every POST.
+        token = os.environ.get(f"{flag_env}_COLLECTOR_TOKEN", "").strip() or None
+        sinks.append(HttpSink(url, token=token))
+    path = os.environ.get(f"{flag_env}_PATH", "").strip()
+    if path:
+        sinks.append(JsonlFileSink(path))
+    if not sinks:
+        return NullSink()
+    if len(sinks) == 1:
+        return sinks[0]
+    return MultiSink(sinks)
diff --git a/pyproject.toml b/pyproject.toml
index fda8c27..da05b2d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "agent-core"
-version = "0.1.0"
+version = "0.2.0"
 description = "Shared typed contracts for the AS215932 Agent Runtime Framework (Phase 1 / §31 safe milestone)."
 requires-python = ">=3.11"
 dependencies = ["pydantic>=2,<3"]
diff --git a/tests/tracing/test_sink.py b/tests/tracing/test_sink.py
new file mode 100644
index 0000000..5770509
--- /dev/null
+++ b/tests/tracing/test_sink.py
@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+import json
+
+from agent_core.contracts.tracing import TraceEvent
+from agent_core.tracing import (
+    HttpSink,
+    JsonlFileSink,
+    MultiSink,
+    NullSink,
+    sink_from_env,
+)
+
+
+def _event() -> TraceEvent:
+    return TraceEvent(event_type="model_call", summary="x", run_id="r1")
+
+
+def test_null_sink() -> None:
+    assert NullSink().emit(_event()) is False
+
+
+def test_jsonl_file_sink(tmp_path) -> None:
+    sink = JsonlFileSink(tmp_path / "t.jsonl")
+    assert sink.emit(_event()) is True
+    lines = (tmp_path / "t.jsonl").read_text(encoding="utf-8").strip().splitlines()
+    assert len(lines) == 1
+    assert json.loads(lines[0])["event_type"] == "model_call"
+
+
+def test_http_sink_success(monkeypatch) -> None:
+    captured: dict[str, object] = {}
+
+    class _Resp:
+        def __enter__(self):
+            return self
+
+        def __exit__(self, *exc):
+            return False
+
+        def getcode(self):
+            return 204
+
+    def _fake_urlopen(request, timeout=None):
+        captured["url"] = request.full_url
+        captured["data"] = request.data
+        return _Resp()
+
+    monkeypatch.setattr("urllib.request.urlopen", _fake_urlopen)
+    assert HttpSink("http://collector/v1/trace").emit(_event()) is True
+    assert captured["url"] == "http://collector/v1/trace"
+    assert json.loads(captured["data"])["event_type"] == "model_call"
+
+
+def test_http_sink_failure(monkeypatch) -> None:
+    def _boom(request, timeout=None):
+        raise OSError("collector down")
+
+    monkeypatch.setattr("urllib.request.urlopen", _boom)
+    assert HttpSink("http://collector").emit(_event()) is False
+
+
+def test_multi_sink(tmp_path) -> None:
+    file_sink = JsonlFileSink(tmp_path / "m.jsonl")
+    multi = MultiSink([NullSink(), file_sink])
+    assert multi.emit(_event()) is True
+    assert (tmp_path / "m.jsonl").exists()
+
+
+def test_multi_sink_survives_raising_sink(tmp_path) -> None:
+    class _Broken:
+        def emit(self, event: TraceEvent) -> bool:
+            raise RuntimeError("sink bug")
+
+    multi = MultiSink([_Broken(), JsonlFileSink(tmp_path / "r.jsonl")])
+    assert multi.emit(_event()) is True
+    assert (tmp_path / "r.jsonl").exists()
+
+
+def test_sink_from_env_disabled(monkeypatch) -> None:
+    monkeypatch.delenv("HYRULE_X_TRACE", raising=False)
+    assert isinstance(sink_from_env("HYRULE_X_TRACE"), NullSink)
+
+
+def test_sink_from_env_file_and_http(monkeypatch, tmp_path) -> None:
+    monkeypatch.setenv("HYRULE_X_TRACE", "1")
+    monkeypatch.setenv("HYRULE_X_TRACE_PATH", str(tmp_path / "e.jsonl"))
+    monkeypatch.setenv("HYRULE_X_TRACE_COLLECTOR_URL", "http://collector/v1/trace")
+    sink = sink_from_env("HYRULE_X_TRACE")
+    assert isinstance(sink, MultiSink)
+    assert len(sink.sinks) == 2
+
+
+def test_sink_from_env_strips_token(monkeypatch) -> None:
+    monkeypatch.setenv("HYRULE_X_TRACE", "1")
+    monkeypatch.setenv("HYRULE_X_TRACE_COLLECTOR_URL", "http://collector/v1/trace")
+    # Trailing newline mimics a token loaded from a file.
+    monkeypatch.setenv("HYRULE_X_TRACE_COLLECTOR_TOKEN", " secret\n")
+    monkeypatch.delenv("HYRULE_X_TRACE_PATH", raising=False)
+    sink = sink_from_env("HYRULE_X_TRACE")
+    assert isinstance(sink, HttpSink)
+    assert sink.token == "secret"