fix(plugin-native): join worker thread on all run_source exit paths #519
staging-devin-ai-integration[bot] wants to merge 1 commit into
Store the worker thread's JoinHandle in InstanceWorker and add an async shutdown() method that drops the channel sender then joins the thread via spawn_blocking. All pre-Start and post-Start exit paths in run_source now call shutdown() so the worker completes before the function returns, preventing use-after-dlclose races during process teardown.

The test-side drain_detached_worker() sleep workaround is removed since the production code now guarantees the worker has exited.

Closes #481

Signed-off-by: streamkit-devin <devin@streamkit.dev>
impl InstanceWorker {
    /// Drop the channel sender (signalling the worker to exit) and join
    /// its thread via `spawn_blocking` so we don't block the async runtime.
    async fn shutdown(mut self) {
        let handle = self.join_handle.take();
        let node_id = self.node_id.clone();
        drop(self);
        if let Some(h) = handle {
            let _ = tokio::task::spawn_blocking(move || {
                if let Err(panic) = h.join() {
                    tracing::warn!(node = %node_id, "Worker thread panicked: {panic:?}");
                }
            })
            .await;
        }
    }
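
The ordering in `shutdown()` (drop the sender first, then join) can be illustrated with a std-only sketch. It is not the PR's code: `Worker`, `spawn`, and `demo_shutdown` are hypothetical names, the worker just sums the numbers it receives, and the join is done synchronously here rather than through `tokio::task::spawn_blocking`.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `InstanceWorker`: a request sender plus the
/// worker thread's join handle.
struct Worker {
    tx: Sender<u32>,
    join_handle: Option<JoinHandle<u32>>,
}

impl Worker {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<u32>();
        let join_handle = thread::spawn(move || {
            // `rx.iter()` ends once every sender has been dropped,
            // which is the worker's signal to exit.
            rx.iter().sum::<u32>()
        });
        Worker { tx, join_handle: Some(join_handle) }
    }

    /// Drop the sender first so the worker's receive loop ends, then join.
    /// Returns `None` if the worker panicked.
    fn shutdown(mut self) -> Option<u32> {
        let handle = self.join_handle.take();
        drop(self); // drops `tx`, closing the channel
        handle.and_then(|h| h.join().ok())
    }
}

/// Sends three requests, then shuts the worker down and returns its result.
fn demo_shutdown() -> Option<u32> {
    let worker = Worker::spawn();
    for n in [1, 2, 3] {
        worker.tx.send(n).expect("worker is alive");
    }
    worker.shutdown()
}
```

Joining before the sender is dropped would block forever in this sketch, because the worker only leaves its loop when the channel closes. That is the reason the order matters.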
📝 Info: Processor workers are still detached on non-source exit paths
The PR adds a join handle to every InstanceWorker, but worker.shutdown().await is only wired into the source-plugin lifecycle. Processor plugins still rely on InstanceWorker::Drop detaching the thread when run_processor returns or errors, because run_processor never calls shutdown before returning at crates/plugin-native/src/wrapper.rs:1635-1650. This appears intentional given the existing timeout contract and the comment in Drop, but reviewers should be aware that the teardown-race fix is scoped to source plugins/tests rather than all native plugin workers.
Correct — this is intentional. The Drop fallback (detach) remains for run_processor and is documented in both the Drop comment and the PR summary. Extending shutdown() to run_processor would require the same restructuring there and is a separate concern; the issue (#481) is specifically about the source plugin teardown race exposed by coverage instrumentation.
Filed tracking issues for the two observations above:
- #522: fix(plugin-native): join worker thread on run_processor exit paths
- #523: fix(plugin-native): join worker thread on run_source timeout/error early returns (bounded join)
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.send_to_worker(
    WorkerCallContext {
        op: "tick",
        node: &node_name,
        state_tx: Some(&context.state_tx),
        telemetry: Some(&telemetry),
        metric_labels: &self.state.labels_tick,
    },
    worker_tx,
    WorkerRequest::Tick { reply: reply_tx },
)
.await?;

// Send outputs produced by tick. If the output channel is closed,
// stop ticking — source nodes have no input-close backstop so we must
// detect consumer disconnect here.
let mut output_closed = false;
for (pin, pkt) in reply.outputs {
    if context.output_sender.send(&pin, pkt).await.is_err() {
        tracing::debug!(node = %node_name, "Output channel closed during tick");
        output_closed = true;
let reply = self
    .await_reply(
        "tick",
        &node_name,
        reply_rx,
        Some(&context.state_tx),
        Some(&telemetry),
        &self.state.labels_tick,
    )
    .await?;
📝 Info: Early timeout errors bypass source shutdown join
Inside the source tick loop, send_to_worker(...).await?, await_reply(...).await?, and apply_params_update(...).await? still return early from run_source before the final worker.shutdown().await at crates/plugin-native/src/wrapper.rs:2067. That means timed-out or wedged plugin calls keep the old detach behavior instead of joining, which avoids blocking forever on a stuck FFI call but also means the new deterministic join only applies to clean source exits and plugin-reported tick errors.
Right — this is deliberate. The ? exits are for dead-worker or timeout scenarios where the FFI call may be stuck indefinitely. Joining there would risk blocking spawn_blocking for the full timeout duration (or forever if the plugin wedges). The Arc<InstanceState> keeps the plugin instance alive until the worker finishes, so detaching is safe in those error paths.
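
The reference-counting argument can be sketched in isolation. The example below is an illustration only: `State` is a hypothetical stand-in for `InstanceState`, and it shows just that a detached worker holding an `Arc` clone keeps the shared value alive until the worker itself lets go. It does not model library loading or `dlclose`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `InstanceState`; records when it is destroyed.
struct State {
    dropped: Arc<AtomicBool>,
}

impl Drop for State {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Returns `(destroyed while the worker is still running,
///           destroyed after the worker has finished)`.
fn demo_detach() -> (bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let state = Arc::new(State { dropped: Arc::clone(&dropped) });

    let (release_tx, release_rx) = mpsc::channel::<()>();
    let (done_tx, done_rx) = mpsc::channel::<()>();

    let worker_state = Arc::clone(&state);
    let handle = thread::spawn(move || {
        // Simulates an in-flight call: block until the owner releases us.
        let _ = release_rx.recv();
        drop(worker_state); // last reference, so `State::drop` runs here
        let _ = done_tx.send(());
    });

    // The owner gives up its reference and detaches the worker thread.
    drop(state);
    drop(handle);
    let while_running = dropped.load(Ordering::SeqCst);

    // Let the worker finish, then check again.
    release_tx.send(()).expect("worker is waiting");
    done_rx.recv().expect("worker signals completion");
    let after_exit = dropped.load(Ordering::SeqCst);

    (while_running, after_exit)
}
```

In this sketch the state is destroyed only after the worker releases its clone, even though the owner dropped its own reference and the join handle earlier.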
Tracked in #523 — a bounded join with a short secondary timeout could be explored there.
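
One possible shape for such a bounded join, as a std-only sketch and not a proposal for the actual fix in #523: the worker signals completion on a channel, and the caller waits for that signal with a timeout before deciding whether to join or detach. `bounded_join`, `spawn_worker`, and `JoinOutcome` are hypothetical names, and the sleeping worker only simulates a plugin call.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum JoinOutcome {
    Joined,
    Detached,
}

/// Wait up to `limit` for the worker's completion signal. Join if it
/// arrives (or if the worker dropped its sender while exiting); otherwise
/// detach so a wedged worker cannot block the caller indefinitely.
fn bounded_join(handle: JoinHandle<()>, done_rx: Receiver<()>, limit: Duration) -> JoinOutcome {
    match done_rx.recv_timeout(limit) {
        Ok(()) | Err(RecvTimeoutError::Disconnected) => {
            // The worker is at or past its final statement, so this join
            // is expected to return promptly.
            let _ = handle.join();
            JoinOutcome::Joined
        }
        Err(RecvTimeoutError::Timeout) => {
            drop(handle); // dropping a JoinHandle detaches the thread
            JoinOutcome::Detached
        }
    }
}

/// Spawns a worker that "works" for `work` and then signals completion.
fn spawn_worker(work: Duration) -> (JoinHandle<()>, Receiver<()>) {
    let (done_tx, done_rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        thread::sleep(work);
        let _ = done_tx.send(());
    });
    (handle, done_rx)
}
```

A fast worker is joined and a slow one is detached after the limit, which keeps the deterministic join for the common case while preserving the existing fallback for stuck calls.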
Codecov Report
❌ Your patch check has failed because the patch coverage (67.34%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.
Additional details and impacted files
@@ Coverage Diff @@
## main #519 +/- ##
==========================================
+ Coverage 79.39% 79.41% +0.01%
==========================================
Files 232 232
Lines 66904 66935 +31
Branches 1909 1909
==========================================
+ Hits 53117 53155 +38
+ Misses 13781 13774 -7
Partials 6 6
Summary
- `run_source` returned from pre-Start exit branches (Shutdown, control-channel close, cancellation) and post-Start exits (shutdown during ticks, tick error, normal completion) without joining the per-instance worker thread. The detached worker could still be inside the `.so` when dlclose ran during process teardown, causing SIGSEGV under coverage instrumentation.
- Store the worker thread's `JoinHandle` in `InstanceWorker` and add an async `shutdown()` method that drops the channel sender then joins the thread via `spawn_blocking`.
- All pre-Start and post-Start exit paths in `run_source` now call `shutdown()` before returning. The post-Start tick loop is restructured to converge at a single exit point rather than using early returns, so the join always happens.
- The `Drop` impl remains as a fallback for `run_processor` and `?`-error exits: it detaches the thread (safe because the `Arc<InstanceState>` keeps the plugin alive).
- The test-side `drain_detached_worker()` 200ms sleep workaround is removed since the production code now guarantees the worker has exited.

Review & Validation
- The `shutdown()` method correctly sequences drop sender → `spawn_blocking` join; there is no deadlock risk since the worker only blocks on the channel.
- The scoped block `{ let worker_tx = &worker.tx; loop { … } }` releases the borrow before `worker.shutdown().await`.
- `CARGO_LLVM_COV_TARGET_DIR=target/coverage cargo llvm-cov --no-report nextest -p streamkit-plugin-native --test source_plugin`

Closes #481
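
The single-exit-point restructuring and the scoped borrow can be sketched with std-only types. This is an assumption-laden illustration, not the code in `wrapper.rs`: `Worker`, `Exit`, and `run` are hypothetical, the "ticks" are plain integers, and the worker simply records what it receives.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical worker that collects every tick it is sent.
struct Worker {
    tx: Sender<u32>,
    join_handle: JoinHandle<Vec<u32>>,
}

impl Worker {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        let join_handle = thread::spawn(move || rx.iter().collect::<Vec<u32>>());
        Worker { tx, join_handle }
    }

    /// Takes `self` by value, so no borrow of `self.tx` may still be live.
    fn shutdown(self) -> Vec<u32> {
        let Worker { tx, join_handle } = self;
        drop(tx); // close the channel so the worker's loop ends
        join_handle.join().expect("worker panicked")
    }
}

#[derive(Debug, PartialEq)]
enum Exit {
    Completed,
    TickError,
}

/// Every way out of the loop is a `break` with a value, so control always
/// reaches the single `shutdown()` call below.
fn run(ticks: u32, fail_at: Option<u32>) -> (Exit, Vec<u32>) {
    let worker = Worker::spawn();
    let exit = {
        let worker_tx = &worker.tx; // borrow confined to this block
        let mut tick = 0;
        loop {
            if tick == ticks {
                break Exit::Completed;
            }
            if Some(tick) == fail_at {
                break Exit::TickError; // simulated tick error
            }
            worker_tx.send(tick).expect("worker is alive");
            tick += 1;
        }
    }; // `worker_tx` goes out of scope here
    let seen = worker.shutdown();
    (exit, seen)
}
```

In this synchronous sketch the compiler would likely accept the code without the inner block, since the borrow ends at its last use; the block mainly makes the boundary explicit. An early `return` inside the loop, by contrast, would skip `shutdown()` entirely, which is the pattern the PR moves away from.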
Link to Devin session: https://staging.itsdev.in/sessions/6f336a5db7d444018cff5cf532c6f6c7
Requested by: @streamer45
b988723