Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 173 additions & 12 deletions backend/app/core/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import time
from collections.abc import Awaitable, Callable, Mapping
from contextlib import contextmanager
from typing import Any, TypeVar
from typing import TYPE_CHECKING, Any, TypeVar

from opentelemetry import metrics, trace
from opentelemetry.metrics import NoOpMeterProvider
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
OTLPMetricExporter,
)
Expand All @@ -29,7 +30,10 @@
from opentelemetry.metrics import Counter, Histogram, Meter
from opentelemetry.propagate import extract, inject, set_global_textmap
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import (
MetricReader,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
Expand All @@ -41,6 +45,9 @@
from app.core.config import Settings, get_settings
from app.core.logging import get_logger

if TYPE_CHECKING:
from app.worker.queue import QueueStats

logger = get_logger("telemetry")

F = TypeVar("F", bound=Callable[..., Any])
Expand All @@ -60,6 +67,15 @@
_adapter_runs: Counter | None = None
_adapter_errors: Counter | None = None
_adapter_duration: Histogram | None = None
_queue_stream_length: Any | None = None
_queue_lag: Any | None = None
_queue_pending: Any | None = None
_queue_backlog: Any | None = None
_queue_consumer_delay: Any | None = None
_queue_dlq_length: Any | None = None
_worker_utilization: Any | None = None
_worker_in_flight: Any | None = None
_worker_capacity: Any | None = None


def is_enabled() -> bool:
Expand All @@ -80,7 +96,11 @@ def _otlp_endpoint(settings: Settings) -> str:
return base


def setup_telemetry(*, settings: Settings | None = None) -> None:
def setup_telemetry(
*,
settings: Settings | None = None,
metric_readers: list[MetricReader] | None = None,
) -> None:
"""Idempotent SDK setup. No-op when ``otel_enabled`` is false."""
global _initialized, _tracer, _meter

Expand All @@ -94,18 +114,23 @@ def setup_telemetry(*, settings: Settings | None = None) -> None:
endpoint = _otlp_endpoint(settings)

tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"),
if metric_readers is None:
tracer_provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"),
)
)
)
trace.set_tracer_provider(tracer_provider)
Comment on lines 116 to 123

metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=f"{endpoint}/v1/metrics"),
export_interval_millis=settings.otel_metric_export_interval_ms,
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
if metric_readers is None:
metric_reader: MetricReader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=f"{endpoint}/v1/metrics"),
export_interval_millis=settings.otel_metric_export_interval_ms,
)
readers: list[MetricReader] = [metric_reader]
else:
readers = metric_readers
meter_provider = MeterProvider(resource=resource, metric_readers=readers)
metrics.set_meter_provider(meter_provider)

set_global_textmap(TraceContextTextMapPropagator())
Expand All @@ -123,6 +148,12 @@ def setup_telemetry(*, settings: Settings | None = None) -> None:
def shutdown_telemetry() -> None:
"""Flush exporters on process exit."""
global _initialized, _tracer, _meter
global _http_requests, _http_errors, _http_duration
global _worker_jobs, _worker_errors, _worker_duration
global _adapter_runs, _adapter_errors, _adapter_duration
global _queue_stream_length, _queue_lag, _queue_pending, _queue_backlog
global _queue_consumer_delay, _queue_dlq_length
global _worker_utilization, _worker_in_flight, _worker_capacity

if not _initialized:
return
Expand All @@ -135,9 +166,31 @@ def shutdown_telemetry() -> None:
if hasattr(meter_provider, "shutdown"):
meter_provider.shutdown() # type: ignore[union-attr]

# Reset globals so a later setup_telemetry() in tests or respawns can run.
trace.set_tracer_provider(TracerProvider())
metrics.set_meter_provider(NoOpMeterProvider())

_initialized = False
_tracer = None
_meter = None
_http_requests = None
_http_errors = None
_http_duration = None
_worker_jobs = None
_worker_errors = None
_worker_duration = None
_adapter_runs = None
_adapter_errors = None
_adapter_duration = None
_queue_stream_length = None
_queue_lag = None
_queue_pending = None
_queue_backlog = None
_queue_consumer_delay = None
_queue_dlq_length = None
_worker_utilization = None
_worker_in_flight = None
_worker_capacity = None


