fix: gateway session data loss and restore deleted _delivery module #2150
cursor[bot] wants to merge 2 commits into
Conversation
- Resume gateway sessions from the latest session_data snapshot instead of the initial empty one written at session creation
- Persist active sessions on gateway stop() via close_session()
- Restore bots/_delivery.py, accidentally removed in #2054; imports for DurableDelivery and DurableAdapterMixin were still live
- Add regression tests for resume, shutdown persist, and delivery imports

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
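The first item above (resuming from the latest snapshot rather than the first) can be illustrated with a minimal sketch. The message shape (`role`, `type`, and `data` keys) and the helper name `latest_session_snapshot` are assumptions made for illustration; the actual session store format is not shown in this PR conversation.

```python
from typing import Any, Dict, List, Optional


def latest_session_snapshot(messages: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the most recent session_data snapshot, or None if none exists.

    The message layout used here is hypothetical.
    """
    latest = None
    for msg in messages:
        if msg.get("role") == "system" and msg.get("type") == "session_data":
            # No break here: a later snapshot overwrites an earlier one.
            latest = msg.get("data")
    return latest


history = [
    # Empty snapshot written when the session is created.
    {"role": "system", "type": "session_data", "data": {"turns": 0}},
    {"role": "user", "content": "hello"},
    # Updated snapshot appended when the session is closed.
    {"role": "system", "type": "session_data", "data": {"turns": 3}},
]
snapshot = latest_session_snapshot(history)
```

Breaking out of the loop on the first match would return the creation-time snapshot, which is the data-loss behaviour the PR describes.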
|
@coderabbitai review |
|
/review |
✅ Action performed: Review finished.
|
|
Important: Review skipped
Bot user detected.
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📝 Walkthrough
Adds a complete
Changes
Bot Message Delivery Subsystem
Gateway Session Persistence Fixes
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant UnifiedDelivery
participant PlatformAdapter
participant RateLimiter
rect rgba(100, 149, 237, 0.5)
note over Caller,PlatformAdapter: Streaming path (edit-capable platform, long message)
Caller->>UnifiedDelivery: send_message(channel_id, content, stream=True)
UnifiedDelivery->>PlatformAdapter: send_typing(channel_id)
UnifiedDelivery->>PlatformAdapter: send_message(placeholder)
loop progressive edit interval
UnifiedDelivery->>RateLimiter: acquire_token(platform)
UnifiedDelivery->>PlatformAdapter: edit_message(partial_content)
end
UnifiedDelivery->>PlatformAdapter: edit_message(full_content) or chunked fallback
end
rect rgba(144, 238, 144, 0.5)
note over Caller,PlatformAdapter: Chunked path (no edit support or short message)
Caller->>UnifiedDelivery: send_message(channel_id, content, stream=False)
loop each chunk
UnifiedDelivery->>RateLimiter: acquire_token(platform)
UnifiedDelivery->>PlatformAdapter: send_message(chunk)
end
end
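The chunked path in the diagram can be sketched roughly as follows. This is an illustration under stated assumptions, not the module's code: `split_into_chunks`, `send_chunked`, and `FakeAdapter` are hypothetical names, chunk length is counted in plain characters, and an `asyncio.Semaphore` stands in for the rate limiter.

```python
import asyncio
from typing import List, Tuple


def split_into_chunks(text: str, max_len: int) -> List[str]:
    """Split text into consecutive pieces of at most max_len characters."""
    if max_len <= 0:
        raise ValueError("max_len must be positive")
    return [text[i:i + max_len] for i in range(0, len(text), max_len)]


class FakeAdapter:
    """Hypothetical adapter that records what it was asked to send."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    async def send_message(self, channel_id: str, content: str) -> None:
        self.sent.append((channel_id, content))


async def send_chunked(adapter: FakeAdapter, channel_id: str, text: str, max_len: int) -> None:
    # A semaphore is only a stand-in for a real token-based rate limiter.
    limiter = asyncio.Semaphore(1)
    for chunk in split_into_chunks(text, max_len):
        async with limiter:
            await adapter.send_message(channel_id, chunk)


adapter = FakeAdapter()
asyncio.run(send_chunked(adapter, "c1", "hello world", 5))
```

Each chunk waits for the limiter before it is sent, mirroring the acquire-then-send order in the diagram.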
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
|
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above from Qodo, Coderabbit, and Gemini first — incorporate their findings. Review areas:
|
|
@claude You are the FINAL architecture reviewer. If the branch is under MervinPraison/PraisonAI (not a fork), you are able to make modifications to this branch and push directly. SCOPE: Focus ONLY on Python packages (praisonaiagents, praisonai). Do NOT modify praisonai-rust or praisonai-ts. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding. Phase 1: Review per AGENTS.md
Phase 2: FIX Valid Issues Phase 3: Final Verdict |
|
Claude finished @MervinPraison's task
✅ Architecture Review Complete
Review Summary
✅ All Issues Resolved
The PR correctly addresses all three critical regressions:
🔧 Additional Fix Applied
Found and fixed a missing import in the restored
Validation Results
✅ Tests: All 4 regression tests pass
Files Modified
Final Verdict
APPROVED - All critical bugs are properly fixed with minimal, focused changes. The PR is ready to merge. |
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/praisonai/praisonai/bots/_delivery.py`:
- Around line 87-89: The asyncio.create_task() call that launches _show_typing
for the typing indicator does not store the returned task, so the task can be
garbage-collected before it finishes. Store the return value of
asyncio.create_task(self._show_typing(channel_id)) in a strong reference (an
instance variable, a set, or a list) and keep it until the typing indicator
coroutine completes.
- Around line 155-191: The infinite loop occurs because when the backoff while
loop reduces next_pos to <= current_pos and breaks at line 169, the outer while
loop continues since current_pos remains unchanged and still satisfies the
condition current_pos < len(text). To fix this, after the inner backoff loop
breaks due to inability to fit content (when next_pos <= current_pos), add a
break statement to exit the outer while loop as well, ensuring the function
terminates when no further progress can be made on chunking the text.
- Around line 269-347: The function deliver_with_retry exists in two
incompatible forms with different signatures: the original in _resilience.py
takes a send_func and policy parameter, while the new version in _delivery.py
takes an adapter, channel_id, and content parameter. Resolve this conflict by
choosing one implementation to keep: either update all callers of the old API in
telegram.py (which currently imports directly from _resilience.py) to use the
new API from _delivery.py and remove the old deliver_with_retry from
_resilience.py, or rename the new implementation in _delivery.py to avoid the
collision (e.g., deliver_with_retry_new) and update the bots/__init__.py export
accordingly. Ensure consistency so that there is only one deliver_with_retry
function accessible from the public API and all internal callers use the same
implementation.
In `@src/praisonai/tests/unit/gateway/test_gateway_persistence_fixes.py`:
- Around line 62-66: The test_stop_persists_active_sessions test is calling
await gw.stop() when gw._is_running is still False, which causes the stop()
method to return early without executing the session-closing persistence logic.
Before calling await gw.stop(), ensure the gateway is in a running state by
setting gw._is_running to True (or by starting the gateway through the proper
initialization path). This will allow the stop() method to proceed past its
early return guard and actually execute the session persistence code that the
test is meant to verify.
📒 Files selected for processing (3)
- src/praisonai/praisonai/bots/_delivery.py
- src/praisonai/praisonai/gateway/server.py
- src/praisonai/tests/unit/gateway/test_gateway_persistence_fixes.py
        # Show typing indicator if supported
        if typing and caps.supports_typing:
            asyncio.create_task(self._show_typing(channel_id))
Store reference to background task to prevent premature GC.
The typing indicator task created via asyncio.create_task() is not stored, so it may be garbage-collected before completion. Python's event loop holds only a weak reference to scheduled tasks.
Proposed fix
+        self._background_tasks: set[asyncio.Task] = set()
+
     async def send_message(
         self,
         channel_id: str,
         ...
         # Show typing indicator if supported
         if typing and caps.supports_typing:
-            asyncio.create_task(self._show_typing(channel_id))
+            task = asyncio.create_task(self._show_typing(channel_id))
+            self._background_tasks.add(task)
+            task.add_done_callback(self._background_tasks.discard)
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 89-89: Store a reference to the return value of asyncio.create_task (RUF006)
Source: Linters/SAST tools
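The pattern suggested in this comment can be sketched in isolation. This is a minimal example, assuming a hypothetical `TypingNotifier` class rather than the real delivery class: the set holds a strong reference to each task, and the done callback removes the task once it finishes.

```python
import asyncio
from typing import List, Set


class TypingNotifier:
    """Hypothetical stand-in for the class that owns the typing task."""

    def __init__(self) -> None:
        # Strong references keep pending tasks alive until they complete.
        self._background_tasks: Set[asyncio.Task] = set()
        self.shown: List[str] = []

    async def _show_typing(self, channel_id: str) -> None:
        await asyncio.sleep(0)  # placeholder for the real platform call
        self.shown.append(channel_id)

    def start_typing(self, channel_id: str) -> asyncio.Task:
        task = asyncio.create_task(self._show_typing(channel_id))
        self._background_tasks.add(task)
        # Drop the reference automatically when the task is done.
        task.add_done_callback(self._background_tasks.discard)
        return task


async def demo():
    notifier = TypingNotifier()
    task = notifier.start_typing("channel-1")
    pending = len(notifier._background_tasks)  # task is tracked while running
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    return notifier, pending


notifier, pending_during = asyncio.run(demo())
```

Using a set with `discard` as the done callback keeps the bookkeeping bounded, since finished tasks do not accumulate.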
            while current_pos < len(text):
                # Calculate next position based on platform's length unit
                next_pos = current_pos + chunk_size
                if next_pos > len(text):
                    next_pos = len(text)

                partial = text[:next_pos]

                # Check if partial exceeds platform limit
                if _calculate_length(partial, caps.length_unit) > caps.max_message_length:
                    # Back off to find safe length
                    while next_pos > current_pos and _calculate_length(text[:next_pos], caps.length_unit) > caps.max_message_length:
                        next_pos -= 10
                        if next_pos <= current_pos:
                            break  # Can't fit more content
                    partial = text[:next_pos]

                # Respect edit interval
                await asyncio.sleep(caps.edit_interval_ms / 1000.0)

                # Apply rate limiting for edits
                if caps.needs_rate_limit:
                    await self._acquire_rate_limit()

                # Add ellipsis if not complete
                display_text = partial if next_pos >= len(text) else partial + "..."

                await self.bot.edit_message(
                    channel_id,
                    message.message_id,
                    display_text,
                )

                if next_pos >= len(text):
                    break  # Complete text has been sent

                current_pos = next_pos
Potential infinite loop when content cannot fit in edit limit.
If _calculate_length(text[:next_pos], caps.length_unit) exceeds caps.max_message_length even at the minimum position, the backoff loop at lines 166-170 reduces next_pos to <= current_pos and breaks. However, the outer while loop continues because:
- The break at line 169 only exits the inner backoff loop
- Line 191 sets current_pos = next_pos (same or smaller value)
- Outer condition current_pos < len(text) remains true → infinite loop
Proposed fix
                     while next_pos > current_pos and _calculate_length(text[:next_pos], caps.length_unit) > caps.max_message_length:
                         next_pos -= 10
                         if next_pos <= current_pos:
-                            break  # Can't fit more content
+                            # Can't fit more content in streaming mode - fall through to chunking
+                            break
                     partial = text[:next_pos]
                 # ... edit logic ...
-                current_pos = next_pos
+                current_pos = next_pos
+
+                # Safety: if we didn't advance, break to avoid infinite loop
+                if current_pos <= 0 or (next_pos <= current_pos and current_pos > 0):
+                    break
Alternatively, restructure to break out of the outer loop when no progress can be made:
                         if next_pos <= current_pos:
-                            break  # Can't fit more content
+                            # Can't fit more content via streaming - proceed to chunked fallback
+                            current_pos = len(text)  # Force exit, let chunking handle it
+                            break
                     partial = text[:next_pos]
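The termination concern raised in this comment can be shown with a simplified, synchronous sketch. It is not the module's loop: `progressive_prefixes` is a hypothetical helper, length is counted in plain characters instead of a platform length unit, and the edit call is replaced by collecting the prefixes that would be displayed. The point is the explicit no-progress check that exits the outer loop.

```python
from typing import List


def progressive_prefixes(text: str, chunk_size: int, max_len: int) -> List[str]:
    """Return the successive prefixes a streaming edit loop would display.

    Stops as soon as the prefix cannot grow without exceeding max_len, so a
    caller could switch to chunked sending for the remainder.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    prefixes: List[str] = []
    current_pos = 0
    while current_pos < len(text):
        # Never step past the end of the text or past the edit limit.
        next_pos = min(current_pos + chunk_size, len(text), max_len)
        if next_pos <= current_pos:
            break  # no progress possible: leave the outer loop
        prefixes.append(text[:next_pos])
        current_pos = next_pos
    return prefixes


fits = progressive_prefixes("abcdefghij", 4, 100)
capped = progressive_prefixes("abcdefghij", 4, 6)
```

When the limit is smaller than the text, the loop ends after the last prefix that fits instead of spinning on an unchanged position.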
async def deliver_with_retry(
    adapter: MessageSender,
    channel_id: str,
    content: Union[str, Dict[str, Any]],
    *,
    backoff: Optional[BackoffPolicy] = None,
    max_attempts: int = 3,
    abort_signal: Optional[asyncio.Event] = None,
    platform: str = "",
    **send_kwargs
) -> tuple[bool, Optional[str]]:
    """Attempt delivery with bounded exponential backoff retry.

    Args:
        adapter: The adapter to send through
        channel_id: Target channel ID
        content: Message content to send
        backoff: Backoff policy for retries
        max_attempts: Maximum delivery attempts
        abort_signal: Optional event to cancel retries
        platform: Platform name for error classification
        **send_kwargs: Additional kwargs for send_message

    Returns:
        Tuple of (success, error_message)
        - (True, None) on successful delivery
        - (False, error_msg) on permanent failure
        - (False, error_msg) on transient failure after max attempts
    """
    backoff = backoff or BackoffPolicy()
    last_error = None

    for attempt in range(1, max_attempts + 1):
        try:
            # Attempt delivery
            await adapter.send_message(channel_id, content, **send_kwargs)

            # Success!
            if attempt > 1:
                logger.info(
                    f"[{platform}] Delivery succeeded after {attempt} attempts "
                    f"to channel {channel_id}"
                )

            return True, None

        except Exception as e:
            last_error = str(e)

            # Check if error is permanent
            if not is_recoverable_error(e, platform):
                logger.error(
                    f"[{platform}] Permanent delivery failure to {channel_id}: {e}"
                )
                return False, f"Permanent error: {last_error}"

            # Check if we're out of attempts
            if attempt >= max_attempts:
                logger.warning(
                    f"[{platform}] Delivery failed after {attempt} attempts "
                    f"to {channel_id}: {e}"
                )
                return False, f"Max attempts exceeded: {last_error}"

            # Calculate backoff delay
            delay = compute_backoff(backoff, attempt)

            logger.warning(
                f"[{platform}] Delivery attempt {attempt} failed to {channel_id}: "
                f"{e}; retrying in {delay:.1f}s"
            )

            # Sleep with abort capability
            if not await sleep_with_abort(delay, abort_signal):
                logger.info(f"[{platform}] Delivery retry aborted by signal")
                return False, "Aborted by signal"

    # Should never reach here, but for safety
    return False, last_error
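The retry loop above relies on two helpers whose bodies are not shown in this conversation: a backoff calculation and an abortable sleep. The sketch below shows one way such helpers could behave. The names `backoff_delay` and `sleep_unless_aborted`, the default base, factor, and cap values, and the use of `asyncio.wait_for` are all assumptions for illustration, not the contents of _resilience.py.

```python
import asyncio
from typing import Optional, Tuple


def backoff_delay(attempt: int, base: float = 0.5, factor: float = 2.0, cap: float = 30.0) -> float:
    """Exponential delay for a 1-based attempt number, bounded by cap."""
    return min(cap, base * factor ** (attempt - 1))


async def sleep_unless_aborted(delay: float, abort_signal: Optional[asyncio.Event]) -> bool:
    """Sleep for delay seconds; return False if abort_signal is set first."""
    if abort_signal is None:
        await asyncio.sleep(delay)
        return True
    try:
        # Waiting on the event with a timeout doubles as an interruptible sleep.
        await asyncio.wait_for(abort_signal.wait(), timeout=delay)
    except asyncio.TimeoutError:
        return True  # the full delay elapsed without an abort
    return False  # the event fired before the delay elapsed


async def demo() -> Tuple[bool, bool]:
    abort = asyncio.Event()
    completed = await sleep_unless_aborted(0.01, abort)
    abort.set()
    aborted = await sleep_unless_aborted(10.0, abort)  # returns immediately
    return completed, aborted


completed, aborted = asyncio.run(demo())
```

Returning a boolean lets the caller write `if not await ...: return` in the style of the retry loop, so an abort ends the retries without raising.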
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check all usages of deliver_with_retry in the codebase to verify API compatibility
echo "=== Searching for deliver_with_retry calls ==="
rg -n --type py 'deliver_with_retry\(' -A 5 -B 2
echo ""
echo "=== Checking if telegram.py imports and uses deliver_with_retry ==="
rg -n --type py 'from.*_delivery.*import|from.*deliver.*import'
echo ""
echo "=== Looking for 'policy=' parameter usage (old API) ==="
rg -n --type py 'deliver_with_retry.*policy='
Repository: MervinPraison/PraisonAI
Length of output: 12486
🏁 Script executed:
#!/bin/bash
# Check what deliver_with_retry is imported in telegram.py
echo "=== telegram.py imports ==="
head -100 src/praisonai/praisonai/bots/telegram.py | grep -E "^from|^import"
echo ""
echo "=== Check if _resilience.py exists and has deliver_with_retry ==="
grep -n "async def deliver_with_retry" src/praisonai/praisonai/bots/_resilience.py
echo ""
echo "=== Check bots/__init__.py exports ==="
grep -n "deliver_with_retry" src/praisonai/praisonai/bots/__init__.py
echo ""
echo "=== Check if _delivery.py imports from _resilience ==="
head -30 src/praisonai/praisonai/bots/_delivery.py | grep -E "^from|^import"
Repository: MervinPraison/PraisonAI
Length of output: 1948
🏁 Script executed:
#!/bin/bash
# Check test files for both implementations
echo "=== Tests for _resilience deliver_with_retry ==="
find . -name "*.py" -type f -exec grep -l "test.*deliver_with_retry\|deliver_with_retry.*test" {} \;
echo ""
echo "=== Test imports and usage ==="
grep -B 5 "deliver_with_retry" src/praisonai/tests/unit/bots/test_outbound_retry.py | head -30
echo ""
echo "=== Full _resilience.py deliver_with_retry signature ==="
sed -n '255,280p' src/praisonai/praisonai/bots/_resilience.py
echo ""
echo "=== Check if _delivery module is being imported/used elsewhere (besides __init__) ==="
rg -n "from \._delivery import|from praisonai\.bots\._delivery import" --type py | grep -v "__init__"
Repository: MervinPraison/PraisonAI
Length of output: 2180
Address the API naming conflict and incompatibility with deliver_with_retry.
Two incompatible implementations of deliver_with_retry currently coexist in the codebase:
- _resilience.py defines the original API: deliver_with_retry(send_func, *, policy, platform, parked_store, reply_data, ...)
- _delivery.py defines a new API: deliver_with_retry(adapter, channel_id, content, *, backoff, max_attempts, platform, ...)
This creates a critical issue:
- Active usage conflict: telegram.py explicitly imports from _resilience and uses the old API (lines 774, 813, 833, 916, 929), which expects a callable and policy parameter.
- Module export conflict: bots/__init__.py exports deliver_with_retry from _delivery.py (line 30), but the old implementation still exists and is directly imported elsewhere. Consumers importing from praisonai.bots will get the new incompatible API.
- Dual implementation: Both implementations are tested and actively used: the old one by telegram.py and tests, the new one internally by _delivery.py and by praisonai-agents/gateway/protocols.py.
Either consolidate to a single implementation with backward compatibility, or rename one function to avoid the collision (e.g., deliver_with_retry_new or refactor the old one into deliver_with_retry_func). If the new API is intended to replace the old one, update all callers including telegram.py and remove the old implementation from _resilience.py.
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 315-315: Do not catch blind exception: Exception
(BLE001)
        await gw.stop()

        store = gw._session_store
        assert store is not None
        assert store.session_exists(session_id)
test_stop_persists_active_sessions currently skips the shutdown persistence path.
At Line 62, await gw.stop() runs while gw._is_running is still False, so stop() returns early (Line 924 in src/praisonai/praisonai/gateway/server.py) and never executes session-closing persistence. This test can pass even if shutdown persistence regresses.
Suggested test fix
 from pathlib import Path
+from types import SimpleNamespace
@@
     async def test_stop_persists_active_sessions(self, tmp_path):
         gw = _make_gateway(tmp_path)
+        gw._is_running = True
+        gw._pid_lock = SimpleNamespace(release_lock=lambda: None)
         session_id = "sess-stop-1"
@@
         await gw.stop()
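Why the early-return guard matters for this test can be seen with a toy model. `ToyGateway` below is a hypothetical stand-in, not the real gateway class: it only reproduces the shape described in this comment, where stop() returns immediately unless the running flag is set and otherwise persists each active session through close_session().

```python
import asyncio
from typing import Any, Dict, Tuple


class ToyGateway:
    """Hypothetical gateway with the same guard shape as described above."""

    def __init__(self) -> None:
        self._is_running = False
        self.active: Dict[str, Dict[str, Any]] = {"sess-1": {"turns": 2}}
        self.store: Dict[str, Dict[str, Any]] = {}

    async def close_session(self, session_id: str) -> None:
        # Persist the final state, then drop the session from memory.
        self.store[session_id] = self.active.pop(session_id)

    async def stop(self) -> None:
        if not self._is_running:
            return  # early-return guard: nothing below runs
        for session_id in list(self.active):
            await self.close_session(session_id)
        self._is_running = False


async def demo() -> Tuple[Dict[str, Any], Dict[str, Any]]:
    idle = ToyGateway()
    await idle.stop()  # guard trips, nothing is persisted

    running = ToyGateway()
    running._is_running = True
    await running.stop()  # persistence path executes
    return idle.store, running.store


idle_store, running_store = asyncio.run(demo())
```

A test that never sets the running flag exercises only the first case, so it cannot detect a regression in the persistence path.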
The _delivery.py module was exporting deliver_with_retry in __all__ but not importing it from _resilience.py where it's actually defined. This would cause AttributeError when importing.
Summary
Critical bug scan found three regressions introduced by recent commits (notably #2054):
1. Missing bots/_delivery.py (crash on import)
   Impact: ModuleNotFoundError when importing DurableDelivery or DurableAdapterMixin; this breaks durable outbound delivery for all bot adapters using the mixin.
   Root cause: #2054 removed _delivery.py as "unused", but _durable_adapter.py and bots/__init__.py still import from it.
   Fix: Restore the module.
2. Gateway session resume data loss
   Impact: Resuming a persisted gateway session restores the initial empty snapshot instead of the latest state; all conversation history is lost on reconnect.
   Root cause: create_session() breaks on the first session_data system message (written at init), but close_session() appends updated snapshots. The break prevents reaching the latest one.
   Fix: Iterate all system messages and keep the last session_data snapshot.
3. Gateway shutdown data loss
   Impact: Graceful gateway shutdown discards all active session state without persisting it.
   Root cause: stop() calls session.close() directly instead of close_session(), which is the method that writes final state to the session store.
   Fix: Call close_session(session_id) for each active session on shutdown.
Validation
Added regression tests in tests/unit/gateway/test_gateway_persistence_fixes.py:
- test_resume_uses_latest_session_data_snapshot: PASSED
- test_stop_persists_active_sessions: PASSED
- test_durable_delivery_import: PASSED
- test_durable_adapter_mixin_import: PASSED
Summary by CodeRabbit
New Features
Bug Fixes