fix: gateway resume can silently lose events - add integrity checks#2155
fix: gateway resume can silently lose events - add integrity checks#2155praisonai-triage-agent[bot] wants to merge 2 commits into
Conversation
β¦ixes #2153) - Track oldest retained cursor in session buffer - Add resync_required flag when requested cursor is below oldest retained - Send authoritative snapshot instead of partial replay when gap detected - Add top-level seq field to all events for client-side gap detection - Update wire protocol documentation with new resume fields Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more β On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
β Action performedReview finished.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the βοΈ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
π WalkthroughWalkthroughExtends the gateway resume protocol with cursor-based integrity checks. ChangesGateway Resume Integrity
Sequence Diagram(s)sequenceDiagram
participant Client
participant GatewayServer
participant GatewaySession
Client->>GatewayServer: join {agent_id, since: 5}
GatewayServer->>GatewaySession: check_resync_required(5)
GatewaySession-->>GatewayServer: resync_required=true, oldest_cursor=3201
GatewayServer->>Client: joined {cursor: 4200, oldest_cursor: 3201, resync_required: true}
GatewayServer->>GatewaySession: get_snapshot()
GatewaySession-->>GatewayServer: {state, messages, event_cursor: 4200}
GatewayServer->>Client: snapshot {state: {...}, event_cursor: 4200}
Note over Client,GatewayServer: Happy-path replay (resync_required=false)
Client->>GatewayServer: join {agent_id, since: 4100}
GatewayServer->>GatewaySession: check_resync_required(4100)
GatewaySession-->>GatewayServer: resync_required=false, oldest_cursor=3201
GatewayServer->>Client: joined {cursor: 4200, oldest_cursor: 3201, resync_required: false}
GatewayServer->>GatewaySession: get_events_since(4100)
GatewaySession-->>GatewayServer: events [4101..4200]
loop each missed event
GatewayServer->>Client: replay {event: {...}, seq: N}
end
Estimated code review effortπ― 3 (Moderate) | β±οΈ ~20 minutes Possibly related issues
Poem
π₯ Pre-merge checks | β 5β Passed checks (5 passed)
βοΈ Tip: You can configure your own custom pre-merge checks in the settings. β¨ Finishing Touchesπ§ͺ Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first β incorporate their findings. Review areas:
|
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and canβt be posted inline due to platform limitations.
β οΈ Outside diff range comments (1)
src/praisonai/praisonai/gateway/server.py (1)
1217-1232:β οΈ Potential issue | π Major | β‘ Quick win
seqis still missing on live token/tool stream events.The tracking allowlist at Line 1217 excludes
token_streamandtool_call_stream, so those outbound events bypass cursor/seqstamping even though they are part of live delivery. That breaks client-side gap detection coverage and diverges from the documented wire contract.Suggested fix
- if data.get("type") in ["response", "message", "stream_end", "error"]: + if data.get("type") in [ + "response", + "message", + "stream_end", + "error", + "token_stream", + "tool_call_stream", + ]:π€ Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/praisonai/praisonai/gateway/server.py` around lines 1217 - 1232, The type allowlist at the beginning of the conditional (checking if data.get("type") is in the list containing "response", "message", "stream_end", "error") is excluding the live stream events token_stream and tool_call_stream, preventing them from receiving the cursor and seq stamping that happens within this block. Add "token_stream" and "tool_call_stream" to the allowlist so that these outbound events receive the same cursor and seq values as other event types, ensuring consistent gap detection coverage across all event types.
π€ Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai/praisonai/gateway/server.py`:
- Around line 1001-1003: The `since` parameter is client-controlled JSON input
that may arrive as a string or non-numeric value, causing cursor comparison
operations in `check_resync_required` and `get_oldest_cursor` to raise
exceptions and abort the join path instead of returning a protocol error. Parse
and normalize `since` to an integer at the request ingress point before it is
used in any cursor comparison logic, and explicitly return a protocol error if
the value is invalid or cannot be converted to an integer.
---
Outside diff comments:
In `@src/praisonai/praisonai/gateway/server.py`:
- Around line 1217-1232: The type allowlist at the beginning of the conditional
(checking if data.get("type") is in the list containing "response", "message",
"stream_end", "error") is excluding the live stream events token_stream and
tool_call_stream, preventing them from receiving the cursor and seq stamping
that happens within this block. Add "token_stream" and "tool_call_stream" to the
allowlist so that these outbound events receive the same cursor and seq values
as other event types, ensuring consistent gap detection coverage across all
event types.
πͺ Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
βΉοΈ Review info
βοΈ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b5113d86-5ea2-4c95-a091-f13d9b018389
π Files selected for processing (2)
src/praisonai-agents/praisonaiagents/gateway/protocols.pysrc/praisonai/praisonai/gateway/server.py
| # Check if resync is required | ||
| resync_required = session.check_resync_required(since_cursor) | ||
| oldest_cursor = session.get_oldest_cursor() |
There was a problem hiding this comment.
Normalize since to an integer before replay/resync logic.
since is client-controlled JSON. If it arrives as a string/non-numeric value, the cursor comparisons used during replay/resync can raise and abort the join path instead of returning a protocol error (Line 1002 is part of that failure chain). Parse once at ingress and reject invalid cursors explicitly.
Suggested fix
- since_cursor = data.get("since") # Optional: cursor for event replay
+ since_raw = data.get("since") # Optional: cursor for event replay
+ since_cursor: Optional[int] = None
+ if since_raw is not None:
+ try:
+ since_cursor = int(since_raw)
+ except (TypeError, ValueError):
+ await self._send_to_client(client_id, {
+ "type": "error",
+ "message": "Invalid 'since' cursor. Send an integer from your last received cursor.",
+ })
+ returnπ€ Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/server.py` around lines 1001 - 1003, The
`since` parameter is client-controlled JSON input that may arrive as a string or
non-numeric value, causing cursor comparison operations in
`check_resync_required` and `get_oldest_cursor` to raise exceptions and abort
the join path instead of returning a protocol error. Parse and normalize `since`
to an integer at the request ingress point before it is used in any cursor
comparison logic, and explicitly return a protocol error if the value is invalid
or cannot be converted to an integer.
Greptile SummaryThis PR adds integrity checks to the gateway resume-after-disconnect feature: a resync-required flag with a snapshot fallback, an oldest-cursor floor in join acknowledgments, and a
Confidence Score: 3/5The resync/snapshot logic itself is wired up correctly, but expanding the buffered event types to include high-frequency streaming events counteracts the fix by accelerating eviction of the events that matter most for replay. The join handler correctly gates replay behind check_resync_required and sends a snapshot when needed. However, also buffering token_stream and tool_call_stream events β which can number in the hundreds per response β will fill the fixed-size replay buffer far faster than before. Important events (response, stream_end) added at the start of a session will be evicted after only a few streamed replies, making resync the common case rather than the exception. This undermines the core goal of the PR and warrants attention before merge. src/praisonai/praisonai/gateway/server.py β specifically the expanded buffered event type list around line 1233 and how it interacts with the buffer trimming in add_event() Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant C as Client
participant GW as WebSocketGateway
participant S as GatewaySession
C->>GW: "join {agent_id, session_id, since}"
GW->>S: resume_or_create_session(since_cursor)
S-->>GW: (session, replay_events)
GW->>S: check_resync_required(since_cursor)
S-->>GW: resync_required
GW->>S: get_oldest_cursor()
S-->>GW: oldest_cursor
GW-->>C: "joined {cursor, oldest_cursor, resync_required}"
alt "resync_required = true"
GW->>S: get_snapshot()
S-->>GW: snapshot
GW-->>C: "snapshot {state}"
else "resync_required = false"
loop for each replay_event
GW-->>C: "replay {event, seq}"
end
end
Note over GW,C: Live event flow
GW->>S: add_event(event)
S-->>GW: cursor
GW-->>C: "event {cursor, seq}"
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant C as Client
participant GW as WebSocketGateway
participant S as GatewaySession
C->>GW: "join {agent_id, session_id, since}"
GW->>S: resume_or_create_session(since_cursor)
S-->>GW: (session, replay_events)
GW->>S: check_resync_required(since_cursor)
S-->>GW: resync_required
GW->>S: get_oldest_cursor()
S-->>GW: oldest_cursor
GW-->>C: "joined {cursor, oldest_cursor, resync_required}"
alt "resync_required = true"
GW->>S: get_snapshot()
S-->>GW: snapshot
GW-->>C: "snapshot {state}"
else "resync_required = false"
loop for each replay_event
GW-->>C: "replay {event, seq}"
end
end
Note over GW,C: Live event flow
GW->>S: add_event(event)
S-->>GW: cursor
GW-->>C: "event {cursor, seq}"
Reviews (2): Last reviewed commit: "fix: address reviewer feedback on gatewa..." | Re-trigger Greptile |
| def get_snapshot(self) -> Dict[str, Any]: | ||
| """Get a snapshot of the current session state for resync.""" | ||
| return { | ||
| "session_id": self._session_id, | ||
| "agent_id": self._agent_id, | ||
| "state": dict(self._state), | ||
| "messages": [{ | ||
| "content": msg.content, | ||
| "sender_id": msg.sender_id, | ||
| "session_id": msg.session_id, | ||
| "message_id": msg.message_id, | ||
| "timestamp": msg.timestamp, | ||
| "metadata": msg.metadata, | ||
| } for msg in self._messages], | ||
| "event_cursor": self._event_cursor, | ||
| } |
There was a problem hiding this comment.
get_snapshot() omits retained events from the buffer
The snapshot sent on resync includes messages, state, and event_cursor, but not the events still in the buffer (from oldest_cursor to event_cursor). Compare with to_dict(), which includes the last 100 events. A client receiving the snapshot cannot reconstruct the event flow (e.g., response, stream_end) that occurred between its last known cursor and the snapshot head. The PR description calls this "full state recovery" β adding an events key with the currently-retained buffer events (as to_dict() does) would make the claim accurate and help clients that track event-level data.
| data["cursor"] = cursor | ||
| # Add top-level sequence number for integrity checking | ||
| data["seq"] = cursor |
There was a problem hiding this comment.
seq and cursor are always set to the same value (cursor = session.add_event(event)). The existing cursor field already provides the monotonic sequence number for gap detection. Adding seq as a separate top-level alias is redundant β any client that already reads cursor gets the same information. If the intent is to expose the sequence number at the envelope level (outside data), the comment should clarify why cursor in data is insufficient.
| data["cursor"] = cursor | |
| # Add top-level sequence number for integrity checking | |
| data["seq"] = cursor | |
| data["cursor"] = cursor | |
| # seq mirrors cursor at the envelope level so clients | |
| # can detect gaps without parsing data['cursor'] | |
| data["seq"] = cursor |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task ββ View job β Architecture Review CompleteReview Summary:
Phase 1: Architecture Compliance β
Phase 2: Fixes Applied βI've addressed the valid issues identified by reviewers: Fixed Issues (commit 618d40a):
Skipped Non-Issues:
Phase 3: Final Verdict βAPPROVED - The PR successfully implements gateway integrity checks with minimal, focused changes that follow the architecture guidelines. All valid reviewer feedback has been addressed. Modified Files:
The fix correctly handles the silent event loss issue while maintaining backward compatibility and following protocol-driven design principles. |
- Add input validation for 'since' cursor to prevent type errors - Include token_stream and tool_call_stream events in seq stamping - Clarify get_oldest_cursor() semantics with documentation - Document resume_or_create_session() replay_events caveat Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
Fixes #2153
Summary
The gateway's resume-after-disconnect feature could silently lose events when the event buffer was trimmed. Clients had no way to detect gaps in the event stream, leading to silent data loss.
Changes
Technical Details
Testing
Verified the fix handles:
Generated with Claude Code
Summary by CodeRabbit
New Features
Documentation