def get_tracer() -> Tracer:
Expand Down Expand Up @@ -202,6 +255,114 @@ def _ensure_red_instruments() -> None:
)


def _ensure_queue_instruments() -> None:
global _queue_stream_length, _queue_lag, _queue_pending, _queue_backlog
global _queue_consumer_delay, _queue_dlq_length

if _meter is None:
setup_telemetry()
meter = _meter or metrics.get_meter(get_settings().otel_service_name)

if _queue_stream_length is None:
_queue_stream_length = meter.create_gauge(
"agentflow.queue.stream_length",
description="Redis stream entry count (XLEN)",
unit="1",
)
Comment on lines +267 to +271
_queue_lag = meter.create_gauge(
"agentflow.queue.lag",
description="Undelivered entries not yet assigned to a consumer",
unit="1",
)
_queue_pending = meter.create_gauge(
"agentflow.queue.pending",
description="Entries delivered but not yet XACKed",
unit="1",
)
_queue_backlog = meter.create_gauge(
"agentflow.queue.backlog",
description="lag + pending (work waiting on workers)",
unit="1",
)
_queue_consumer_delay = meter.create_gauge(
"agentflow.queue.consumer_delay",
description="Worst-case wait among lag and pending entries",
unit="s",
)
_queue_dlq_length = meter.create_gauge(
"agentflow.queue.dlq_length",
description="Dead-letter stream length",
unit="1",
)


def _ensure_worker_util_instruments() -> None:
global _worker_utilization, _worker_in_flight, _worker_capacity

if _meter is None:
setup_telemetry()
meter = _meter or metrics.get_meter(get_settings().otel_service_name)

if _worker_utilization is None:
_worker_utilization = meter.create_gauge(
"agentflow.worker.utilization",
description=(
"Fraction of local concurrency slots busy (in_flight / capacity)"
),
unit="1",
)
Comment on lines +307 to +313
_worker_in_flight = meter.create_gauge(
"agentflow.worker.in_flight",
description="Run jobs currently executing in this worker process",
unit="1",
)
_worker_capacity = meter.create_gauge(
"agentflow.worker.capacity",
description="Maximum parallel jobs (AGENTFLOW_WORKER_CONCURRENCY)",
unit="1",
)


def record_queue_metrics(stats: "QueueStats") -> None:
"""Export a queue monitor sample as OTel gauges (``agentflow.queue.*``)."""
if not is_enabled():
return
_ensure_queue_instruments()
assert _queue_stream_length is not None
assert _queue_lag is not None
assert _queue_pending is not None
assert _queue_backlog is not None
assert _queue_consumer_delay is not None
assert _queue_dlq_length is not None

_queue_stream_length.set(stats.stream_length)
_queue_lag.set(stats.lag_count)
_queue_pending.set(stats.pending_count)
_queue_backlog.set(stats.backlog_count)
delay = stats.consumer_delay_seconds
if delay is not None:
_queue_consumer_delay.set(delay)
if stats.dlq_length is not None:
_queue_dlq_length.set(stats.dlq_length)


def set_worker_utilization(*, in_flight: int, capacity: int) -> None:
"""Update per-process worker slot usage (``agentflow.worker.*`` gauges)."""
if not is_enabled():
return
_ensure_worker_util_instruments()
assert _worker_utilization is not None
assert _worker_in_flight is not None
assert _worker_capacity is not None

capped = max(0, in_flight)
slots = max(1, capacity)
ratio = min(capped, slots) / slots
_worker_in_flight.set(capped)
_worker_capacity.set(capacity)
_worker_utilization.set(ratio)


def capture_trace_context() -> dict[str, str] | None:
"""Serialize the active span context for cross-process propagation."""
if not is_enabled():
Expand Down
3 changes: 3 additions & 0 deletions backend/app/worker/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
The monitor runs as a background asyncio task alongside the job consume loop.
It periodically samples the run-job stream and emits:

* ``agentflow.queue.*`` OTel gauges when ``AGENTFLOW_OTEL_ENABLED=true``.
* ``queue.metrics`` at INFO — baseline depth / lag / pending counters.
* ``queue.consumer_delay_alert`` at WARNING — oldest undelivered or
un-ACKed entry exceeds ``Settings.job_queue_consumer_delay_alert_seconds``.
Expand All @@ -20,6 +21,7 @@

