diff --git a/backend/app/core/telemetry.py b/backend/app/core/telemetry.py index b32d1e6..52d4f50 100644 --- a/backend/app/core/telemetry.py +++ b/backend/app/core/telemetry.py @@ -16,9 +16,10 @@ import time from collections.abc import Awaitable, Callable, Mapping from contextlib import contextmanager -from typing import Any, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar from opentelemetry import metrics, trace +from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter, ) @@ -29,7 +30,10 @@ from opentelemetry.metrics import Counter, Histogram, Meter from opentelemetry.propagate import extract, inject, set_global_textmap from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.export import ( + MetricReader, + PeriodicExportingMetricReader, +) from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -41,6 +45,9 @@ from app.core.config import Settings, get_settings from app.core.logging import get_logger +if TYPE_CHECKING: + from app.worker.queue import QueueStats + logger = get_logger("telemetry") F = TypeVar("F", bound=Callable[..., Any]) @@ -60,6 +67,15 @@ _adapter_runs: Counter | None = None _adapter_errors: Counter | None = None _adapter_duration: Histogram | None = None +_queue_stream_length: Any | None = None +_queue_lag: Any | None = None +_queue_pending: Any | None = None +_queue_backlog: Any | None = None +_queue_consumer_delay: Any | None = None +_queue_dlq_length: Any | None = None +_worker_utilization: Any | None = None +_worker_in_flight: Any | None = None +_worker_capacity: Any | None = None def is_enabled() -> bool: @@ -80,7 +96,11 @@ def _otlp_endpoint(settings: Settings) -> str: return base -def setup_telemetry(*, settings: Settings | None = None) -> 
None: +def setup_telemetry( + *, + settings: Settings | None = None, + metric_readers: list[MetricReader] | None = None, +) -> None: """Idempotent SDK setup. No-op when ``otel_enabled`` is false.""" global _initialized, _tracer, _meter @@ -94,18 +114,23 @@ def setup_telemetry(*, settings: Settings | None = None) -> None: endpoint = _otlp_endpoint(settings) tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor( - BatchSpanProcessor( - OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"), + if metric_readers is None: + tracer_provider.add_span_processor( + BatchSpanProcessor( + OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"), + ) ) - ) trace.set_tracer_provider(tracer_provider) - metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint=f"{endpoint}/v1/metrics"), - export_interval_millis=settings.otel_metric_export_interval_ms, - ) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + if metric_readers is None: + metric_reader: MetricReader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=f"{endpoint}/v1/metrics"), + export_interval_millis=settings.otel_metric_export_interval_ms, + ) + readers: list[MetricReader] = [metric_reader] + else: + readers = metric_readers + meter_provider = MeterProvider(resource=resource, metric_readers=readers) metrics.set_meter_provider(meter_provider) set_global_textmap(TraceContextTextMapPropagator()) @@ -123,6 +148,12 @@ def setup_telemetry(*, settings: Settings | None = None) -> None: def shutdown_telemetry() -> None: """Flush exporters on process exit.""" global _initialized, _tracer, _meter + global _http_requests, _http_errors, _http_duration + global _worker_jobs, _worker_errors, _worker_duration + global _adapter_runs, _adapter_errors, _adapter_duration + global _queue_stream_length, _queue_lag, _queue_pending, _queue_backlog + global _queue_consumer_delay, _queue_dlq_length + global _worker_utilization, _worker_in_flight, 
_worker_capacity if not _initialized: return @@ -135,9 +166,31 @@ def shutdown_telemetry() -> None: if hasattr(meter_provider, "shutdown"): meter_provider.shutdown() # type: ignore[union-attr] + # Reset globals so a later setup_telemetry() in tests or respawns can run. + trace.set_tracer_provider(TracerProvider()) + metrics.set_meter_provider(NoOpMeterProvider()) + _initialized = False _tracer = None _meter = None + _http_requests = None + _http_errors = None + _http_duration = None + _worker_jobs = None + _worker_errors = None + _worker_duration = None + _adapter_runs = None + _adapter_errors = None + _adapter_duration = None + _queue_stream_length = None + _queue_lag = None + _queue_pending = None + _queue_backlog = None + _queue_consumer_delay = None + _queue_dlq_length = None + _worker_utilization = None + _worker_in_flight = None + _worker_capacity = None def get_tracer() -> Tracer: @@ -202,6 +255,114 @@ def _ensure_red_instruments() -> None: ) +def _ensure_queue_instruments() -> None: + global _queue_stream_length, _queue_lag, _queue_pending, _queue_backlog + global _queue_consumer_delay, _queue_dlq_length + + if _meter is None: + setup_telemetry() + meter = _meter or metrics.get_meter(get_settings().otel_service_name) + + if _queue_stream_length is None: + _queue_stream_length = meter.create_gauge( + "agentflow.queue.stream_length", + description="Redis stream entry count (XLEN)", + unit="1", + ) + _queue_lag = meter.create_gauge( + "agentflow.queue.lag", + description="Undelivered entries not yet assigned to a consumer", + unit="1", + ) + _queue_pending = meter.create_gauge( + "agentflow.queue.pending", + description="Entries delivered but not yet XACKed", + unit="1", + ) + _queue_backlog = meter.create_gauge( + "agentflow.queue.backlog", + description="lag + pending (work waiting on workers)", + unit="1", + ) + _queue_consumer_delay = meter.create_gauge( + "agentflow.queue.consumer_delay", + description="Worst-case wait among lag and pending entries", + 
unit="s", + ) + _queue_dlq_length = meter.create_gauge( + "agentflow.queue.dlq_length", + description="Dead-letter stream length", + unit="1", + ) + + +def _ensure_worker_util_instruments() -> None: + global _worker_utilization, _worker_in_flight, _worker_capacity + + if _meter is None: + setup_telemetry() + meter = _meter or metrics.get_meter(get_settings().otel_service_name) + + if _worker_utilization is None: + _worker_utilization = meter.create_gauge( + "agentflow.worker.utilization", + description=( + "Fraction of local concurrency slots busy (in_flight / capacity)" + ), + unit="1", + ) + _worker_in_flight = meter.create_gauge( + "agentflow.worker.in_flight", + description="Run jobs currently executing in this worker process", + unit="1", + ) + _worker_capacity = meter.create_gauge( + "agentflow.worker.capacity", + description="Maximum parallel jobs (AGENTFLOW_WORKER_CONCURRENCY)", + unit="1", + ) + + +def record_queue_metrics(stats: "QueueStats") -> None: + """Export a queue monitor sample as OTel gauges (``agentflow.queue.*``).""" + if not is_enabled(): + return + _ensure_queue_instruments() + assert _queue_stream_length is not None + assert _queue_lag is not None + assert _queue_pending is not None + assert _queue_backlog is not None + assert _queue_consumer_delay is not None + assert _queue_dlq_length is not None + + _queue_stream_length.set(stats.stream_length) + _queue_lag.set(stats.lag_count) + _queue_pending.set(stats.pending_count) + _queue_backlog.set(stats.backlog_count) + delay = stats.consumer_delay_seconds + if delay is not None: + _queue_consumer_delay.set(delay) + if stats.dlq_length is not None: + _queue_dlq_length.set(stats.dlq_length) + + +def set_worker_utilization(*, in_flight: int, capacity: int) -> None: + """Update per-process worker slot usage (``agentflow.worker.*`` gauges).""" + if not is_enabled(): + return + _ensure_worker_util_instruments() + assert _worker_utilization is not None + assert _worker_in_flight is not None + assert 
_worker_capacity is not None + + capped = max(0, in_flight) + slots = max(1, capacity) + ratio = min(capped, slots) / slots + _worker_in_flight.set(capped) + _worker_capacity.set(capacity) + _worker_utilization.set(ratio) + + def capture_trace_context() -> dict[str, str] | None: """Serialize the active span context for cross-process propagation.""" if not is_enabled(): diff --git a/backend/app/worker/monitor.py b/backend/app/worker/monitor.py index 21d8a22..5eea91a 100644 --- a/backend/app/worker/monitor.py +++ b/backend/app/worker/monitor.py @@ -3,6 +3,7 @@ The monitor runs as a background asyncio task alongside the job consume loop. It periodically samples the run-job stream and emits: +* ``agentflow.queue.*`` OTel gauges when ``AGENTFLOW_OTEL_ENABLED=true``. * ``queue.metrics`` at INFO — baseline depth / lag / pending counters. * ``queue.consumer_delay_alert`` at WARNING — oldest undelivered or un-ACKed entry exceeds ``Settings.job_queue_consumer_delay_alert_seconds``. @@ -20,6 +21,7 @@ from app.core.config import Settings, get_settings from app.core.logging import get_logger +from app.core.telemetry import record_queue_metrics from app.worker.queue import JobQueue, QueueStats, RedisStreamsJobQueue logger = get_logger("worker.monitor") @@ -51,6 +53,7 @@ async def run_queue_monitor( try: stats = await collect_queue_stats(queue) if stats is not None: + record_queue_metrics(stats) logger.info( "queue.metrics", stream_length=stats.stream_length, diff --git a/backend/app/worker/queue.py b/backend/app/worker/queue.py index ffca8cf..8b3503e 100644 --- a/backend/app/worker/queue.py +++ b/backend/app/worker/queue.py @@ -390,14 +390,15 @@ async def collect_stats(self) -> QueueStats: pending_count = int(group_info.get("pending") or 0) if group_info else 0 if pending_count == 0: - trailing = await self._redis.xrange( + undelivered = await self._redis.xrange( self._stream, min=f"({last_delivered_id}", max="+", - count=1, ) - if trailing: - oldest_lag_seconds = 
_entry_age_seconds(trailing[0][0]) + if undelivered: + if lag_count == 0: + lag_count = len(undelivered) + oldest_lag_seconds = _entry_age_seconds(undelivered[0][0]) elif lag_count > 0: trailing = await self._redis.xrange( self._stream, diff --git a/backend/app/worker/runner.py b/backend/app/worker/runner.py index 3c5ae49..ea6bfd8 100644 --- a/backend/app/worker/runner.py +++ b/backend/app/worker/runner.py @@ -21,7 +21,12 @@ from app.adapters import EchoAdapter, LangGraphAdapter # noqa: F401 - register from app.core.config import get_settings from app.core.logging import get_logger, setup_logging -from app.core.telemetry import setup_telemetry, shutdown_telemetry, trace_worker_job +from app.core.telemetry import ( + set_worker_utilization, + setup_telemetry, + shutdown_telemetry, + trace_worker_job, +) from app.db.base import Base from app.db.session import engine from app.events import get_event_bus @@ -93,6 +98,11 @@ async def _consume_loop( slots = asyncio.Semaphore(concurrency) in_flight: set[asyncio.Task[None]] = set() + def _report_utilization() -> None: + set_worker_utilization(in_flight=len(in_flight), capacity=concurrency) + + _report_utilization() + async def _run_job(lease: JobLease) -> None: try: await _process_lease(executor, queue, lease) @@ -122,7 +132,13 @@ async def _run_job(lease: JobLease) -> None: task = asyncio.create_task(_run_job(lease)) in_flight.add(task) - task.add_done_callback(in_flight.discard) + + def _on_job_done(done: asyncio.Task[None]) -> None: + in_flight.discard(done) + _report_utilization() + + task.add_done_callback(_on_job_done) + _report_utilization() finally: aclose = getattr(consumer, "aclose", None) if aclose is not None: diff --git a/backend/tests/test_queue_monitor.py b/backend/tests/test_queue_monitor.py index 7e6256e..b7c989c 100644 --- a/backend/tests/test_queue_monitor.py +++ b/backend/tests/test_queue_monitor.py @@ -8,6 +8,8 @@ import fakeredis.aioredis as fakeredis import pytest +from unittest.mock import patch + 
from app.core.config import Settings from app.worker.monitor import ( _emit_delay_alerts, @@ -130,6 +132,30 @@ def test_depth_alert_edge_triggering(): assert active is False +@pytest.mark.asyncio +async def test_run_queue_monitor_exports_otel_metrics(): + redis = fakeredis.FakeRedis(decode_responses=True) + queue = _make_queue(redis) + await queue.enqueue(RunJob.new(run_id="r1", agent_id="a1", adapter="echo")) + + stop = asyncio.Event() + settings = Settings( + job_queue_monitor_enabled=True, + job_queue_monitor_interval_seconds=5, + ) + with patch("app.worker.monitor.record_queue_metrics") as record: + monitor_task = asyncio.create_task( + run_queue_monitor(queue, stop, settings=settings) + ) + await asyncio.sleep(0.05) + stop.set() + await asyncio.wait_for(monitor_task, timeout=1.0) + + assert record.call_count >= 1 + stats = record.call_args[0][0] + assert stats.stream_length >= 1 + + @pytest.mark.asyncio async def test_run_queue_monitor_stops_when_event_is_set(): redis = fakeredis.FakeRedis(decode_responses=True) diff --git a/backend/tests/test_telemetry.py b/backend/tests/test_telemetry.py index 8ad95a9..136e1fe 100644 --- a/backend/tests/test_telemetry.py +++ b/backend/tests/test_telemetry.py @@ -3,17 +3,23 @@ from __future__ import annotations import json +from unittest.mock import patch import pytest +from opentelemetry import metrics +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from app.core.config import Settings + from app.core.telemetry import ( capture_trace_context, is_enabled, + record_queue_metrics, + set_worker_utilization, setup_telemetry, shutdown_telemetry, ) -from app.worker.queue import RunJob +from app.worker.queue import QueueStats, RunJob @pytest.fixture(autouse=True) @@ -56,3 +62,59 @@ def test_runjob_omits_empty_trace_context(): ) data = json.loads(job.to_json()) assert "trace_context" not in data + + +def test_queue_metrics_noop_when_disabled(): + record_queue_metrics( + QueueStats( + stream_length=5, + lag_count=3, 
+ pending_count=2, + oldest_lag_seconds=1.0, + oldest_pending_idle_seconds=None, + dlq_length=0, + ) + ) + + +def test_worker_utilization_noop_when_disabled(): + set_worker_utilization(in_flight=2, capacity=4) + + +def _metric_values(reader: InMemoryMetricReader) -> dict[str, float]: + provider = metrics.get_meter_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() # type: ignore[union-attr] + data = reader.get_metrics_data() + if data is None: + return {} + values: dict[str, float] = {} + for resource_metrics in data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + for point in metric.data.data_points: + values[metric.name] = float(point.value) # type: ignore[attr-defined] + return values + + +def test_otel_queue_and_worker_gauges_export(): + reader = InMemoryMetricReader() + settings = Settings(otel_enabled=True, otel_service_name="test-metrics") + with patch("app.core.telemetry.get_settings", return_value=settings): + setup_telemetry(settings=settings, metric_readers=[reader]) + record_queue_metrics( + QueueStats( + stream_length=10, + lag_count=4, + pending_count=1, + oldest_lag_seconds=30.0, + oldest_pending_idle_seconds=5.0, + dlq_length=2, + ) + ) + set_worker_utilization(in_flight=3, capacity=4) + exported = _metric_values(reader) + assert exported["agentflow.queue.backlog"] == 5.0 + assert exported["agentflow.queue.consumer_delay"] == 30.0 + assert exported["agentflow.worker.in_flight"] == 3.0 + assert exported["agentflow.worker.utilization"] == 0.75 diff --git a/backend/tests/test_worker_queue.py b/backend/tests/test_worker_queue.py index 2d19e21..d0106d3 100644 --- a/backend/tests/test_worker_queue.py +++ b/backend/tests/test_worker_queue.py @@ -343,6 +343,48 @@ async def consume(self): yield lease +@pytest.mark.asyncio +async def test_consume_loop_reports_worker_utilization(): + from unittest.mock import patch + + from app.worker import runner as runner_module + + 
utilization_samples: list[tuple[int, int]] = [] + + def _capture(*, in_flight: int, capacity: int) -> None: + utilization_samples.append((in_flight, capacity)) + + hold = asyncio.Event() + + class _BlockingExecutor: + async def execute(self, run_id: str, adapter: str) -> None: + await hold.wait() + + queue = _FiniteQueue(_leases_for_runs(["r1", "r2"], "agent-1")) + stop = asyncio.Event() + + with patch.object(runner_module, "set_worker_utilization", side_effect=_capture): + loop_task = asyncio.create_task( + runner_module._consume_loop( + executor=_BlockingExecutor(), # type: ignore[arg-type] + queue=queue, # type: ignore[arg-type] + stop=stop, + concurrency=2, + ) + ) + + for _ in range(50): + await asyncio.sleep(0.01) + if any(n >= 2 for n, _ in utilization_samples): + break + + hold.set() + await asyncio.wait_for(loop_task, timeout=2.0) + + assert (0, 2) in utilization_samples + assert any(n == 2 and cap == 2 for n, cap in utilization_samples) + + @pytest.mark.asyncio async def test_consume_loop_respects_worker_concurrency(): from app.worker import runner as runner_module diff --git a/docker/grafana/dashboards/agentflow-observability.json b/docker/grafana/dashboards/agentflow-observability.json new file mode 100644 index 0000000..4bdb32a --- /dev/null +++ b/docker/grafana/dashboards/agentflow-observability.json @@ -0,0 +1,87 @@ +{ + "annotations": { "list": [] }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { "type": "prometheus", "uid": "${datasource}" }, + "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0, "max": 1 } }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "id": 1, + "options": { "legend": { "displayMode": "list", "placement": "bottom" } }, + "targets": [ + { + "expr": "agentflow_worker_utilization", + "legendFormat": "{{service_name}}" + } + ], + "title": "Worker utilization", + "type": "timeseries" + }, + { + "datasource": { "type": 
"prometheus", "uid": "${datasource}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "id": 2, + "options": { "legend": { "displayMode": "list", "placement": "bottom" } }, + "targets": [ + { "expr": "agentflow_queue_backlog", "legendFormat": "backlog" }, + { "expr": "agentflow_queue_lag", "legendFormat": "lag" }, + { "expr": "agentflow_queue_pending", "legendFormat": "pending" } + ], + "title": "Queue depth", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "${datasource}" }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "id": 3, + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(agentflow_worker_job_duration_bucket[5m])) by (le))", + "legendFormat": "worker job p95" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(agentflow_http_server_duration_bucket[5m])) by (le))", + "legendFormat": "http p95" + } + ], + "title": "p95 duration (worker + HTTP)", + "type": "timeseries" + }, + { + "datasource": { "type": "prometheus", "uid": "${datasource}" }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "id": 4, + "targets": [ + { + "expr": "agentflow_queue_consumer_delay", + "legendFormat": "consumer delay" + } + ], + "title": "Queue consumer delay", + "type": "timeseries" + } + ], + "schemaVersion": 39, + "tags": ["agentflow"], + "templating": { + "list": [ + { + "name": "datasource", + "type": "datasource", + "query": "prometheus", + "current": {} + } + ] + }, + "time": { "from": "now-1h", "to": "now" }, + "title": "AgentFlow observability", + "uid": "agentflow-observability", + "version": 1 +} diff --git a/docs/deployment.md b/docs/deployment.md index 00d403e..33abc48 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -60,7 +60,10 @@ needed when running adapters that call external models. 
| `AGENTFLOW_OTEL_EXPORTER_ENDPOINT` | Worker | OTLP HTTP base URL without path (default `http://localhost:4318`) | RED metric names (both stacks): `agentflow.http.server.*`, `agentflow.worker.job.*`, -`agentflow.adapter.run.*`. Run jobs carry W3C `trace_context` in the Redis payload so +`agentflow.adapter.run.*`. The Python worker also exports queue gauges +(`agentflow.queue.*`) from the queue monitor and per-process slot usage +(`agentflow.worker.utilization`, `agentflow.worker.in_flight`, +`agentflow.worker.capacity`). Run jobs carry W3C `trace_context` in the Redis payload so worker spans link to the API trace. Local collector + Prometheus scrape: @@ -71,6 +74,10 @@ docker compose --profile observability --profile app up --build export AGENTFLOW_OTEL_ENABLED=true ``` +Import `docker/grafana/dashboards/agentflow-observability.json` into Grafana (or add a +Grafana service to compose) for worker utilization, queue depth, consumer delay, and +p95 latency panels fed by the Prometheus scrape of `:8889/metrics`. + ### Frontend | Variable | Production value | diff --git a/docs/plan.md b/docs/plan.md index 110cc78..74f096b 100644 --- a/docs/plan.md +++ b/docs/plan.md @@ -9,7 +9,7 @@ 按投入产出比排序: 1. **事件总线持久化。** Redis pub/sub 无订阅者时会丢消息;支持 `Last-Event-ID` 重放。 -2. **队列 OTel 指标与背压。** 深度/消费者延迟已有日志告警,需导出为 `agentflow.queue.*` 指标;补充 worker 利用率、p95 看板。 +2. **队列 OTel 指标与背压。** 深度/消费者延迟已导出为 `agentflow.queue.*`;worker 利用率见 `agentflow.worker.utilization`;p95 面板见 `docker/grafana/dashboards/agentflow-observability.json`。 3. **控制台调试体验。** Step 可视化时间线、独立 ToolCall 检查面板。 4. **LangGraph adapter 扩展。** 更多 graph 模式、MCP 工具协议集成。 5. **双向流式传输。** WebSocket / WebTransport + SSE 降级,支持双向取消与审批。 @@ -35,7 +35,7 @@ - [ ] Step 时间线组件(节点延迟与状态流转) - [ ] 独立 ToolCall 检查面板(参数/结果/错误结构化浏览) - [ ] SSE 事件重放(`Last-Event-ID` / 持久化 event log) -- [ ] 队列深度、worker 利用率导出为 OTel/Prometheus 指标 +- [x] 队列深度、worker 利用率导出为 OTel/Prometheus 指标 - [ ] p95 耗时仪表盘与告警 **验收:** 控制台展示可视化时间线;断连 SSE 可补全事件;队列指标可在 Grafana 查看。