Skip to content

feat: enhance event streaming with Last-Event-ID support#22

Merged
Emiyaaaaa merged 1 commit into
Emiyaaaaa:mainfrom
smxzk32145745:feat/sse
Jun 4, 2026
Merged

feat: enhance event streaming with Last-Event-ID support#22
Emiyaaaaa merged 1 commit into
Emiyaaaaa:mainfrom
smxzk32145745:feat/sse

Conversation

@smxzk32145745

Copy link
Copy Markdown
Contributor
  • Implemented Last-Event-ID header and last_event_id query parameter for clients to receive missed events upon reconnection.
  • Updated EventBus to support durable event replay using Redis Streams.
  • Enhanced SSE event handling in both backend and frontend to include event IDs.
  • Added tests for event replay functionality to ensure reliability and correctness.

- Implemented Last-Event-ID header and last_event_id query parameter for clients to receive missed events upon reconnection.
- Updated EventBus to support durable event replay using Redis Streams.
- Enhanced SSE event handling in both backend and frontend to include event IDs.
- Added tests for event replay functionality to ensure reliability and correctness.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enhances the run SSE event stream to support durable replay on reconnect via the Last-Event-ID mechanism (and a last_event_id query param), spanning frontend consumption, Python backend event streaming, and Java API SSE bridging.

Changes:

  • Add event IDs to SSE frames and propagate them through the frontend connection + hooks.
  • Introduce replay-capable event buses (in-memory log + Redis Streams) and use Last-Event-ID/last_event_id to replay missed events before resuming live delivery.
  • Add backend tests covering replay behavior and event ID presence.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
frontend/lib/useRunEventSource.ts Stores received SSE eventId alongside events in React state.
frontend/lib/run-event-connection.ts Tracks lastEventId from SSE frames and reconnects using last_event_id query param; passes event IDs to handlers.
backend/tests/test_event_replay.py Adds tests for replay via header/query param and verifies event IDs are emitted; includes Redis replay test with fakeredis.
backend/app/events/bus.py Extends EventBus to return event IDs, adds replay API, and implements Redis Streams-backed durable replay.
backend/app/core/config.py Adds Redis stream configuration settings (prefix/suffix/max len).
backend/app/api/v1/events.py Implements SSE replay + catch-up logic using Last-Event-ID / last_event_id and emits SSE id fields.
backend-java/src/main/resources/application.yml Adds stream replay configuration for the Java service.
backend-java/src/main/java/io/agentflow/api/service/EventStreamService.java Adds Redis Stream replay + Last-Event-ID support and envelope parsing for pub/sub messages.
backend-java/src/main/java/io/agentflow/api/controller/EventsController.java Accepts Last-Event-ID header and last_event_id query param and forwards to service.
backend-java/src/main/java/io/agentflow/api/config/AgentflowProperties.java Adds stream replay properties (suffix/max len) under agentflow.events.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread backend/app/events/bus.py
Comment on lines +31 to +37
def _is_after(entry_id: str, after_id: str | None) -> bool:
if after_id is None:
return True
try:
return int(entry_id) > int(after_id)
except ValueError:
return entry_id > after_id
Comment on lines +117 to +121
try {
replay(emitter, streamKey, lastSentId.get(), lastSentId);
} catch (Exception e) {
log.warn("Failed catch-up replay for run {}", runId, e);
}
Comment on lines +212 to +217
private static boolean isAfter(String candidate, String lastId) {
try {
return Long.parseLong(candidate) > Long.parseLong(lastId);
} catch (NumberFormatException ex) {
return candidate.compareTo(lastId) > 0;
}
Comment on lines +87 to +90
if event_id:
last_id = event_id

yield _sse_frame(event_id or "0", event)
@Emiyaaaaa Emiyaaaaa merged commit fb0a855 into Emiyaaaaa:main Jun 4, 2026
2 of 4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants