Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/praisonai-agents/praisonaiagents/gateway/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ class GatewayEvent:
timestamp: Event creation time
source: Source identifier (agent_id, client_id, etc.)
target: Target identifier (optional, for directed events)

Wire Protocol Extensions:
When events are sent over the gateway, additional fields are added:
- seq: Top-level monotonic sequence number for gap detection
- cursor: Event cursor position (also stored in data['cursor'])

Resume Protocol:
The 'joined' acknowledgment includes:
- cursor: Current head cursor position
- oldest_cursor: Oldest event still in buffer
- resync_required: True if requested 'since' is below oldest_cursor

When resync_required=true, a 'snapshot' message follows with full state.
"""

type: Union[EventType, str]
Expand Down
91 changes: 83 additions & 8 deletions src/praisonai/praisonai/gateway/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,45 @@ def add_event(self, event: GatewayEvent) -> int:
self._events = self._events[-self._max_messages:]
return self._event_cursor

def get_oldest_cursor(self) -> int:
    """Return the cursor of the earliest event still held in the buffer.

    Returns:
        The 'cursor' entry of the first buffered event. If the buffer is
        empty, or that first event carries no 'cursor' entry, the current
        head position (``_event_cursor``) is returned instead, so every
        cursor below the head compares as older than the retained range.
    """
    head = self._event_cursor
    if not self._events:
        # Nothing retained: the head position doubles as the oldest cursor.
        return head
    return self._events[0].data.get('cursor', head)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def get_events_since(self, cursor: int) -> List[GatewayEvent]:
    """Collect the buffered events that are strictly newer than ``cursor``.

    Args:
        cursor: Last cursor position the caller has already seen.

    Returns:
        Buffered events, in buffer order, whose 'cursor' entry is greater
        than ``cursor``.
    """
    newer = []
    for event in self._events:
        # An event without a 'cursor' entry is compared as cursor 0.
        if event.data.get('cursor', 0) > cursor:
            newer.append(event)
    return newer

def check_resync_required(self, since_cursor: Optional[int]) -> bool:
    """Tell whether the requested cursor is older than the retained buffer.

    Args:
        since_cursor: Cursor the client asked to resume from, or None when
            no cursor was supplied.

    Returns:
        False when ``since_cursor`` is None; otherwise True exactly when
        ``since_cursor`` is below ``get_oldest_cursor()``.
    """
    # NOTE(review): with a non-empty buffer, since_cursor == oldest - 1 is
    # reported as needing resync even though every event after it appears
    # to be retained (assuming contiguous cursors) - confirm this is intended.
    return since_cursor is not None and since_cursor < self.get_oldest_cursor()

def get_snapshot(self) -> Dict[str, Any]:
    """Build a dictionary describing the session's current state for resync.

    Returns:
        A dict with the session and agent identifiers, a shallow copy of
        the session state, one dict per stored message, and the current
        event cursor under 'event_cursor'. Buffered events themselves are
        not included.
    """
    message_fields = (
        "content",
        "sender_id",
        "session_id",
        "message_id",
        "timestamp",
        "metadata",
    )

    snapshot = {}
    snapshot["session_id"] = self._session_id
    snapshot["agent_id"] = self._agent_id
    # Shallow copy: later top-level changes to the session state do not
    # leak into a snapshot that was already built.
    snapshot["state"] = dict(self._state)
    # Each message is flattened field by field; 'metadata' is passed
    # through by reference, not copied.
    snapshot["messages"] = [
        {field: getattr(msg, field) for field in message_fields}
        for msg in self._messages
    ]
    snapshot["event_cursor"] = self._event_cursor
    return snapshot
Comment on lines +137 to +152

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 get_snapshot() omits retained events from the buffer

The snapshot sent on resync includes messages, state, and event_cursor, but not the events still in the buffer (from oldest_cursor to event_cursor). Compare with to_dict(), which includes the last 100 events. A client receiving the snapshot cannot reconstruct the event flow (e.g., response, stream_end) that occurred between its last known cursor and the snapshot head. The PR description calls this "full state recovery" — adding an events key with the currently-retained buffer events (as to_dict() does) would make the claim accurate and help clients that track event-level data.


def to_dict(self) -> Dict[str, Any]:
"""Serialize session to dictionary for persistence."""
return {
Expand Down Expand Up @@ -956,7 +991,18 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) ->
if agent_id and agent_id in self._agents:
# Support reconnection with existing session
session_id = data.get("session_id") # Optional: existing session to resume
since_cursor = data.get("since") # Optional: cursor for event replay
# Parse and validate the since parameter
since_raw = data.get("since") # Optional: cursor for event replay
since_cursor = None
if since_raw is not None:
try:
since_cursor = int(since_raw)
except (TypeError, ValueError):
await self._send_to_client(client_id, {
"type": "error",
"message": "Invalid 'since' cursor. Must be an integer.",
})
return

# Resume or create session
session, replay_events = self.resume_or_create_session(
Expand All @@ -968,21 +1014,39 @@ async def _handle_client_message(self, client_id: str, data: Dict[str, Any]) ->

self._client_sessions[client_id] = session.session_id

# Send join confirmation
# Check if resync is required
resync_required = session.check_resync_required(since_cursor)
oldest_cursor = session.get_oldest_cursor()
Comment on lines +1017 to +1019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Normalize since to an integer before replay/resync logic.

since is client-controlled JSON. If it arrives as a string/non-numeric value, the cursor comparisons used during replay/resync can raise and abort the join path instead of returning a protocol error (Line 1002 is part of that failure chain). Parse once at ingress and reject invalid cursors explicitly.

Suggested fix
-                since_cursor = data.get("since")  # Optional: cursor for event replay
+                since_raw = data.get("since")  # Optional: cursor for event replay
+                since_cursor: Optional[int] = None
+                if since_raw is not None:
+                    try:
+                        since_cursor = int(since_raw)
+                    except (TypeError, ValueError):
+                        await self._send_to_client(client_id, {
+                            "type": "error",
+                            "message": "Invalid 'since' cursor. Send an integer from your last received cursor.",
+                        })
+                        return
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/gateway/server.py` around lines 1001 - 1003, The
`since` parameter is client-controlled JSON input that may arrive as a string or
non-numeric value, causing cursor comparison operations in
`check_resync_required` and `get_oldest_cursor` to raise exceptions and abort
the join path instead of returning a protocol error. Parse and normalize `since`
to an integer at the request ingress point before it is used in any cursor
comparison logic, and explicitly return a protocol error if the value is invalid
or cannot be converted to an integer.


# Send join confirmation with integrity check info
await self._send_to_client(client_id, {
"type": "joined",
"session_id": session.session_id,
"agent_id": agent_id,
"resumed": session._was_resumed,
"cursor": session._event_cursor,
"oldest_cursor": oldest_cursor,
"resync_required": resync_required,
})

# Replay missed events if any
for event in replay_events:
if resync_required:
# Send authoritative snapshot instead of partial replay
snapshot = session.get_snapshot()
await self._send_to_client(client_id, {
"type": "replay",
"event": event.to_dict(),
"type": "snapshot",
"state": snapshot,
})
else:
# Replay missed events if any
for event in replay_events:
event_data = event.to_dict()
# Include top-level sequence number from the cursor
seq = event.data.get('cursor', 0)
await self._send_to_client(client_id, {
"type": "replay",
"event": event_data,
"seq": seq,
})
else:
await self._send_to_client(client_id, {
"type": "error",
Expand Down Expand Up @@ -1166,7 +1230,14 @@ async def _send_to_client(self, client_id: str, data: Dict[str, Any]) -> None:
if ws:
try:
# Track event in session BEFORE sending if it's a response or important event
if data.get("type") in ["response", "message", "stream_end", "error"]:
if data.get("type") in [
"response",
"message",
"stream_end",
"error",
"token_stream",
"tool_call_stream",
]:
session_id = self._client_sessions.get(client_id)
if session_id:
session = self._sessions.get(session_id)
Expand All @@ -1180,6 +1251,8 @@ async def _send_to_client(self, client_id: str, data: Dict[str, Any]) -> None:
cursor = session.add_event(event)
# Add cursor to the data BEFORE sending
data["cursor"] = cursor
# Add top-level sequence number for integrity checking
data["seq"] = cursor
Comment on lines 1253 to +1255

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 seq and cursor are always set to the same value (cursor = session.add_event(event)). The existing cursor field already provides the monotonic sequence number for gap detection. Adding seq as a separate top-level alias is redundant — any client that already reads cursor gets the same information. If the intent is to expose the sequence number at the envelope level (outside data), the comment should clarify why cursor in data is insufficient.

Suggested change
data["cursor"] = cursor
# Add top-level sequence number for integrity checking
data["seq"] = cursor
data["cursor"] = cursor
# seq mirrors cursor at the envelope level so clients
# can detect gaps without parsing data['cursor']
data["seq"] = cursor

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


# Send ONCE with cursor already attached if applicable
await ws.send_json(data)
Expand Down Expand Up @@ -1470,7 +1543,9 @@ def resume_or_create_session(

Returns:
Tuple of (session, replay_events) where replay_events are events
that occurred after since_cursor
that occurred after since_cursor. Note: Callers must check
session.check_resync_required(since_cursor) before using replay_events,
as the events may not cover the full gap if the buffer was trimmed.
"""
replay_events = []

Expand Down
Loading