Skip to content

feat(api): implement StreamEvents gRPC with Redis Streams fan-out#28

Open
Depo-dev wants to merge 1 commit into
devfrom
feat/issue-12-stream-events-grpc
Open

feat(api): implement StreamEvents gRPC with Redis Streams fan-out#28
Depo-dev wants to merge 1 commit into
devfrom
feat/issue-12-stream-events-grpc

Conversation

@Depo-dev

@Depo-dev Depo-dev commented Jun 3, 2026

Copy link
Copy Markdown
Collaborator

Summary

Implements stream_events server-streaming RPC. Each connection spawns a dedicated tokio reader task that consumes the trident:events Redis stream and pushes matching events down a bounded channel to the gRPC client.

Implementation

  • XREAD BLOCK 5000 COUNT 100 loop with $ start cursor (new messages only)
  • Filters by contract_id (required) and topic_0 (optional)
  • Task terminates when client disconnects (channel closed check after each XREAD timeout)
  • Redis errors logged + 1s backoff before retry
  • Uses redis::aio::ConnectionManager for safe clone-and-share across tasks
  • Wires REDIS_URL env var in main.rs

Test

stream_events_delivers_published_event — publishes a synthetic Redis entry 100ms after subscription, asserts event arrives within 8s. Gated on TEST_REDIS_URL.

Closes #12

…-out

On each client connection, spawns a tokio task that runs XREAD BLOCK
(5s timeout) in a loop on the trident:events stream. Filters entries by
contract_id and optional topic_0, builds proto Event messages from the
flat Redis hash, and sends them down a bounded mpsc channel returned as
ReceiverStream. Task exits cleanly when the client disconnects (tx
closed). Adds ConnectionManager for safe multi-clone Redis access.
Integration test publishes a synthetic Redis entry and asserts it
arrives on the connected stream (gated on TEST_REDIS_URL).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant