Summary
The gateway has no flow control in either direction. Inbound, each session's message queue is an unbounded asyncio.Queue, so a client that floods messages (or an agent that processes slowly) grows the queue without limit. Outbound, the server writes to client sockets with no check on how much data is already buffered, so a single slow or stalled client accumulates unsent frames in memory. Both paths let one misbehaving client degrade or exhaust the whole gateway process. A world-class, robust gateway must bound queues and shed or disconnect slow consumers deterministically.
Current behaviour
The per-session inbox is created with no maxsize and producers await put() without any bound:
```python
# src/praisonai/praisonai/gateway/server.py:56
_inbox: asyncio.Queue = field(default_factory=asyncio.Queue)  # unbounded

# src/praisonai/praisonai/gateway/server.py:178-180
async def queue_message(self, message: str) -> None:
    """Queue a user message for execution after the current operation."""
    await self._inbox.put(message)  # never blocks/rejects
```
Inbound client messages flow straight into this queue (src/praisonai/praisonai/gateway/server.py:1057,1061, await session.queue_message(content)) with no per-client send window or quota. On the outbound side, _send_to_client writes to the socket without inspecting buffered bytes, so there is no slow-consumer detection and no overflow close — a stalled client's frames pile up in process memory until the connection eventually fails. There is no equivalent of a max-buffered-bytes threshold or a "slow consumer" disconnect anywhere in the gateway server.
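The inbound problem comes down to standard `asyncio.Queue` semantics. With the default `maxsize=0`, the queue is unbounded and `put()` never waits, so a backlog with no consumer keeps growing. With a positive `maxsize`, `put_nowait()` raises `asyncio.QueueFull`, which gives the gateway a natural point to reject a message instead of buffering it. The sketch below shows both behaviours in isolation. The function names are illustrative only and do not come from the gateway code.

```python
import asyncio


async def flood_unbounded(n: int) -> int:
    """Enqueue n messages with no consumer; return the resulting backlog."""
    queue: asyncio.Queue = asyncio.Queue()  # maxsize <= 0 means "no limit"
    for i in range(n):
        await queue.put(f"msg-{i}")  # never waits on an unbounded queue
    return queue.qsize()


async def flood_bounded(n: int, maxsize: int) -> tuple:
    """Enqueue n messages into a bounded queue; return (backlog, rejected)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    rejected = 0
    for i in range(n):
        try:
            queue.put_nowait(f"msg-{i}")
        except asyncio.QueueFull:
            rejected += 1  # the point where a gateway could send a typed NACK
    return queue.qsize(), rejected


if __name__ == "__main__":
    print(asyncio.run(flood_unbounded(10_000)))     # 10000
    print(asyncio.run(flood_bounded(10_000, 256)))  # (256, 9744)
```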
Desired behaviour
- Inbound: bound the per-session inbox (configurable max_inbox). When it is full, reject with a typed error / NACK or apply a per-client send window, rather than letting it grow without limit.
- Outbound: before writing, check the connection's buffered byte count against a configurable max_buffered_bytes. For droppable events (e.g. presence/typing), skip the send. For ordered/critical events, close the connection with a clear "slow consumer" reason so the client reconnects and resumes cleanly.
- These limits should be advertised to clients, so that well-behaved clients can self-throttle. This ties into the handshake policy block (see the protocol-handshake issue).
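The "per-client send window" alternative can be sketched as a credit counter. A client may have at most `size` messages accepted but not yet processed. A slot reopens only after the agent finishes a message, not when the message is dequeued. This is a minimal sketch under assumptions: `SendWindow`, `admit`, and `agent_step` are hypothetical names, and only the `inbox_full` error code comes from this issue.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class SendWindow:
    """Hypothetical per-client window: at most `size` client messages may be
    accepted but not yet fully processed by the agent at any one time."""

    size: int
    in_flight: int = 0

    def try_acquire(self) -> bool:
        # Called when a client message arrives; False means the window is closed.
        if self.in_flight >= self.size:
            return False
        self.in_flight += 1
        return True

    def release(self) -> None:
        # Called once the agent has finished with a message, reopening one slot.
        if self.in_flight > 0:
            self.in_flight -= 1


def admit(window: SendWindow, inbox: asyncio.Queue, message: str) -> Optional[dict]:
    """Queue `message`, or return the NACK frame to send back to the client."""
    if not window.try_acquire():
        return {"type": "error", "code": "inbox_full"}
    # qsize() <= in_flight <= window.size, so with maxsize == window.size this
    # put can never raise QueueFull as long as every put goes through admit().
    inbox.put_nowait(message)
    return None


async def agent_step(window: SendWindow, inbox: asyncio.Queue) -> str:
    message = await inbox.get()
    # ... the agent would run on `message` here ...
    window.release()  # the credit returns after the work, not at dequeue time
    return message


async def demo() -> list:
    window = SendWindow(size=3)
    inbox: asyncio.Queue = asyncio.Queue(maxsize=window.size)
    results = [admit(window, inbox, f"m{i}") for i in range(5)]  # m3, m4 NACKed
    await agent_step(window, inbox)  # processes m0 and frees one slot
    results.append(admit(window, inbox, "m5"))  # accepted again
    return results


if __name__ == "__main__":
    for frame in asyncio.run(demo()):
        print(frame)
```

A plain bounded queue stops admitting as soon as the agent falls behind on dequeuing. The window instead also counts the message currently being processed. That makes it a closer match to a window value the server could advertise to clients.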
Layer placement
- Primary layer: wrapper (praisonai). The WebSocket server in src/praisonai/praisonai/gateway/server.py owns the sockets and the session inbox, so buffered-bytes inspection and queue bounding must happen there.
- Why not core: core defines the protocol/contract but does not own the live socket or the asyncio queue; only the limit values (policy) belong in the core contract, not the enforcement.
- Why not tools: unrelated to agent-callable integrations.
- Why not plugins: flow control is transport robustness inside the gateway server, not a lifecycle guardrail layered around an agent.
- Secondary touch: core (praisonaiagents/gateway/config.py / protocols.py), to carry max_inbox and max_buffered_bytes as a shared policy advertised in the handshake.
- 3-way surface (CLI + YAML + Python): no — these are gateway runtime safety limits exposed via gateway config, not a per-feature CLI/YAML surface.
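The core-side change might look like the minimal sketch below, which carries both limits and exposes them for the handshake policy block. The real SessionConfig and GatewayConfig in praisonaiagents/gateway/config.py have other fields that are not shown here. The default of 256 follows this issue's example, while the 1 MiB default, the validation, and the `handshake_policy()` method and its dict shape are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class SessionConfig:
    # Per-session inbox bound; 256 follows the example value in this issue.
    max_inbox: int = 256

    def __post_init__(self) -> None:
        # asyncio.Queue treats maxsize <= 0 as "unbounded", so reject it here
        # rather than silently reintroducing the original problem.
        if self.max_inbox <= 0:
            raise ValueError("max_inbox must be a positive integer")


@dataclass
class GatewayConfig:
    session: SessionConfig = field(default_factory=SessionConfig)
    # Hypothetical default: 1 MiB of unsent data allowed per connection.
    max_buffered_bytes: int = 1 << 20

    def __post_init__(self) -> None:
        if self.max_buffered_bytes <= 0:
            raise ValueError("max_buffered_bytes must be a positive integer")

    def handshake_policy(self) -> dict:
        """Limits to advertise in the handshake policy block (shape assumed)."""
        return {
            "max_inbox": self.session.max_inbox,
            "max_buffered_bytes": self.max_buffered_bytes,
        }
```

Validating at construction time matters here. A configured value of 0 would otherwise pass silently into `asyncio.Queue(maxsize=0)` and restore the unbounded behaviour this issue is about.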
Proposed approach
- Extension point: config + gateway server enforcement.
- Add max_inbox to SessionConfig and max_buffered_bytes to GatewayConfig (src/praisonai-agents/praisonaiagents/gateway/config.py).
- Bound the inbox and guard outbound writes:
```python
# Inbound: bounded queue + typed rejection
# (at session creation)
self._inbox = asyncio.Queue(maxsize=config.session.max_inbox)  # e.g. 256
# (in queue_message: put_nowait never awaits, so a flood cannot pile up)
try:
    self._inbox.put_nowait(message)
except asyncio.QueueFull:
    await self._send_to_client(client_id, {"type": "error", "code": "inbox_full"})

# Outbound: slow-consumer guard before writing
transport = getattr(ws, "transport", None)
buffered = transport.get_write_buffer_size() if transport is not None else 0
if buffered > config.max_buffered_bytes:
    if event_is_droppable:  # presence/typing
        return  # shed the event, keep the connection
    await ws.close(code=1013, reason="slow consumer")  # client reconnects + resumes
    return  # never write to a connection that was just closed
await ws.send(frame)
```
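A runnable version of the outbound guard is sketched below. It runs against a fake connection, so the drop/close decision can be exercised without a real socket. It assumes, as the snippet above does, that the connection object exposes its asyncio transport as `ws.transport`. `get_write_buffer_size()` is a real method on asyncio write transports. `DROPPABLE_TYPES`, `guarded_send`, `FakeWebSocket`, and the 64 KiB demo threshold are hypothetical.

```python
import asyncio
import json
from typing import Any

DROPPABLE_TYPES = frozenset({"presence", "typing"})  # hypothetical classification
SLOW_CONSUMER_CLOSE_CODE = 1013  # "Try Again Later", as in the proposal


def buffered_bytes(ws: Any) -> int:
    """Bytes queued in the asyncio transport but not yet written to the socket."""
    transport = getattr(ws, "transport", None)
    if transport is None:
        return 0  # nothing to inspect; treat the connection as not backed up
    return transport.get_write_buffer_size()


async def guarded_send(ws: Any, frame: dict, max_buffered_bytes: int) -> str:
    """Send `frame` unless the client is a slow consumer; return what happened."""
    if buffered_bytes(ws) > max_buffered_bytes:
        if frame.get("type") in DROPPABLE_TYPES:
            return "dropped"  # shed ephemeral events, keep the connection
        await ws.close(code=SLOW_CONSUMER_CLOSE_CODE, reason="slow consumer")
        return "closed"  # do not fall through to send()
    await ws.send(json.dumps(frame))
    return "sent"


class FakeTransport:
    def __init__(self, buffered: int) -> None:
        self.buffered = buffered

    def get_write_buffer_size(self) -> int:
        return self.buffered


class FakeWebSocket:
    """Stand-in for a connection exposing `.transport`, `send`, and `close`."""

    def __init__(self, buffered: int) -> None:
        self.transport = FakeTransport(buffered)
        self.sent: list = []
        self.closed_with = None

    async def send(self, data: str) -> None:
        self.sent.append(data)

    async def close(self, code: int = 1000, reason: str = "") -> None:
        self.closed_with = (code, reason)


async def demo() -> tuple:
    limit = 64 * 1024  # hypothetical threshold for the demo
    healthy, stalled = FakeWebSocket(buffered=0), FakeWebSocket(buffered=limit + 1)
    outcomes = [
        await guarded_send(healthy, {"type": "message", "text": "hi"}, limit),
        await guarded_send(stalled, {"type": "typing"}, limit),
        await guarded_send(stalled, {"type": "message", "text": "hi"}, limit),
    ]
    return outcomes, stalled.closed_with, len(healthy.sent)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One caveat on the design: with a real WebSocket library, `close()` normally performs a closing handshake. On a fully stalled peer, that handshake may itself wait until the library's close timeout. A production version may therefore want a hard abort of the transport as a fallback.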
Resolution sketch
```python
# Before (today)
_inbox = asyncio.Queue()                # unbounded -> memory grows under flood
await self._inbox.put(message)          # producer never throttled
await self._send_to_client(cid, frame)  # no check on a stalled socket

# After (proposed)
_inbox = asyncio.Queue(maxsize=cfg.session.max_inbox)  # bounded
self._inbox.put_nowait(message)  # QueueFull -> typed NACK, no unbounded growth
if buffered_bytes(ws) > cfg.max_buffered_bytes:  # slow-consumer guard
    drop_or_close(ws)  # shed droppable events, close on critical overflow
```
Severity
High — a single flooding or stalled client can grow unbounded queues or buffered frames and exhaust the gateway process, taking down service for every other connected client; bounded queues and slow-consumer shedding are table stakes for a robust multi-client gateway.
Validation
Confirmed by reading src/praisonai/praisonai/gateway/server.py: the session inbox is an unbounded asyncio.Queue (:56), queue_message does an unguarded await put (:178-180), inbound client content is enqueued without a window (:1057,1061), and _send_to_client writes without any buffered-bytes / slow-consumer check. A search for maxsize, buffered, backpressure, and QueueFull in the gateway server found no bounding or flow-control logic.