fix: update event replay test to include completed event#23
Merged
Conversation
smxzk32145745
commented
Jun 4, 2026
Contributor
- Modified the test for Redis event bus replay to assert that the completed event is included in the replayed events.
- Ensured that the initial event ID is not present in the replayed events list, improving test accuracy and reliability.
- Implemented Last-Event-ID header and last_event_id query parameter for clients to receive missed events upon reconnection. - Updated EventBus to support durable event replay using Redis Streams. - Enhanced SSE event handling in both backend and frontend to include event IDs. - Added tests for event replay functionality to ensure reliability and correctness.
- Modified the test for Redis event bus replay to assert that the completed event is included in the replayed events. - Ensured that the initial event ID is not present in the replayed events list, improving test accuracy and reliability.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR expands run event streaming to support durable SSE replay using event IDs (via Last-Event-ID header or last_event_id query param), adds tests validating replay behavior (including terminal events), and updates both frontend and backend components to propagate and use event IDs across reconnects.
Changes:
- Backend: add durable per-run event IDs + replay (
InMemoryEventBuslog;RedisEventBuspersists to Redis Stream) and update SSE endpoint to replay missed events. - Frontend: track last seen SSE event ID and include it on reconnect (
last_event_idquery param); storeeventIdon received events. - Java API: add Redis Stream replay support in SSE service/controller and corresponding configuration properties.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| frontend/lib/useRunEventSource.ts | Stores eventId alongside events received from the SSE connection. |
| frontend/lib/run-event-connection.ts | Tracks SSE lastEventId and uses ?last_event_id= on reconnect; passes eventId through callbacks. |
| docs/api-contract.md | Documents replay behavior using Last-Event-ID header or last_event_id query param and Redis Stream storage. |
| backend/tests/test_event_replay.py | Adds end-to-end SSE replay tests and a Redis replay unit test (fakeredis). |
| backend/app/events/bus.py | Introduces event IDs, replay API, in-memory event log, and Redis Stream persistence + envelope publishing. |
| backend/app/core/config.py | Adds Redis event stream configuration settings (prefix/suffix/max-len). |
| backend/app/api/v1/events.py | Implements replay-on-connect and catch-up replay logic for SSE endpoint; emits SSE id. |
| backend-java/src/main/resources/application.yml | Adds stream suffix/max-len config under agentflow.events. |
| backend-java/src/main/java/io/agentflow/api/service/EventStreamService.java | Adds Redis Stream replay and Last-Event-ID-aware dedupe for SSE subscriptions. |
| backend-java/src/main/java/io/agentflow/api/controller/EventsController.java | Accepts Last-Event-ID header or last_event_id query param and forwards to service. |
| backend-java/src/main/java/io/agentflow/api/config/AgentflowProperties.java | Adds streamSuffix / streamMaxLen properties for events config. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if event_id: | ||
| last_id = event_id | ||
|
|
||
| yield _sse_frame(event_id or "0", event) |
Comment on lines
+31
to
+37
| def _is_after(entry_id: str, after_id: str | None) -> bool: | ||
| if after_id is None: | ||
| return True | ||
| try: | ||
| return int(entry_id) > int(after_id) | ||
| except ValueError: | ||
| return entry_id > after_id |
Comment on lines
+8
to
+10
| import fakeredis.aioredis as fakeredis | ||
| import pytest | ||
| from httpx import ASGITransport, AsyncClient |
Comment on lines
+1
to
11
| """Per-run pub/sub event bus with a durable replay log. | ||
|
|
||
| The bus has two implementations selected at runtime: | ||
|
|
||
| * `InMemoryEventBus` – per-process asyncio queues. Suitable for single-node | ||
| development and for tests. | ||
| * `RedisEventBus` – Redis pub/sub channels keyed by `agentflow:run:{id}`. | ||
| Enabled automatically when `AGENTFLOW_REDIS_URL` is configured. | ||
| * `InMemoryEventBus` – per-process asyncio queues plus an in-process event | ||
| log. Suitable for single-node development and for tests. | ||
| * `RedisEventBus` – Redis pub/sub for live delivery and a Redis Stream per | ||
| run for `Last-Event-ID` replay. Enabled when `AGENTFLOW_REDIS_URL` is set. | ||
|
|
||
| Both implementations expose the same async interface and are interchangeable. | ||
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.