Skip to content

Fix dropped first chunk in incremental stream readers#339

Open
MaxHeimbrock wants to merge 1 commit into
mainfrom
max/fix-read-incremental-first-chunk-race
Open

Fix dropped first chunk in incremental stream readers#339
MaxHeimbrock wants to merge 1 commit into
mainfrom
max/fix-read-incremental-first-chunk-race

Conversation

@MaxHeimbrock

Copy link
Copy Markdown
Contributor

Problem

Agent transcriptions were intermittently missing the first word(s) of a reply.

TextStreamReader.ReadIncremental() (and the byte-stream equivalent) sent the FFI request before constructing the ReadIncrementalInstruction whose constructor subscribes to TextStreamReaderEventReceived. Three facts turn that ordering into a real chunk drop:

  1. The Rust FFI server spawns a task that drains already-buffered chunks the moment it receives the read_incremental request (livekit-ffi/src/server/data_stream.rs). By the time the app calls ReadIncremental(), the first chunk of an agent reply has typically already arrived and is buffered — the stream-open event took a main-thread hop to reach the handler.
  2. FFIClient.RouteFfiEvent invokes TextStreamReaderEventReceived directly on the FFI callback thread (no main-thread queueing), so the first chunk event races the remainder of ReadIncremental() on the main thread.
  3. Events fired before the subscription exists are silently lost — the pending-chunk queue in ReadIncrementalInstructionBase only buffers chunks that arrive after subscription.

When the Rust task won the race, the first chunk was gone. For delta text streams (agent transcriptions via lk.transcription) that means the first word(s) of the reply never render. ReadAll() already used the safe order, which is why snapshot-style streams were unaffected.

Fix

Construct the instruction (subscribing to reader events) before sending the request, in both TextStreamReader.ReadIncremental() and ByteStreamReader.ReadIncremental(). C# event add/invoke is an atomic delegate swap, so subscribing first is sufficient; chunks arriving before the first yield are already handled by the pending-chunk queue.

🤖 Generated with Claude Code

ReadIncremental() sent the FFI request before constructing the
ReadIncrementalInstruction that subscribes to reader events. Chunk
events are delivered directly on the FFI callback thread, and the FFI
server emits already-buffered chunks as soon as it receives the
request, so any chunk emitted before the subscription existed was
silently lost. For delta text streams (e.g. agent transcriptions) this
intermittently dropped the first word(s) of a reply.

Subscribe first, then send — matching the order ReadAll() already uses.
Applies to both TextStreamReader and ByteStreamReader.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant