feat: enhance worker monitoring and utilization reporting#24
Merged
Conversation
smxzk32145745
commented
Jun 4, 2026
Contributor
- Added OpenTelemetry gauge reporting for queue metrics when AGENTFLOW_OTEL_ENABLED is set to true.
- Updated the queue monitoring logic to record metrics using the new telemetry function.
- Refactored the consume loop to report worker utilization, capturing in-flight tasks and capacity.
- Introduced a test to validate the worker utilization reporting functionality.
- Added OpenTelemetry gauge reporting for queue metrics when AGENTFLOW_OTEL_ENABLED is set to true. - Updated the queue monitoring logic to record metrics using the new telemetry function. - Refactored the consume loop to report worker utilization, capturing in-flight tasks and capacity. - Introduced a test to validate the worker utilization reporting functionality.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR enhances worker/queue observability by adding utilization reporting to the worker consume loop and by emitting queue metrics via (intended) OpenTelemetry helpers, with a new test covering utilization reporting.
Changes:
- Added worker utilization sampling in the consume loop (in-flight tasks vs configured concurrency).
- Updated the queue monitor to emit queue metrics via a telemetry hook before logging.
- Adjusted Redis stream stats collection for undelivered entries and added a utilization test.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
backend/app/worker/runner.py |
Reports worker utilization on startup and whenever jobs start/finish. |
backend/app/worker/monitor.py |
Calls a telemetry hook to record queue metrics when stats are collected. |
backend/app/worker/queue.py |
Changes how undelivered entries are queried when pending is zero. |
backend/tests/test_worker_queue.py |
Adds an async test asserting utilization samples are reported. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+24
to
+29
| from app.core.telemetry import ( | ||
| set_worker_utilization, | ||
| setup_telemetry, | ||
| shutdown_telemetry, | ||
| trace_worker_job, | ||
| ) |
Comment on lines
22
to
26
| from app.core.config import Settings, get_settings | ||
| from app.core.logging import get_logger | ||
| from app.core.telemetry import record_queue_metrics | ||
| from app.worker.queue import JobQueue, QueueStats, RedisStreamsJobQueue | ||
|
|
Comment on lines
392
to
396
| if pending_count == 0: | ||
| trailing = await self._redis.xrange( | ||
| undelivered = await self._redis.xrange( | ||
| self._stream, | ||
| min=f"({last_delivered_id}", | ||
| max="+", |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.