feat: enhance worker monitoring and utilization reporting #25
Merged
Conversation
smxzk32145745
commented
Jun 4, 2026
Contributor
- Added OpenTelemetry gauge reporting for queue metrics when AGENTFLOW_OTEL_ENABLED is set to true.
- Updated the queue monitoring logic to record metrics using the new telemetry function.
- Refactored the consume loop to report worker utilization, capturing in-flight tasks and capacity.
- Introduced a test to validate the worker utilization reporting functionality.
- Added OpenTelemetry gauge reporting for queue metrics when AGENTFLOW_OTEL_ENABLED is set to true. - Updated the queue monitoring logic to record metrics using the new telemetry function. - Refactored the consume loop to report worker utilization, capturing in-flight tasks and capacity. - Introduced a test to validate the worker utilization reporting functionality.
- Introduced new metrics for queue depth, consumer delay, and worker utilization in the telemetry module. - Updated the setup_telemetry function to accept custom metric readers. - Implemented tests to validate the correct export of queue and worker metrics. - Added a Grafana dashboard JSON for visualizing the new metrics. - Enhanced deployment documentation to include new metrics and dashboard integration.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR enhances AgentFlow’s observability by exporting additional queue/worker utilization metrics via OpenTelemetry (and providing a Grafana dashboard), while refactoring the worker consume loop to report per-process utilization.
Changes:
- Add queue and worker utilization metric reporting hooks (queue monitor + worker consume loop).
- Introduce/extend tests validating utilization reporting and OTel metric export behavior.
- Document the new metrics and add a Grafana dashboard JSON for the panels.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/plan.md | Updates the roadmap/checklist to reflect completed OTel queue + worker utilization metrics and references the new dashboard. |
| docs/deployment.md | Documents new queue + worker utilization metric names and Grafana import instructions. |
| docker/grafana/dashboards/agentflow-observability.json | Adds a Grafana dashboard for utilization, queue depth, consumer delay, and p95 latency panels. |
| backend/tests/test_worker_queue.py | Adds a test asserting utilization reporting from the consume loop. |
| backend/tests/test_telemetry.py | Adds tests for queue/worker gauge export and noop behavior when OTel is disabled. |
| backend/tests/test_queue_monitor.py | Adds a test ensuring the queue monitor calls the telemetry recording hook. |
| backend/app/worker/runner.py | Refactors consume loop to report in-flight/capacity utilization updates. |
| backend/app/worker/queue.py | Adjusts Redis Streams stats collection logic around undelivered entries. |
| backend/app/worker/monitor.py | Calls telemetry queue-metric recording from the queue monitor loop. |
| backend/app/core/telemetry.py | Adds queue/worker metric instruments and reporting APIs; extends telemetry setup/shutdown for testability. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
116
to
123
| tracer_provider = TracerProvider(resource=resource) | ||
| tracer_provider.add_span_processor( | ||
| BatchSpanProcessor( | ||
| OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"), | ||
| if metric_readers is None: | ||
| tracer_provider.add_span_processor( | ||
| BatchSpanProcessor( | ||
| OTLPSpanExporter(endpoint=f"{endpoint}/v1/traces"), | ||
| ) | ||
| ) | ||
| ) | ||
| trace.set_tracer_provider(tracer_provider) |
Comment on lines
+267
to
+271
| _queue_stream_length = meter.create_gauge( | ||
| "agentflow.queue.stream_length", | ||
| description="Redis stream entry count (XLEN)", | ||
| unit="1", | ||
| ) |
Comment on lines
392
to
+401
| if pending_count == 0: | ||
| trailing = await self._redis.xrange( | ||
| undelivered = await self._redis.xrange( | ||
| self._stream, | ||
| min=f"({last_delivered_id}", | ||
| max="+", | ||
| count=1, | ||
| ) | ||
| if trailing: | ||
| oldest_lag_seconds = _entry_age_seconds(trailing[0][0]) | ||
| if undelivered: | ||
| if lag_count == 0: | ||
| lag_count = len(undelivered) | ||
| oldest_lag_seconds = _entry_age_seconds(undelivered[0][0]) |
Comment on lines
+307
to
+313
| _worker_utilization = meter.create_gauge( | ||
| "agentflow.worker.utilization", | ||
| description=( | ||
| "Fraction of local concurrency slots busy (in_flight / capacity)" | ||
| ), | ||
| unit="1", | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.