fix: persist final step when saveStreamDeltas.returnImmediately is true (#265)#266
Conversation
When streamText is called with saveStreamDeltas.returnImmediately: true, the function returns before awaiting stream consumption. The previous deferred-save path (set pendingFinalStep in onStepFinish, save it after the await) never fired in that case — onStepFinish runs only when the caller later drains the stream, by which point the post-await block has already been skipped. Result: the final assistant message was never persisted and the stream stayed stuck in "streaming" status. Branch on a new willAwaitStream flag in onStepFinish: - If we'll await consumption, defer the save as before (issue #181). - If not (returnImmediately), save the final step inline using the streamer's streamId so the message and stream-finish land atomically. Also guard DeltaStreamer.addParts against late deltas after markFinishedExternally — the stream record is already finished and streams.addDelta would drop them anyway. Fixes #265.
commit: |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository: get-convex/coderabbit/.coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR fixes issue Sequence Diagram(s)sequenceDiagram
participant Agent
participant Streamer as DeltaStreamer
participant StreamsAPI as streams.addDelta/finish
participant Messages as MessagesTable
Agent->>Streamer: streamText start (saveStreamDeltas:{returnImmediately})
Streamer->>Streamer: compute willAwaitStream
alt willAwaitStream == false (returnImmediately)
Streamer->>Messages: save final step inline (getOrCreateStreamId + save)
Streamer->>Agent: return immediately (stream consumed in background)
Agent->>StreamsAPI: consumeStream -> finish
StreamsAPI->>Messages: finalization (if needed)
else willAwaitStream == true
Streamer->>Agent: return a streaming result
Agent->>Streamer: consumer drains stream
Streamer->>StreamsAPI: addDelta/... finish
StreamsAPI->>Messages: persist deferred final step and mark finished
end
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/client/streaming.ts`:
- Around line 290-296: The in-flight delta path can reach `#sendDelta` after
this.#finishedExternally is set and currently treats addDelta returning false as
an async failure that aborts the stream; instead, update the logic in the
delta-send flow (the caller of addDelta and the `#sendDelta` handler) to treat a
false return from addDelta as a benign "delta dropped because stream already
finished" condition: when addDelta(...) === false, simply return (optionally log
at debug) and do not call abort/fail the stream or set error state; keep the
existing behavior of aborting only for real errors/exceptions thrown by addDelta
or network failures. Ensure references to `#sendDelta`, addDelta, and
`#finishedExternally` are used so you change the correct branch.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: get-convex/coderabbit/.coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a0996dde-57a0-4641-9956-0487da9974a0
📒 Files selected for processing (3)
src/client/streamText.test.tssrc/client/streamText.tssrc/client/streaming.ts
Two issues found on the open PR: 1. CodeRabbit (src/client/streaming.ts): an in-flight #sendDelta started before markFinishedExternally() can call addDelta after the stream row is already "finished" and get `success === false`. The current path treats that as an async-abort and calls onAsyncAbort + abort(), incorrectly turning a successfully externally-finished stream into the failure path. Skip the abort branch when #finishedExternally is set — the late-write miss is benign. 2. CI typecheck (example/convex/setup.test.ts): @convex-dev/workflow/test and @convex-dev/rate-limiter/test still type their `register` against the pre-generic SchemaDefinition<GenericSchema, boolean>, so this file has been failing tsc -p example/convex on main for the last three CI runs. Cast through the broader type so the file typechecks regardless of which version of those packages resolves. The setup.test.ts fix is lifted from PR #259 (same provenance as the rest of this PR).
|
Pushed CodeRabbit's CI failure (unrelated to this PR, but fixed here) — the Local verification: |
Summary
Fixes #265: when
streamTextis called withsaveStreamDeltas: { returnImmediately: true }, the final assistant message was never persisted and the stream stayed stuck in"streaming"status forever. Only intermediate (tool-call) steps were saved.Root cause
The deferred-save path added for atomic stream finish (#181) doesn't work when
streamTextreturns without awaiting stream consumption:onStepFinishruns only when the caller later drains the stream (e.g. viaresult.toTextStreamResponse()).returnImmediately: true,streamTextskips its ownawait streamblock and exits.if (pendingFinalStep && streamer)block runs synchronously after exit — butonStepFinishhasn't fired yet, sopendingFinalStepis stillundefined.onStepFinishfires, setspendingFinalStepand callsmarkFinishedExternally()— but no one is readingpendingFinalStepanymore, andconsumeStreamskipsfinish()due to the external-finish flag.Fix
Branch on a new
willAwaitStreamflag inonStepFinish:streamTextwill await consumption (saveStreamDeltas === trueorreturnImmediately: false): defer the save viapendingFinalStepas before (preserves the atomic-finish behavior from Sub-messages flow problem #181).returnImmediately: true): save the final step inline usingstreamer.getOrCreateStreamId()so the message persistence and stream-finish land atomically — beforestreamTextreturns.Also guards
DeltaStreamer.addPartsagainst late deltas aftermarkFinishedExternally. The stream record is already in"finished"state andstreams.addDeltawould silently drop those writes anyway; skipping them is just an optimization, but it avoids racing the inline save.Provenance
These two pieces (the
willAwaitStreambranch and theaddPartsguard) are lifted from the open PR #259 ("Add httpStreamText and useHttpStream"), which bundles them with a larger HTTP streaming surface (~2,800 LOC across 21 files). This PR cherry-picks just the bug fix so #265 can ship independently of that feature work — which is also currently blocked on merge conflicts withmain.Notes for reviewers
addPartsguard, delta chunks emitted by the model afteronStepFinishfires but beforeconsumeStreamhas finished pumping them are silently dropped on thereturnImmediatelypath. The persisted message is unaffected — it carries the complete final-step text viacall.save({step}). A UI subscribed to live deltas could see slightly-truncated streaming content until it reconciles against the saved message, but no canonical data is lost.Test plan
src/client/streamText.test.tsreproduces the issue: callsstreamTextwithreturnImmediately: true, drains the result, then asserts (a) a persisted assistant message exists with the model's text and (b) no stream is left in"streaming"status.mainwithexpected 0 to be greater than 0(no assistant message persisted).npm run lintclean.npx tsc --noEmitclean.example/convex/*.test.tsfailures are pre-existing onmain— they neednpm run buildto populatedist/first; unrelated to this change.)