fix: add backpressure and slow-consumer flow control to gateway#2156
fix: add backpressure and slow-consumer flow control to gateway#2156praisonai-triage-agent[bot] wants to merge 2 commits into
Conversation
…#2152) - Add max_inbox config to SessionConfig for bounded message queues (default 256) - Add max_buffered_bytes config to GatewayConfig for slow consumer detection (default 1MB) - Create prototype implementation showing bounded queue and flow control patterns - Prevents unbounded memory growth from flooding clients - Detects and disconnects slow consumers to protect gateway stability Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
✅ Action performedReview finished.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughAdds ChangesBackpressure and Slow-Consumer Flow Control
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
| class GatewaySession: | ||
| """Gateway session with bounded inbox for backpressure.""" | ||
|
|
||
| _session_id: str | ||
| _agent_id: str | ||
| _client_id: Optional[str] = None | ||
| _inbox: asyncio.Queue = field(default_factory=asyncio.Queue) | ||
| _max_inbox: int = 256 | ||
| _is_executing: bool = False | ||
|
|
||
| def setup_inbox(self, max_inbox: int = 256) -> None: | ||
| """Set up the inbox with proper bounds.""" | ||
| self._max_inbox = max_inbox | ||
| if max_inbox > 0: | ||
| self._inbox = asyncio.Queue(maxsize=max_inbox) | ||
| else: | ||
| self._inbox = asyncio.Queue() # Unbounded for backward compat if 0 |
There was a problem hiding this comment.
Two-step init leaves
GatewaySession briefly in an inconsistent state
GatewaySession constructs with an unbounded asyncio.Queue() default and relies on a separate setup_inbox() call to install the bounded queue. Any code that constructs GatewaySession directly (rather than through GatewayServer.create_session) will silently get an unbounded inbox regardless of config. Moving the bounded queue creation into __post_init__ or accepting max_inbox as a constructor argument would prevent this footgun in the real implementation.
| """ | ||
| Prototype implementation of Gateway Server with backpressure and flow control. | ||
|
|
||
| This demonstrates how the bounded queues and slow consumer detection would be | ||
| implemented in the wrapper layer (praisonai package) using the configuration | ||
| from the core SDK (praisonaiagents). | ||
| """ |
There was a problem hiding this comment.
Prototype file lives at the repository root
Landing a prototype at the root of the repo risks it being treated as a production artifact, imported accidentally, or left behind indefinitely. Consider placing it under examples/ or docs/ alongside any other reference implementations, or making it part of the test suite.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/praisonai-agents/praisonaiagents/gateway/config.py (1)
30-41:⚠️ Potential issue | 🟠 Major | ⚡ Quick winValidate non-negative limit values at construction time.
Line 30 and Line 232 accept negative numbers, which silently disables protections in consumers that gate on
> 0(e.g., bounded inbox and buffered-bytes checks). Please reject invalid config early to avoid reintroducing unbounded growth behavior via misconfiguration.🔧 Proposed fix
class SessionConfig: @@ max_inbox: int = 256 # Default bounded queue size metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + if self.max_inbox < 0: + raise ValueError( + "max_inbox must be >= 0 (use 0 for unlimited queue size)" + ) class GatewayConfig: @@ def __post_init__(self) -> None: """Post-initialization to set bind_host from host if not specified.""" + if self.max_buffered_bytes < 0: + raise ValueError( + "max_buffered_bytes must be >= 0 (use 0 to disable slow-consumer checks)" + ) if self.bind_host is None: self.bind_host = self.hostAlso applies to: 232-254
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/praisonai-agents/praisonaiagents/gateway/config.py` around lines 30 - 41, The config dataclass accepts negative values for numeric limit fields which silently disables protective checks in consumers that validate with greater-than-zero comparisons. Add a __post_init__ method to the config dataclass that validates fields like timeout, max_messages, max_inbox, and resume_window are non-negative, raising a ValueError with a clear message if any limit field contains a negative value, ensuring invalid configurations are rejected at construction time before they can cause unbounded growth behavior.
🧹 Nitpick comments (1)
gateway_server_prototype.py (1)
153-154: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick winAvoid blind
except Exceptionin the send path.At Line 153, catching all exceptions can hide actionable transport/protocol failures and makes recovery behavior ambiguous. Prefer handling known send/close exceptions explicitly and log with
exc_info=Truefor traceability.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@gateway_server_prototype.py` around lines 153 - 154, Replace the overly broad except Exception block in the send path with explicit exception handlers for known send and close operation failures. Instead of catching all exceptions generically, handle specific exceptions that can occur during client communication (such as transport or protocol-related errors). Additionally, modify the logger.error call to include exc_info=True parameter to capture the full stack trace and exception details, which improves debugging traceability compared to just including the exception message in the string.Source: Linters/SAST tools
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@gateway_server_prototype.py`:
- Around line 107-123: The session._is_executing flag is set to True at the
beginning of the message handling code but is never cleared or an actual
processing task is never started, causing the flag to remain stuck. Create a new
async method (similar to the proposed _drain_session_queue pattern) that
processes all messages from the session's inbox queue and ensures the
_is_executing flag is cleared in a finally block. After successfully queueing
the message, invoke this draining method instead of simply returning "Processing
started." This ensures the queue is actually drained and the executing flag is
properly reset after processing completes.
---
Outside diff comments:
In `@src/praisonai-agents/praisonaiagents/gateway/config.py`:
- Around line 30-41: The config dataclass accepts negative values for numeric
limit fields which silently disables protective checks in consumers that
validate with greater-than-zero comparisons. Add a __post_init__ method to the
config dataclass that validates fields like timeout, max_messages, max_inbox,
and resume_window are non-negative, raising a ValueError with a clear message if
any limit field contains a negative value, ensuring invalid configurations are
rejected at construction time before they can cause unbounded growth behavior.
---
Nitpick comments:
In `@gateway_server_prototype.py`:
- Around line 153-154: Replace the overly broad except Exception block in the
send path with explicit exception handlers for known send and close operation
failures. Instead of catching all exceptions generically, handle specific
exceptions that can occur during client communication (such as transport or
protocol-related errors). Additionally, modify the logger.error call to include
exc_info=True parameter to capture the full stack trace and exception details,
which improves debugging traceability compared to just including the exception
message in the string.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e50f4a07-deb3-4e7b-bcca-73e7585dcf71
📒 Files selected for processing (2)
gateway_server_prototype.pysrc/praisonai-agents/praisonaiagents/gateway/config.py
| # Queue and process | ||
| session._is_executing = True | ||
| queued = await session.queue_message(message) | ||
| if not queued: | ||
| await self._send_to_client( | ||
| client_id, | ||
| { | ||
| "type": "error", | ||
| "code": "inbox_full", | ||
| "message": "Message queue is full." | ||
| } | ||
| ) | ||
| session._is_executing = False | ||
| return "Inbox full - message rejected." | ||
|
|
||
| # Start processing... | ||
| return "Processing started." |
There was a problem hiding this comment.
_is_executing gets stuck True because no processing path is started.
At Line 108, execution is marked active, but the success path returns at Line 123 without draining the queue or clearing the flag. After the first message, later calls always take the queueing branch and can hit false inbox_full rejection.
🔧 Proposed direction
- # Queue and process
- session._is_executing = True
- queued = await session.queue_message(message)
+ # Queue and process
+ queued = await session.queue_message(message)
if not queued:
@@
- session._is_executing = False
return "Inbox full - message rejected."
-
- # Start processing...
- return "Processing started."
+
+ if not session._is_executing:
+ session._is_executing = True
+ asyncio.create_task(self._drain_session_queue(session, client_id))
+ return "Message queued."# add a drain worker method
async def _drain_session_queue(self, session: GatewaySession, client_id: str) -> None:
try:
while not session._inbox.empty():
msg = await session._inbox.get()
# process msg here
finally:
session._is_executing = False🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@gateway_server_prototype.py` around lines 107 - 123, The
session._is_executing flag is set to True at the beginning of the message
handling code but is never cleared or an actual processing task is never
started, causing the flag to remain stuck. Create a new async method (similar to
the proposed _drain_session_queue pattern) that processes all messages from the
session's inbox queue and ensures the _is_executing flag is cleared in a finally
block. After successfully queueing the message, invoke this draining method
instead of simply returning "Processing started." This ensures the queue is
actually drained and the executing flag is properly reset after processing
completes.
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task —— View job Architecture Review & Fixes ✅Todo List
Phase 1: Architecture Review✅ Protocol-driven design: Configuration properly placed in core SDK (praisonaiagents), implementation pattern shown in prototype for wrapper layer. ✅ Backward compatibility: Default values maintain existing behavior (max_inbox=256, max_buffered_bytes=1MB). ✅ Performance: No hot-path regression in config, prototype logic bug fixed. Phase 2: Fixes ImplementedAll critical issues identified by reviewers have been fixed in commit aafc19e:
Phase 3: Final Verdict✅ APPROVED - This PR successfully implements backpressure and slow-consumer flow control for the gateway:
The prototype file remains at the repo root as a demonstration for this PR and can be relocated or removed in a follow-up if needed. |
- Add drain loop to prototype to fix _is_executing getting stuck - Update MultiChannelGatewayConfig.from_dict to forward new config fields - Add validation for negative values in SessionConfig and GatewayConfig - Improve error logging with exc_info=True Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
| session._is_executing = True | ||
| asyncio.create_task(self._drain_session_queue(session, client_id)) | ||
| return "Processing started." | ||
|
|
There was a problem hiding this comment.
Drain task can be silently garbage-collected mid-execution
asyncio.create_task is called but its return value is discarded. Per the Python docs, the event loop holds only a weak reference to tasks, so the GC can collect the drain task before it finishes if no other strong reference exists. Concretely: if the heap is under memory pressure while _drain_session_queue is awaiting asyncio.sleep or _send_to_client, the task disappears, _is_executing is never reset (the finally block doesn't run), and that session's queue is permanently stalled. Store the task on the session or in a set so a strong reference is kept until completion.
Fixes #2152
Summary
Changes
Core SDK (praisonaiagents/gateway/config.py)
Prototype Implementation
Test Results
The prototype successfully demonstrates:
Note: The actual gateway server implementation in the wrapper layer (praisonai package) would follow this same pattern.
Summary by CodeRabbit