fix: add protocol version negotiation and resilient reconnection to gateway#2131
fix: add protocol version negotiation and resilient reconnection to gateway#2131praisonai-triage-agent[bot] wants to merge 2 commits into
Conversation
…ateway (fixes #2130) - Core SDK: Added protocol version constants and negotiation types - Core SDK: Added sequence numbers to GatewayEvent for gap detection - Server: Implemented version negotiation in join handshake - Server: Added sequence numbering and gap detection support - Server: Enhanced resume with presence/health snapshot - Client: Created reconnecting Python client with exponential backoff - Client: Implemented gap detection and cursor-based resumption Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
|
@coderabbitai review |
|
/review |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
✅ Action performedReview finished.
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThe PR adds protocol version negotiation and event sequence tracking to the PraisonAI gateway. ChangesGateway Protocol Negotiation & Reconnecting Client
Sequence Diagram(s)sequenceDiagram
participant Integrator
participant GatewayClient
participant GatewayServer
participant GatewaySession
Integrator->>GatewayClient: connect(url, token)
GatewayClient->>GatewayServer: WebSocket open
GatewayClient->>GatewayServer: join {min_version=1, max_version=1, since=cursor}
GatewayServer->>GatewayServer: validate version ranges vs MIN/MAX_PROTOCOL_VERSION
alt incompatible versions
GatewayServer-->>GatewayClient: error version_unsupported {server_min, server_max}
GatewayClient-->>Integrator: raise ConnectionError
else negotiated
GatewayServer->>GatewaySession: _protocol_version = negotiated, replay events since cursor
GatewaySession-->>GatewayServer: presence snapshot, health, sequence
GatewayServer-->>GatewayClient: joined {protocol_version, sequence, presence, health}
GatewayClient->>GatewayClient: store _cursor, _expected_sequence
loop event stream
GatewayServer-->>GatewayClient: event {sequence, data}
GatewayClient->>GatewayClient: gap check: expected vs received sequence
alt gap detected
GatewayClient-->>Integrator: on_gap(expected, received)
end
GatewayClient-->>Integrator: events() yields GatewayEvent
end
end
Integrator->>GatewayClient: disconnect() / resync()
GatewayClient->>GatewayServer: close socket
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
Greptile SummaryThis PR adds protocol version negotiation between client and server and introduces a new
Confidence Score: 3/5The server changes are safe to merge, but the new GatewayClient has two logic bugs that would manifest in real usage: teardown after a protocol rejection crashes, and duplicate/out-of-order replayed events permanently corrupt the gap-detection counter. The server-side negotiation and the core SDK types are straightforward additions. All problems are in the new client.py. Calling disconnect() after any version-rejection attempt raises an unhandled ValueError from what is supposed to be a clean shutdown method. The sequence counter regression on out-of-order events means every event after a replay could be flagged as a gap, causing spurious on_gap callbacks and potentially triggering unnecessary resyncs in production. src/praisonai/praisonai/gateway/client.py — specifically the disconnect() task-await error handling and the _handle_event() sequence update logic. Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant C as GatewayClient
participant S as WebSocketGateway
C->>S: WebSocket connect
C->>S: "join {agent_id, min_version, max_version, session_id?, since?}"
alt version incompatible
S-->>C: "error {code: version_unsupported}"
Note over C: ValueError raised, _running=False
else version compatible
S-->>C: "joined {session_id, cursor, sequence, protocol_version, presence[], health{}}"
loop replay missed events
S-->>C: "replay {event: {sequence, ...}}"
end
loop live events
S-->>C: "event {sequence, data, cursor}"
alt "sequence == expected"
Note over C: enqueue event
else gap detected
Note over C: fire on_gap(expected, received)
end
end
end
alt reconnect needed
Note over C: exponential backoff sleep
C->>S: "join {session_id, since: cursor, ...}"
end
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant C as GatewayClient
participant S as WebSocketGateway
C->>S: WebSocket connect
C->>S: "join {agent_id, min_version, max_version, session_id?, since?}"
alt version incompatible
S-->>C: "error {code: version_unsupported}"
Note over C: ValueError raised, _running=False
else version compatible
S-->>C: "joined {session_id, cursor, sequence, protocol_version, presence[], health{}}"
loop replay missed events
S-->>C: "replay {event: {sequence, ...}}"
end
loop live events
S-->>C: "event {sequence, data, cursor}"
alt "sequence == expected"
Note over C: enqueue event
else gap detected
Note over C: fire on_gap(expected, received)
end
end
end
alt reconnect needed
Note over C: exponential backoff sleep
C->>S: "join {session_id, since: cursor, ...}"
end
Reviews (2): Last reviewed commit: "fix: address reviewer feedback on gatewa..." | Re-trigger Greptile |
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai/praisonai/gateway/client.py`:
- Around line 166-175: The connect() method's exception handler treats version
mismatch errors (ValueError raised by _connect_once()) the same as transient
connection errors, causing indefinite reconnection attempts for a non-retriable
failure. Add a specific exception handler before the generic Exception handler
to catch ValueError separately and immediately break the reconnection loop when
a version mismatch is detected, rather than continuing to retry. This way,
ValueError from _connect_once() will be treated as a terminal error while other
transient exceptions can still trigger reconnection attempts as intended.
- Around line 215-217: The join handshake in the WebSocket client is missing a
timeout when waiting for the server response via the self._ws.recv() call, which
can cause the code to block indefinitely. Add a timeout wrapper (such as
asyncio.wait_for() with an appropriate timeout value) around the self._ws.recv()
call to ensure that the join handshake fails gracefully if the server does not
complete the handshake within a reasonable timeframe.
- Around line 297-299: The cursor update logic uses a truthiness check with `if
cursor:` which incorrectly skips valid zero values since 0 evaluates to False in
Python. Change the condition to explicitly check if cursor is not None using `if
cursor is not None:` instead. This will properly distinguish between a missing
cursor value (None) and a legitimate zero cursor value, allowing the
self._cursor assignment to proceed for all valid cursor values including 0.
In `@src/praisonai/praisonai/gateway/server.py`:
- Around line 1008-1015: The joined message is sending the current
session._sequence value, but replayed events have older sequence numbers,
causing clients to detect false gaps. Adjust the sequence value sent in the
joined message payload to match the sequence of the first replayed event (or one
less than that) instead of using session._sequence directly. This ensures the
client's expected_sequence aligns with the replayed event stream and prevents
gap detection on valid replayed events. Apply this fix to all occurrences in the
method where sequence is being sent to clients (including the range around lines
1022-1027).
- Around line 964-981: The protocol version validation is missing type checking
for the min_version and max_version fields extracted from untrusted JSON data.
Before performing the range comparison using < and > operators on
client_min_version and client_max_version, add validation to ensure these values
are integers. If either value is not an integer or is missing, send the
version_unsupported error response instead of allowing an exception to be raised
during the comparison operation.
- Around line 57-58: The `_sequence` and `_protocol_version` attributes in the
gateway server class are not being serialized when persisting session state and
not being restored when resuming sessions, causing sequence numbering to restart
on reconnects. Identify the session serialization method (likely serialize() or
a similar persistence method) and add both `_sequence` and `_protocol_version`
to the serialized session data. Then locate the corresponding
deserialization/resume method and restore these attributes from the persisted
data, ensuring they maintain their values across reconnects to preserve
monotonic ordering.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c0df376f-fe30-4d23-a439-a7447313626b
📒 Files selected for processing (5)
src/praisonai-agents/praisonaiagents/gateway/__init__.pysrc/praisonai-agents/praisonaiagents/gateway/protocols.pysrc/praisonai/praisonai/gateway/__init__.pysrc/praisonai/praisonai/gateway/client.pysrc/praisonai/praisonai/gateway/server.py
| except Exception as e: | ||
| logger.error(f"Connection error: {e}") | ||
|
|
||
| if not self._running or not self.reconnect: | ||
| break | ||
|
|
||
| # Check max attempts | ||
| if self.max_reconnect_attempts and self._reconnect_attempts >= self.max_reconnect_attempts: | ||
| logger.error(f"Max reconnection attempts ({self.max_reconnect_attempts}) reached") | ||
| break |
There was a problem hiding this comment.
Treat version_unsupported as terminal, not reconnectable.
_connect_once() raises ValueError for version mismatch, but connect() catches it and keeps retrying indefinitely, creating a permanent reconnect loop for non-transient failures.
Proposed fix
- while self._running:
+ while self._running:
try:
await self._connect_once()
@@
- except Exception as e:
+ except ValueError as e:
+ logger.error(f"Connection error: {e}")
+ self._running = False
+ raise
+ except Exception as e:
logger.error(f"Connection error: {e}")Also applies to: 223-225
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 166-166: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/client.py` around lines 166 - 175, The
connect() method's exception handler treats version mismatch errors (ValueError
raised by _connect_once()) the same as transient connection errors, causing
indefinite reconnection attempts for a non-retriable failure. Add a specific
exception handler before the generic Exception handler to catch ValueError
separately and immediately break the reconnection loop when a version mismatch
is detected, rather than continuing to retry. This way, ValueError from
_connect_once() will be treated as a terminal error while other transient
exceptions can still trigger reconnection attempts as intended.
| # Wait for join response | ||
| response = await self._ws.recv() | ||
| data = json.loads(response) |
There was a problem hiding this comment.
Add timeout to join handshake receive.
Waiting on self._ws.recv() without a timeout can block forever if the server accepts TCP/WebSocket but never completes join response.
Proposed fix
- response = await self._ws.recv()
+ try:
+ response = await asyncio.wait_for(self._ws.recv(), timeout=10.0)
+ except asyncio.TimeoutError as err:
+ raise ConnectionError("Join handshake timed out") from err📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Wait for join response | |
| response = await self._ws.recv() | |
| data = json.loads(response) | |
| # Wait for join response | |
| try: | |
| response = await asyncio.wait_for(self._ws.recv(), timeout=10.0) | |
| except asyncio.TimeoutError as err: | |
| raise ConnectionError("Join handshake timed out") from err | |
| data = json.loads(response) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/client.py` around lines 215 - 217, The join
handshake in the WebSocket client is missing a timeout when waiting for the
server response via the self._ws.recv() call, which can cause the code to block
indefinitely. Add a timeout wrapper (such as asyncio.wait_for() with an
appropriate timeout value) around the self._ws.recv() call to ensure that the
join handshake fails gracefully if the server does not complete the handshake
within a reasonable timeframe.
| cursor = event.data.get("cursor") | ||
| if cursor: | ||
| self._cursor = cursor |
There was a problem hiding this comment.
Cursor update skips valid zero values.
if cursor: ignores 0, so cursor resets or zero-valued cursors are dropped.
Proposed fix
- cursor = event.data.get("cursor")
- if cursor:
+ cursor = event.data.get("cursor")
+ if cursor is not None:
self._cursor = cursor📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| cursor = event.data.get("cursor") | |
| if cursor: | |
| self._cursor = cursor | |
| cursor = event.data.get("cursor") | |
| if cursor is not None: | |
| self._cursor = cursor |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/client.py` around lines 297 - 299, The cursor
update logic uses a truthiness check with `if cursor:` which incorrectly skips
valid zero values since 0 evaluates to False in Python. Change the condition to
explicitly check if cursor is not None using `if cursor is not None:` instead.
This will properly distinguish between a missing cursor value (None) and a
legitimate zero cursor value, allowing the self._cursor assignment to proceed
for all valid cursor values including 0.
| _sequence: int = 0 # Monotonic sequence number for gap detection | ||
| _protocol_version: int = PROTOCOL_VERSION # Negotiated protocol version |
There was a problem hiding this comment.
Persist session sequence/protocol metadata when resuming sessions.
_sequence and _protocol_version are introduced but never serialized/restored, so resumed sessions can restart sequence numbering and violate monotonic ordering across reconnects.
Proposed fix
def to_dict(self) -> Dict[str, Any]:
return {
"session_id": self._session_id,
@@
"event_cursor": self._event_cursor,
+ "sequence": self._sequence,
+ "protocol_version": self._protocol_version,
"events": [e.to_dict() for e in self._events[-100:]], # Keep last 100 events
}
`@classmethod`
def from_dict(cls, data: Dict[str, Any], max_messages: int = 1000) -> 'GatewaySession':
@@
session._event_cursor = data.get("event_cursor", 0)
+ session._sequence = data.get("sequence", session._event_cursor)
+ session._protocol_version = data.get("protocol_version", PROTOCOL_VERSION)
for event_data in data.get("events", []):
event = GatewayEvent.from_dict(event_data)
session._events.append(event)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| _sequence: int = 0 # Monotonic sequence number for gap detection | |
| _protocol_version: int = PROTOCOL_VERSION # Negotiated protocol version | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "session_id": self._session_id, | |
| "event_cursor": self._event_cursor, | |
| "sequence": self._sequence, | |
| "protocol_version": self._protocol_version, | |
| "events": [e.to_dict() for e in self._events[-100:]], # Keep last 100 events | |
| } | |
| `@classmethod` | |
| def from_dict(cls, data: Dict[str, Any], max_messages: int = 1000) -> 'GatewaySession': | |
| session = cls() | |
| session._session_id = data.get("session_id", str(uuid.uuid4())) | |
| session._event_cursor = data.get("event_cursor", 0) | |
| session._sequence = data.get("sequence", session._event_cursor) | |
| session._protocol_version = data.get("protocol_version", PROTOCOL_VERSION) | |
| for event_data in data.get("events", []): | |
| event = GatewayEvent.from_dict(event_data) | |
| session._events.append(event) | |
| return session |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/server.py` around lines 57 - 58, The
`_sequence` and `_protocol_version` attributes in the gateway server class are
not being serialized when persisting session state and not being restored when
resuming sessions, causing sequence numbering to restart on reconnects. Identify
the session serialization method (likely serialize() or a similar persistence
method) and add both `_sequence` and `_protocol_version` to the serialized
session data. Then locate the corresponding deserialization/resume method and
restore these attributes from the persisted data, ensuring they maintain their
values across reconnects to preserve monotonic ordering.
| # Protocol version negotiation | ||
| client_min_version = data.get("min_version", MIN_PROTOCOL_VERSION) | ||
| client_max_version = data.get("max_version", PROTOCOL_VERSION) | ||
|
|
||
| # Check if we can negotiate a common version | ||
| if client_max_version < MIN_PROTOCOL_VERSION or client_min_version > MAX_PROTOCOL_VERSION: | ||
| await self._send_to_client(client_id, { | ||
| "type": "error", | ||
| "code": "version_unsupported", | ||
| "message": f"Protocol version mismatch. Server supports {MIN_PROTOCOL_VERSION}-{MAX_PROTOCOL_VERSION}, client supports {client_min_version}-{client_max_version}", | ||
| "server_min_version": MIN_PROTOCOL_VERSION, | ||
| "server_max_version": MAX_PROTOCOL_VERSION, | ||
| }) | ||
| return | ||
|
|
||
| # Negotiate the highest common version | ||
| negotiated_version = min(client_max_version, MAX_PROTOCOL_VERSION) | ||
|
|
There was a problem hiding this comment.
Validate protocol version fields before range comparison.
min_version/max_version come from untrusted JSON. Non-integer values will raise during </> checks and drop the join flow via exception instead of returning a typed protocol error.
Proposed fix
- client_min_version = data.get("min_version", MIN_PROTOCOL_VERSION)
- client_max_version = data.get("max_version", PROTOCOL_VERSION)
+ try:
+ client_min_version = int(data.get("min_version", MIN_PROTOCOL_VERSION))
+ client_max_version = int(data.get("max_version", PROTOCOL_VERSION))
+ except (TypeError, ValueError):
+ await self._send_to_client(client_id, {
+ "type": "error",
+ "code": "invalid_protocol_hello",
+ "message": "Invalid protocol version fields. Expected integer min_version/max_version.",
+ })
+ return
+
+ if client_min_version > client_max_version:
+ await self._send_to_client(client_id, {
+ "type": "error",
+ "code": "invalid_protocol_hello",
+ "message": f"Invalid version range: min_version ({client_min_version}) > max_version ({client_max_version})",
+ })
+ return📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Protocol version negotiation | |
| client_min_version = data.get("min_version", MIN_PROTOCOL_VERSION) | |
| client_max_version = data.get("max_version", PROTOCOL_VERSION) | |
| # Check if we can negotiate a common version | |
| if client_max_version < MIN_PROTOCOL_VERSION or client_min_version > MAX_PROTOCOL_VERSION: | |
| await self._send_to_client(client_id, { | |
| "type": "error", | |
| "code": "version_unsupported", | |
| "message": f"Protocol version mismatch. Server supports {MIN_PROTOCOL_VERSION}-{MAX_PROTOCOL_VERSION}, client supports {client_min_version}-{client_max_version}", | |
| "server_min_version": MIN_PROTOCOL_VERSION, | |
| "server_max_version": MAX_PROTOCOL_VERSION, | |
| }) | |
| return | |
| # Negotiate the highest common version | |
| negotiated_version = min(client_max_version, MAX_PROTOCOL_VERSION) | |
| # Protocol version negotiation | |
| try: | |
| client_min_version = int(data.get("min_version", MIN_PROTOCOL_VERSION)) | |
| client_max_version = int(data.get("max_version", PROTOCOL_VERSION)) | |
| except (TypeError, ValueError): | |
| await self._send_to_client(client_id, { | |
| "type": "error", | |
| "code": "invalid_protocol_hello", | |
| "message": "Invalid protocol version fields. Expected integer min_version/max_version.", | |
| }) | |
| return | |
| if client_min_version > client_max_version: | |
| await self._send_to_client(client_id, { | |
| "type": "error", | |
| "code": "invalid_protocol_hello", | |
| "message": f"Invalid version range: min_version ({client_min_version}) > max_version ({client_max_version})", | |
| }) | |
| return | |
| # Check if we can negotiate a common version | |
| if client_max_version < MIN_PROTOCOL_VERSION or client_min_version > MAX_PROTOCOL_VERSION: | |
| await self._send_to_client(client_id, { | |
| "type": "error", | |
| "code": "version_unsupported", | |
| "message": f"Protocol version mismatch. Server supports {MIN_PROTOCOL_VERSION}-{MAX_PROTOCOL_VERSION}, client supports {client_min_version}-{client_max_version}", | |
| "server_min_version": MIN_PROTOCOL_VERSION, | |
| "server_max_version": MAX_PROTOCOL_VERSION, | |
| }) | |
| return | |
| # Negotiate the highest common version | |
| negotiated_version = min(client_max_version, MAX_PROTOCOL_VERSION) | |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/server.py` around lines 964 - 981, The
protocol version validation is missing type checking for the min_version and
max_version fields extracted from untrusted JSON data. Before performing the
range comparison using < and > operators on client_min_version and
client_max_version, add validation to ensure these values are integers. If
either value is not an integer or is missing, send the version_unsupported error
response instead of allowing an exception to be raised during the comparison
operation.
| await self._send_to_client(client_id, { | ||
| "type": "joined", | ||
| "session_id": session.session_id, | ||
| "agent_id": agent_id, | ||
| "resumed": session._was_resumed, | ||
| "cursor": session._event_cursor, | ||
| "sequence": session._sequence, # Current sequence for gap detection | ||
| "protocol_version": negotiated_version, |
There was a problem hiding this comment.
joined.sequence is ahead of replay events, causing false gap detection.
You send joined.sequence = session._sequence and then replay older events. Clients initialize expected_sequence from joined and will report gaps on every replayed event.
Proposed fix
- "sequence": session._sequence, # Current sequence for gap detection
+ "sequence": (replay_events[0].sequence - 1) if replay_events and replay_events[0].sequence is not None else session._sequence,Also applies to: 1022-1027
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/praisonai/praisonai/gateway/server.py` around lines 1008 - 1015, The
joined message is sending the current session._sequence value, but replayed
events have older sequence numbers, causing clients to detect false gaps. Adjust
the sequence value sent in the joined message payload to match the sequence of
the first replayed event (or one less than that) instead of using
session._sequence directly. This ensures the client's expected_sequence aligns
with the replayed event stream and prevents gap detection on valid replayed
events. Apply this fix to all occurrences in the method where sequence is being
sent to clients (including the range around lines 1022-1027).
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task —— View job ✅ Review Complete - All Issues FixedArchitecture Compliance ✅
Fixed Issues (commit 7807949)
Files Modified
VerdictPR is ready to merge ✅ All critical issues identified by reviewers have been resolved. The implementation follows protocol-driven design principles, maintains backward compatibility, and introduces no performance regressions. |
- Fix GatewayClient.connect() to be non-blocking by running connection loop in background task - Add ValueError catch for version_unsupported to prevent infinite retry on permanent errors - Fix cursor update logic to handle zero values correctly (use 'is not None' check) - Add session persistence for sequence and protocol_version fields - Add protocol version validation with proper type checking for untrusted JSON - Fix sequence number in joined message to align with replay events to prevent false gap detection - Fix off-by-one error in reconnect attempts counter - Add timeout to WebSocket join handshake to prevent indefinite blocking - Remove unused imports (time, field, GatewayMessage) Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
| if hasattr(self, '_connect_task') and self._connect_task: | ||
| self._connect_task.cancel() | ||
| try: | ||
| await self._connect_task | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
disconnect() re-raises the stored task exception, not just CancelledError
When the connection loop exits due to a permanent protocol version mismatch, _connect_task stores the ValueError as its result. Calling disconnect() later runs await self._connect_task, which re-raises that stored ValueError — but only asyncio.CancelledError is caught. Any caller of disconnect() will unexpectedly see a ValueError raised from a disconnect call, making clean teardown impossible after a negotiation failure.
Fixes #2130
Summary
This PR implements protocol version negotiation and resilient reconnection capabilities for the gateway, addressing the lack of version negotiation and the absence of a reconnecting client.
Changes
Core SDK (praisonaiagents/gateway/protocols.py)
Wrapper Server (praisonai/gateway/server.py)
Wrapper Client (praisonai/gateway/client.py)
Key Features
Testing
The implementation follows the protocol-driven design with contracts in core SDK and heavy implementations in wrapper. All changes are backward compatible - existing clients without version negotiation continue to work.
Summary by CodeRabbit