from app.core.config import Settings, get_settings
from app.core.logging import get_logger
from app.core.telemetry import record_queue_metrics
from app.worker.queue import JobQueue, QueueStats, RedisStreamsJobQueue

logger = get_logger("worker.monitor")
Expand Down Expand Up @@ -51,6 +53,7 @@ async def run_queue_monitor(
try:
stats = await collect_queue_stats(queue)
if stats is not None:
record_queue_metrics(stats)
logger.info(
"queue.metrics",
stream_length=stats.stream_length,
Expand Down
9 changes: 5 additions & 4 deletions backend/app/worker/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,15 @@ async def collect_stats(self) -> QueueStats:
pending_count = int(group_info.get("pending") or 0) if group_info else 0

if pending_count == 0:
trailing = await self._redis.xrange(
undelivered = await self._redis.xrange(
self._stream,
min=f"({last_delivered_id}",
max="+",
count=1,
)
if trailing:
oldest_lag_seconds = _entry_age_seconds(trailing[0][0])
if undelivered:
if lag_count == 0:
lag_count = len(undelivered)
oldest_lag_seconds = _entry_age_seconds(undelivered[0][0])
Comment on lines 392 to +401
elif lag_count > 0:
trailing = await self._redis.xrange(
self._stream,
Expand Down
20 changes: 18 additions & 2 deletions backend/app/worker/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from app.adapters import EchoAdapter, LangGraphAdapter # noqa: F401 - register
from app.core.config import get_settings
from app.core.logging import get_logger, setup_logging
from app.core.telemetry import setup_telemetry, shutdown_telemetry, trace_worker_job
from app.core.telemetry import (
set_worker_utilization,
setup_telemetry,
shutdown_telemetry,
trace_worker_job,
)
from app.db.base import Base
from app.db.session import engine
from app.events import get_event_bus
Expand Down Expand Up @@ -93,6 +98,11 @@ async def _consume_loop(
slots = asyncio.Semaphore(concurrency)
in_flight: set[asyncio.Task[None]] = set()

def _report_utilization() -> None:
set_worker_utilization(in_flight=len(in_flight), capacity=concurrency)

_report_utilization()

async def _run_job(lease: JobLease) -> None:
try:
await _process_lease(executor, queue, lease)
Expand Down Expand Up @@ -122,7 +132,13 @@ async def _run_job(lease: JobLease) -> None:

task = asyncio.create_task(_run_job(lease))
in_flight.add(task)
task.add_done_callback(in_flight.discard)

def _on_job_done(done: asyncio.Task[None]) -> None:
in_flight.discard(done)
_report_utilization()

task.add_done_callback(_on_job_done)
_report_utilization()
finally:
aclose = getattr(consumer, "aclose", None)
if aclose is not None:
Expand Down
26 changes: 26 additions & 0 deletions backend/tests/test_queue_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import fakeredis.aioredis as fakeredis
import pytest

from unittest.mock import patch

from app.core.config import Settings
from app.worker.monitor import (
_emit_delay_alerts,
Expand Down Expand Up @@ -130,6 +132,30 @@ def test_depth_alert_edge_triggering():
assert active is False


@pytest.mark.asyncio
async def test_run_queue_monitor_exports_otel_metrics():
redis = fakeredis.FakeRedis(decode_responses=True)
queue = _make_queue(redis)
await queue.enqueue(RunJob.new(run_id="r1", agent_id="a1", adapter="echo"))

stop = asyncio.Event()
settings = Settings(
job_queue_monitor_enabled=True,
job_queue_monitor_interval_seconds=5,
)
with patch("app.worker.monitor.record_queue_metrics") as record:
monitor_task = asyncio.create_task(
run_queue_monitor(queue, stop, settings=settings)
)
await asyncio.sleep(0.05)
stop.set()
await asyncio.wait_for(monitor_task, timeout=1.0)

assert record.call_count >= 1
stats = record.call_args[0][0]
assert stats.stream_length >= 1


@pytest.mark.asyncio
async def test_run_queue_monitor_stops_when_event_is_set():
redis = fakeredis.FakeRedis(decode_responses=True)
Expand Down
Loading
Loading