From 10d8a333ccaab0e06a4dd017d77377b88cb1a710 Mon Sep 17 00:00:00 2001 From: smxzk32145745 Date: Thu, 4 Jun 2026 15:53:28 +0800 Subject: [PATCH] feat: enhance worker monitoring and utilization reporting - Added OpenTelemetry gauge reporting for queue metrics when AGENTFLOW_OTEL_ENABLED is set to true. - Updated the queue monitoring logic to record metrics using the new telemetry function. - Refactored the consume loop to report worker utilization, capturing in-flight tasks and capacity. - Introduced a test to validate the worker utilization reporting functionality. --- backend/app/worker/monitor.py | 3 +++ backend/app/worker/queue.py | 9 ++++--- backend/app/worker/runner.py | 20 ++++++++++++-- backend/tests/test_worker_queue.py | 42 ++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/backend/app/worker/monitor.py b/backend/app/worker/monitor.py index 21d8a22..5eea91a 100644 --- a/backend/app/worker/monitor.py +++ b/backend/app/worker/monitor.py @@ -3,6 +3,7 @@ The monitor runs as a background asyncio task alongside the job consume loop. It periodically samples the run-job stream and emits: +* ``agentflow.queue.*`` OTel gauges when ``AGENTFLOW_OTEL_ENABLED=true``. * ``queue.metrics`` at INFO — baseline depth / lag / pending counters. * ``queue.consumer_delay_alert`` at WARNING — oldest undelivered or un-ACKed entry exceeds ``Settings.job_queue_consumer_delay_alert_seconds``. @@ -20,6 +21,7 @@ from app.core.config import Settings, get_settings from app.core.logging import get_logger +from app.core.telemetry import record_queue_metrics from app.worker.queue import JobQueue, QueueStats, RedisStreamsJobQueue logger = get_logger("worker.monitor") @@ -51,6 +53,7 @@ async def run_queue_monitor( try: stats = await collect_queue_stats(queue) if stats is not None: + record_queue_metrics(stats) logger.info( "queue.metrics", stream_length=stats.stream_length, diff --git a/backend/app/worker/queue.py b/backend/app/worker/queue.py index ffca8cf..8b3503e 100644 --- a/backend/app/worker/queue.py +++ b/backend/app/worker/queue.py @@ -390,14 +390,15 @@ async def collect_stats(self) -> QueueStats: pending_count = int(group_info.get("pending") or 0) if group_info else 0 if pending_count == 0: - trailing = await self._redis.xrange( + undelivered = await self._redis.xrange( self._stream, min=f"({last_delivered_id}", max="+", - count=1, ) - if trailing: - oldest_lag_seconds = _entry_age_seconds(trailing[0][0]) + if undelivered: + if lag_count == 0: + lag_count = len(undelivered) + oldest_lag_seconds = _entry_age_seconds(undelivered[0][0]) elif lag_count > 0: trailing = await self._redis.xrange( self._stream, diff --git a/backend/app/worker/runner.py b/backend/app/worker/runner.py index 3c5ae49..ea6bfd8 100644 --- a/backend/app/worker/runner.py +++ b/backend/app/worker/runner.py @@ -21,7 +21,12 @@ from app.adapters import EchoAdapter, LangGraphAdapter # noqa: F401 - register from app.core.config import get_settings from app.core.logging import get_logger, setup_logging -from app.core.telemetry import setup_telemetry, shutdown_telemetry, trace_worker_job +from app.core.telemetry import ( + set_worker_utilization, + setup_telemetry, + shutdown_telemetry, + trace_worker_job, +) from app.db.base import Base from app.db.session import engine from app.events import get_event_bus @@ -93,6 +98,11 @@ async def _consume_loop( slots = asyncio.Semaphore(concurrency) in_flight: set[asyncio.Task[None]] = set() + def _report_utilization() -> None: + set_worker_utilization(in_flight=len(in_flight), capacity=concurrency) + + _report_utilization() + async def _run_job(lease: JobLease) -> None: try: await _process_lease(executor, queue, lease) @@ -122,7 +132,13 @@ async def _run_job(lease: JobLease) -> None: task = asyncio.create_task(_run_job(lease)) in_flight.add(task) - task.add_done_callback(in_flight.discard) + + def _on_job_done(done: asyncio.Task[None]) -> None: + in_flight.discard(done) + _report_utilization() + + task.add_done_callback(_on_job_done) + _report_utilization() finally: aclose = getattr(consumer, "aclose", None) if aclose is not None: diff --git a/backend/tests/test_worker_queue.py b/backend/tests/test_worker_queue.py index 2d19e21..d0106d3 100644 --- a/backend/tests/test_worker_queue.py +++ b/backend/tests/test_worker_queue.py @@ -343,6 +343,48 @@ async def consume(self): yield lease +@pytest.mark.asyncio +async def test_consume_loop_reports_worker_utilization(): + from unittest.mock import patch + + from app.worker import runner as runner_module + + utilization_samples: list[tuple[int, int]] = [] + + def _capture(*, in_flight: int, capacity: int) -> None: + utilization_samples.append((in_flight, capacity)) + + hold = asyncio.Event() + + class _BlockingExecutor: + async def execute(self, run_id: str, adapter: str) -> None: + await hold.wait() + + queue = _FiniteQueue(_leases_for_runs(["r1", "r2"], "agent-1")) + stop = asyncio.Event() + + with patch.object(runner_module, "set_worker_utilization", side_effect=_capture): + loop_task = asyncio.create_task( + runner_module._consume_loop( + executor=_BlockingExecutor(), # type: ignore[arg-type] + queue=queue, # type: ignore[arg-type] + stop=stop, + concurrency=2, + ) + ) + + for _ in range(50): + await asyncio.sleep(0.01) + if any(n >= 2 for n, _ in utilization_samples): + break + + hold.set() + await asyncio.wait_for(loop_task, timeout=2.0) + + assert (0, 2) in utilization_samples + assert any(n == 2 and cap == 2 for n, cap in utilization_samples) + + @pytest.mark.asyncio async def test_consume_loop_respects_worker_concurrency(): from app.worker import runner as runner_module