fix: add versioned handshake with capability negotiation to gateway protocol #2154
praisonai-triage-agent[bot] wants to merge 2 commits into
Conversation
…rotocol (fixes #2151)
- Add protocol version constants and negotiation
- Add structured connect error codes (auth_required, protocol_unsupported, etc.)
- Add HelloParams, HelloResult, HelloError data structures
- Implement hello/hello_ok/hello_error message exchange
- Support capability-based feature discovery
- Communicate policy limits (max_payload, heartbeat_ms)
- Maintain backward compatibility with existing join/joined messages
Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
@coderabbitai review
/review
✅ Action performed: Review finished.
Review skipped: bot user detected.
📝 Walkthrough
Adds protocol version constants (
Changes
Gateway Versioned Handshake
Sequence Diagram(s)
sequenceDiagram
participant Client
participant GatewayServer
participant SessionStore
rect rgba(100, 149, 237, 0.5)
Note over Client,SessionStore: New versioned hello path
Client->>GatewayServer: hello {agent_id, protocol{min,max}, caps, session_id?, since_cursor?}
GatewayServer->>GatewayServer: validate agent_id and negotiate protocol version
alt validation failure
GatewayServer->>Client: hello_error {code, message, next_action?}
else success
GatewayServer->>SessionStore: resume_or_create_session(agent_id, session_id, since_cursor)
SessionStore-->>GatewayServer: session, replay_events[]
GatewayServer->>Client: hello_ok {protocol, features, policy, session_id, resumed, cursor}
loop missed events
GatewayServer->>Client: replay {event}
end
end
end
rect rgba(144, 238, 144, 0.5)
Note over Client,GatewayServer: Legacy join path (backward compat)
Client->>GatewayServer: join {agent_id, session_id?, since?}
GatewayServer->>Client: joined {session_id, agent_id, resumed, cursor}
end
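The version-selection step in the diagram above can be sketched as a small pure function. This is only an illustration: the constant names come from the PR, but their values here are assumed, and `negotiate_version` is a hypothetical helper rather than a function in the gateway.

```python
from typing import Optional, Tuple

# Assumed values for illustration only; the PR defines these names, but
# their real values are not shown on this page.
MIN_CLIENT_PROTOCOL_VERSION = 1
GATEWAY_PROTOCOL_VERSION = 2


def negotiate_version(client_min: int, client_max: int) -> Tuple[Optional[int], Optional[str]]:
    """Return (version, None) on success, or (None, next_action) on failure."""
    if client_max < MIN_CLIENT_PROTOCOL_VERSION:
        # Client only speaks versions older than the server accepts.
        return None, "upgrade_client"
    if client_min > GATEWAY_PROTOCOL_VERSION:
        # Client requires a version newer than the server supports.
        return None, "use_older_client"
    # Highest version both sides support.
    return min(client_max, GATEWAY_PROTOCOL_VERSION), None
```

On failure the server would answer with hello_error carrying the returned next action; on success it would continue to session resume and hello_ok.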
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 5 passed
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai/praisonai/gateway/server.py`:
- Around line 1064-1069: The heartbeat_ms value in the policy dictionary is
hard-coded to 15000 instead of using the heartbeat_interval value loaded from
the gateway configuration. Replace the hard-coded 15000 value with the
heartbeat_interval from the gateway config so that the policy reflects the
actual configured heartbeat interval and clients negotiate the correct interval
instead of defaulting to 15 seconds.
- Around line 1037-1063: The features list in the code starting around line 1039
is advertising methods and events that are either not implemented or not
negotiated with the client. Remove the "abort" method from the methods list
unless it has a corresponding handler implemented. Move the streaming events
(TOKEN_STREAM, TOOL_CALL_STREAM, STREAM_END) from the base features dictionary
into a conditional block that only adds them if the client has advertised
streaming capability in client_caps, similar to how the presence and ack
features are conditionally added. Keep MESSAGE and ERROR in the base list, and
move MESSAGE_ACK into the existing ack block. This ensures that
hello_ok.features only advertises methods and events that are both implemented
and negotiated with the client.
- Around line 981-1024: The wire parser is reading incorrect field names from
the incoming data, causing clients that properly serialize HelloParams to fall
back to protocol v1 with no capabilities. Update the parsing logic around line
982-983 to read protocol_min and protocol_max as direct fields (not nested under
protocol), and update line 1023 to read capabilities instead of caps, so that
client data aligned with the HelloParams contract is correctly parsed.
- Around line 1025-1035: The resumed session returned from
resume_or_create_session() may belong to a different agent_id or contain a stale
client_id, which causes incorrect message routing later since responses use
session.client_id. Before storing the session in self._client_sessions, validate
that the returned session's agent_id matches the requested agent_id parameter,
and rebind the current client_id to the session to ensure the correct websocket
receives the replayed messages and subsequent responses.
📒 Files selected for processing (2)
src/praisonai-agents/praisonaiagents/gateway/protocols.py
src/praisonai/praisonai/gateway/server.py
# Parse protocol version from client
protocol_info = data.get("protocol", {})
if isinstance(protocol_info, dict):
    client_min = protocol_info.get("min", 1)
    client_max = protocol_info.get("max", 1)
else:
    # Backwards compatibility: treat missing protocol as v1
    client_min = client_max = 1

# Negotiate protocol version
if client_max < MIN_CLIENT_PROTOCOL_VERSION:
    error = HelloError(
        code=ConnectErrorCode.PROTOCOL_UNSUPPORTED,
        message=f"Protocol version {client_max} is too old, minimum required is {MIN_CLIENT_PROTOCOL_VERSION}",
        next_action="upgrade_client"
    )
    await self._send_to_client(client_id, {
        "type": "hello_error",
        "code": error.code.value,
        "message": error.message,
        "next": error.next_action,
    })
    return

if client_min > GATEWAY_PROTOCOL_VERSION:
    error = HelloError(
        code=ConnectErrorCode.PROTOCOL_UNSUPPORTED,
        message=f"Protocol version {client_min} is too new, server supports up to {GATEWAY_PROTOCOL_VERSION}",
        next_action="use_older_client"
    )
    await self._send_to_client(client_id, {
        "type": "hello_error",
        "code": error.code.value,
        "message": error.message,
        "next": error.next_action,
    })
    return

# Select the highest mutually supported version
negotiated_version = min(client_max, GATEWAY_PROTOCOL_VERSION)

# Get client capabilities
client_caps = data.get("caps", [])
Align the wire parser with HelloParams.
Line 982 reads protocol.{min,max} and Line 1023 reads caps, but the shared contract defines protocol_min, protocol_max, and capabilities. SDK clients serializing HelloParams will be treated as protocol v1 with no capabilities.
🔧 Proposed contract-compatible parser
- protocol_info = data.get("protocol", {})
- if isinstance(protocol_info, dict):
-     client_min = protocol_info.get("min", 1)
-     client_max = protocol_info.get("max", 1)
- else:
-     # Backwards compatibility: treat missing protocol as v1
-     client_min = client_max = 1
+ protocol_info = data.get("protocol")
+ if "protocol_min" in data or "protocol_max" in data:
+     client_min = data.get("protocol_min", 1)
+     client_max = data.get("protocol_max", 1)
+ elif isinstance(protocol_info, dict):
+     client_min = protocol_info.get("min", 1)
+     client_max = protocol_info.get("max", 1)
+ else:
+     # Backwards compatibility: treat missing protocol as v1
+     client_min = client_max = 1
+
+ try:
+     client_min = int(client_min)
+     client_max = int(client_max)
+ except (TypeError, ValueError):
+     error = HelloError(
+         code=ConnectErrorCode.PROTOCOL_UNSUPPORTED,
+         message="Protocol versions must be integers",
+         next_action="send_integer_protocol_min_and_protocol_max",
+     )
+     await self._send_to_client(client_id, {
+         "type": "hello_error",
+         "code": error.code.value,
+         "message": error.message,
+         "next": error.next_action,
+     })
+     return
+
+ if client_min > client_max:
+     error = HelloError(
+         code=ConnectErrorCode.PROTOCOL_UNSUPPORTED,
+         message="protocol_min cannot be greater than protocol_max",
+         next_action="fix_protocol_version_range",
+     )
+     await self._send_to_client(client_id, {
+         "type": "hello_error",
+         "code": error.code.value,
+         "message": error.message,
+         "next": error.next_action,
+     })
+     return
@@
- client_caps = data.get("caps", [])
+ client_caps = data.get("capabilities", data.get("caps", []))
+ if not isinstance(client_caps, list):
+     client_caps = []
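A standalone sketch of a parser that accepts both wire shapes discussed in this comment may make the intent easier to check. `parse_hello` is a hypothetical helper, not the gateway's actual code, and it raises `ValueError` where the real handler would send a hello_error.

```python
from typing import Any, Dict, List, Tuple


def parse_hello(data: Dict[str, Any]) -> Tuple[int, int, List[str]]:
    """Extract (client_min, client_max, capabilities) from a hello payload."""
    if "protocol_min" in data or "protocol_max" in data:
        # Flat fields, matching the HelloParams contract.
        raw_min = data.get("protocol_min", 1)
        raw_max = data.get("protocol_max", 1)
    elif isinstance(data.get("protocol"), dict):
        # Nested {"protocol": {"min": ..., "max": ...}} shape.
        raw_min = data["protocol"].get("min", 1)
        raw_max = data["protocol"].get("max", 1)
    else:
        # Missing protocol info is treated as v1.
        raw_min = raw_max = 1

    try:
        client_min, client_max = int(raw_min), int(raw_max)
    except (TypeError, ValueError):
        raise ValueError("protocol versions must be integers")
    if client_min > client_max:
        raise ValueError("protocol_min cannot be greater than protocol_max")

    # Accept either key; anything that is not a list (including null) becomes [].
    caps = data.get("capabilities", data.get("caps", []))
    if not isinstance(caps, list):
        caps = []
    return client_min, client_max, caps
```

Keeping the parsing in one pure function would also make it straightforward to unit test without a WebSocket connection.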
# Resume or create session
session_id = data.get("session_id")
since_cursor = data.get("since")
session, replay_events = self.resume_or_create_session(
    session_id=session_id,
    agent_id=agent_id,
    client_id=client_id,
    since_cursor=since_cursor,
)

self._client_sessions[client_id] = session.session_id
Validate and rebind resumed sessions before attaching this client.
resume_or_create_session() may return an existing session for a different agent_id and with an old client_id; after Line 1035, later responses use session.client_id, so resumed messages can be routed to the wrong websocket or expose another agent session.
🔒 Suggested guard before replay/attachment
  session, replay_events = self.resume_or_create_session(
      session_id=session_id,
      agent_id=agent_id,
      client_id=client_id,
      since_cursor=since_cursor,
  )
+
+ if session.agent_id != agent_id:
+     error = HelloError(
+         code=ConnectErrorCode.AUTH_UNAUTHORIZED,
+         message="Session does not belong to the requested agent",
+         next_action="start_new_session",
+     )
+     await self._send_to_client(client_id, {
+         "type": "hello_error",
+         "code": error.code.value,
+         "message": error.message,
+         "next": error.next_action,
+     })
+     return
+
+ # Prefer moving this into `resume_or_create_session()` or a
+ # `GatewaySession.rebind_client()` helper so the legacy join path
+ # gets the same protection.
+ session._client_id = client_id
  self._client_sessions[client_id] = session.session_id
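The guard described here can be sketched in isolation. The `Session` dataclass, `SessionOwnershipError`, and `attach_client` below are hypothetical stand-ins for illustration; the real gateway session class is not shown on this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Session:
    """Hypothetical minimal session record, for illustration only."""
    session_id: str
    agent_id: str
    client_id: Optional[str] = None


class SessionOwnershipError(Exception):
    """Raised when a resumed session belongs to a different agent."""


def attach_client(session: Session, agent_id: str, client_id: str) -> Session:
    # Refuse to hand a session to a client that asked for another agent.
    if session.agent_id != agent_id:
        raise SessionOwnershipError("Session does not belong to the requested agent")
    # Rebind so later responses go to the websocket that just connected.
    session.client_id = client_id
    return session
```

Placing this check in one helper would let both the hello path and the legacy join path share it, which is the direction the review comment suggests.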
# Build features list
features = {
    "methods": ["message", "abort", "leave"],
    "events": [
        EventType.TOKEN_STREAM.value,
        EventType.TOOL_CALL_STREAM.value,
        EventType.STREAM_END.value,
        EventType.MESSAGE.value,
        EventType.MESSAGE_ACK.value,
        EventType.ERROR.value,
    ],
}

# Add optional features based on client capabilities
if "presence" in client_caps and hasattr(self, '_presence_tracker') and self._presence_tracker:
    features["events"].extend([
        EventType.PRESENCE_JOIN.value,
        EventType.PRESENCE_LEAVE.value,
        EventType.PRESENCE_UPDATE.value,
    ])

if "ack" in client_caps and hasattr(self, '_delivery_tracker') and self._delivery_tracker:
    features["events"].extend([
        EventType.MESSAGE_NACK.value,
        EventType.DELIVERY_RETRY.value,
    ])
Only advertise negotiated and implemented features.
Line 1039 advertises "abort", but this handler has no abort branch; streaming events are also advertised even when the client did not offer a streaming capability. That makes hello_ok.features unreliable for clients.
🔧 Proposed feature list tightening
  features = {
-     "methods": ["message", "abort", "leave"],
+     "methods": ["message", "leave"],
      "events": [
-         EventType.TOKEN_STREAM.value,
-         EventType.TOOL_CALL_STREAM.value,
-         EventType.STREAM_END.value,
          EventType.MESSAGE.value,
-         EventType.MESSAGE_ACK.value,
          EventType.ERROR.value,
      ],
  }
+
+ if "streaming" in client_caps:
+     features["events"].extend([
+         EventType.TOKEN_STREAM.value,
+         EventType.TOOL_CALL_STREAM.value,
+         EventType.STREAM_END.value,
+     ])
@@
  if "ack" in client_caps and hasattr(self, '_delivery_tracker') and self._delivery_tracker:
      features["events"].extend([
+         EventType.MESSAGE_ACK.value,
          EventType.MESSAGE_NACK.value,
          EventType.DELIVERY_RETRY.value,
      ])
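As a rough sketch of capability-gated feature advertisement, the function below builds the features dictionary from the client's capabilities and two server-side switches. `build_features` and the lowercase event strings are placeholders chosen for this example; the real `EventType` values are not shown on this page.

```python
from typing import Dict, Iterable, List, Optional


def build_features(
    client_caps: Optional[Iterable[str]],
    presence_enabled: bool,
    delivery_enabled: bool,
) -> Dict[str, List[str]]:
    """Advertise only what the server implements and the client asked for."""
    caps = set(client_caps or [])  # tolerate a null capability list
    features = {
        "methods": ["message", "leave"],  # no "abort": no handler for it
        "events": ["message", "error"],   # always available
    }
    if "streaming" in caps:
        features["events"] += ["token_stream", "tool_call_stream", "stream_end"]
    if "presence" in caps and presence_enabled:
        features["events"] += ["presence_join", "presence_leave", "presence_update"]
    if "ack" in caps and delivery_enabled:
        features["events"] += ["message_ack", "message_nack", "delivery_retry"]
    return features
```

A client can then treat hello_ok.features as the authoritative list and avoid calling methods the server never advertised.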
# Build policy limits
policy = {
    "max_payload": 1048576,  # 1MB
    "max_buffered_bytes": 8388608,  # 8MB
    "heartbeat_ms": 15000,  # 15 seconds
}
Report the configured heartbeat policy.
Line 1068 hard-codes heartbeat_ms to 15s even though gateway config loads heartbeat_interval; clients using the negotiated policy can self-configure to the wrong interval.
🔧 Proposed config-backed policy
  policy = {
      "max_payload": 1048576,  # 1MB
      "max_buffered_bytes": 8388608,  # 8MB
-     "heartbeat_ms": 15000,  # 15 seconds
+     "heartbeat_ms": int(getattr(self.config, "heartbeat_interval", 15) * 1000),
  }
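A slightly more defensive variant of the config-backed value could look like the sketch below. It assumes `heartbeat_interval` is expressed in seconds (the proposed diff multiplies it by 1000); the `heartbeat_ms` helper and the fallback constant are hypothetical.

```python
from types import SimpleNamespace

DEFAULT_HEARTBEAT_SECONDS = 15  # assumed fallback, mirroring the old hard-coded 15 s


def heartbeat_ms(config: object) -> int:
    """Convert a configured heartbeat interval (seconds) to milliseconds."""
    interval = getattr(config, "heartbeat_interval", None)
    # Fall back when the value is missing, null, non-numeric, or not positive.
    if isinstance(interval, bool) or not isinstance(interval, (int, float)) or interval <= 0:
        interval = DEFAULT_HEARTBEAT_SECONDS
    return int(interval * 1000)


# Usage with a stand-in config object:
config = SimpleNamespace(heartbeat_interval=30)
policy = {"heartbeat_ms": heartbeat_ms(config)}
```

Whether to fall back silently or reject a bad configuration at startup is a design choice; the fallback shown here is just one option.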
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding.
Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues
Phase 3: Final Verdict
Claude finished @MervinPraison's task
Architecture Review Complete ✅
Phase 1: Review per AGENTS.md
Phase 2: Fixed Issues
Files Modified:
Issues Fixed:
Issues Skipped:
Phase 3: Final Verdict
✅ PR Approved - All architectural and implementation issues have been resolved. The implementation correctly follows protocol-driven design with protocols in core SDK and heavy implementation in wrapper, maintains full backward compatibility, and introduces no performance regressions. The versioned handshake protocol is now ready for merge.
- Fix caps: null causing TypeError by adding None check
- Fix wire parser to support both HelloParams format (protocol_min/max, capabilities) and legacy format
- Fix hard-coded policy limits to use config values (heartbeat_interval)
- Remove unused HelloParams import
- Validate session belongs to requested agent and rebind client_id
- Only advertise implemented features (remove abort method, conditional streaming)
Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
if "protocol_min" in data or "protocol_max" in data:
    # HelloParams format
    client_min = data.get("protocol_min", 1)
    client_max = data.get("protocol_max", 1)
else:
    # Legacy format or missing
    protocol_info = data.get("protocol", {})
    if isinstance(protocol_info, dict):
        client_min = protocol_info.get("min", 1)
        client_max = protocol_info.get("max", 1)
    else:
        # Backwards compatibility: treat missing protocol as v1
        client_min = client_max = 1

# Negotiate protocol version
if client_max < MIN_CLIENT_PROTOCOL_VERSION:
    error = HelloError(
        code=ConnectErrorCode.PROTOCOL_UNSUPPORTED,
        message=f"Protocol version {client_max} is too old, minimum required is {MIN_CLIENT_PROTOCOL_VERSION}",
        next_action="upgrade_client"
    )
    await self._send_to_client(client_id, {
        "type": "hello_error",
        "code": error.code.value,
        "message": error.message,
        "next": error.next_action,
    })
    return
Null protocol_min/protocol_max causes unhandled TypeError
data.get("protocol_min", 1) returns None — not 1 — when the client explicitly sends "protocol_min": null, because .get() only substitutes its default when the key is absent. The same applies to protocol_max. The first comparison client_max < MIN_CLIENT_PROTOCOL_VERSION then raises TypeError: '<' not supported between instances of 'NoneType' and 'int', which propagates out of the hello handler and is swallowed by the outer WebSocket loop, silently dropping the connection instead of sending a hello_error to the client. This is the same class of null-vs-absent bug that was fixed for caps on line 1033 but was not applied here. Add a type/null guard immediately after extracting client_min/client_max, defaulting or rejecting non-integer values before any comparison.
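The null-versus-absent behaviour is easy to reproduce, and one possible guard is sketched below. `version_or_default` is a hypothetical helper; here a null value falls back to the default while any other non-integer is rejected, which is only one of the two options the comment mentions.

```python
from typing import Any, Dict

payload = {"protocol_min": None, "protocol_max": None}

# dict.get() only uses its default when the key is absent, so an explicit
# null in the JSON comes through as None rather than 1.
raw_max = payload.get("protocol_max", 1)


def version_or_default(data: Dict[str, Any], key: str, default: int = 1) -> int:
    """Read an integer protocol version, treating absent or null as default."""
    value = data.get(key)
    if value is None:
        return default
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{key} must be an integer")
    return value
```

In the handler, a `ValueError` from this helper would be translated into a hello_error before any version comparison runs, so the client gets a structured reply instead of a dropped connection.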
Fixes #2151
Summary
Adds a versioned handshake with capability negotiation to the WebSocket gateway protocol, enabling graceful protocol evolution and structured connect errors.
Changes
Core SDK (praisonaiagents/gateway/protocols.py)
Wrapper (praisonai/gateway/server.py)
Benefits
Summary by CodeRabbit