From b9887231441f3f6c5909a772e8c98b485244b51a Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Mon, 25 May 2026 14:45:33 +0000 Subject: [PATCH] fix(plugin-native): join worker thread on all run_source exit paths Store the worker thread's JoinHandle in InstanceWorker and add an async shutdown() method that drops the channel sender then joins the thread via spawn_blocking. All pre-Start and post-Start exit paths in run_source now call shutdown() so the worker completes before the function returns, preventing use-after-dlclose races during process teardown. The test-side drain_detached_worker() sleep workaround is removed since the production code now guarantees the worker has exited. Closes #481 Signed-off-by: streamkit-devin --- crates/plugin-native/src/wrapper.rs | 361 +++++++++++--------- crates/plugin-native/tests/source_plugin.rs | 21 -- 2 files changed, 199 insertions(+), 183 deletions(-) diff --git a/crates/plugin-native/src/wrapper.rs b/crates/plugin-native/src/wrapper.rs index 5dbbc5ef..3e3e9874 100644 --- a/crates/plugin-native/src/wrapper.rs +++ b/crates/plugin-native/src/wrapper.rs @@ -419,20 +419,39 @@ struct WorkerReply { struct InstanceWorker { tx: tokio::sync::mpsc::Sender, + join_handle: Option>, node_id: String, } +impl InstanceWorker { + /// Drop the channel sender (signalling the worker to exit) and join + /// its thread via `spawn_blocking` so we don't block the async runtime. + async fn shutdown(mut self) { + let handle = self.join_handle.take(); + let node_id = self.node_id.clone(); + drop(self); + if let Some(h) = handle { + let _ = tokio::task::spawn_blocking(move || { + if let Err(panic) = h.join() { + tracing::warn!(node = %node_id, "Worker thread panicked: {panic:?}"); + } + }) + .await; + } + } +} + impl Drop for InstanceWorker { fn drop(&mut self) { // Dropping `tx` closes the channel so the worker's `blocking_recv` - // returns `None`. The worker thread is detached — do NOT join - // synchronously. This Drop runs on a tokio worker thread (inside - // async run_processor/run_source), so a blocking join would stall the - // async runtime if the worker is stuck in a long FFI call (e.g. after - // a timeout). The worker holds an Arc which ensures the - // plugin instance stays alive until the FFI call completes; - // request_drop/destroy_instance handle deferred cleanup. - tracing::debug!(node = %self.node_id, "Detaching plugin worker thread"); + // returns `None`. If shutdown() was called the join handle has + // already been taken and the thread will be joined there. + // Otherwise the thread is detached — safe because the worker + // holds an Arc keeping the plugin instance alive + // until the FFI call completes. + if self.join_handle.is_some() { + tracing::debug!(node = %self.node_id, "Detaching plugin worker thread"); + } } } @@ -1254,7 +1273,7 @@ impl NativeNodeWrapper { // truncates longer names. We just format and let the OS handle it. let thread_name = format!("skp-{node_id}"); let worker_node_id = node_id.clone(); - std::thread::Builder::new() + let handle = std::thread::Builder::new() .name(thread_name) .spawn(move || { worker_thread_main( @@ -1271,7 +1290,7 @@ impl NativeNodeWrapper { .map_err(|e| { StreamKitError::Runtime(format!("Failed to spawn plugin worker thread: {e}")) })?; - Ok(InstanceWorker { tx, node_id: worker_node_id }) + Ok(InstanceWorker { tx, join_handle: Some(handle), node_id: worker_node_id }) } /// Await a oneshot reply from the worker, applying the configured timeout. @@ -1723,6 +1742,7 @@ impl NativeNodeWrapper { { warn!(error = %e, node = %node_name, "Failed to send stopped state"); } + worker.shutdown().await; return Ok(()); } @@ -1744,6 +1764,7 @@ impl NativeNodeWrapper { { warn!(error = %e, node = %node_name, "Failed to send stopped state"); } + worker.shutdown().await; return Ok(()); } Some(NodeControlMessage::UpdateParams(params_value)) => { @@ -1769,6 +1790,7 @@ impl NativeNodeWrapper { { warn!(error = %e, node = %node_name, "Failed to send stopped state"); } + worker.shutdown().await; return Ok(()); } } @@ -1838,67 +1860,77 @@ impl NativeNodeWrapper { tokio::sync::mpsc::Receiver, )> = Vec::new(); - let worker_tx = &worker.tx; + let mut tick_result: Result<(), StreamKitError> = Ok(()); + let mut final_state_emitted = false; - loop { - // Check tick limit - if max_ticks > 0 && tick_count >= max_ticks { - tracing::info!(node = %node_name, ticks = tick_count, "Source reached max ticks"); - break; - } + { + let worker_tx = &worker.tx; - // Non-blocking drain of pending control messages. - while let Ok(ctrl) = context.control_rx.try_recv() { - match ctrl { - NodeControlMessage::Shutdown => { - tracing::info!(node = %node_name, "Source plugin received shutdown"); - if let Err(e) = context - .state_tx - .send(NodeStateUpdate::new( - node_name.clone(), - NodeState::Stopped { reason: StopReason::Completed }, - )) - .await - { - warn!(error = %e, node = %node_name, "Failed to send stopped state"); - } - return Ok(()); - }, - NodeControlMessage::UpdateParams(params_value) => { - self.apply_params_update( - &node_name, - ¶ms_value, - worker_tx, - &context.state_tx, - Some(&telemetry), - ) - .await?; - }, - NodeControlMessage::Start => { - // Already started — ignore duplicate. - }, + loop { + // Check tick limit + if max_ticks > 0 && tick_count >= max_ticks { + tracing::info!(node = %node_name, ticks = tick_count, "Source reached max ticks"); + break; } - } - // Non-blocking drain of pin management messages to pick up - // OutputHintChannel deliveries from the engine. - if let Some(ref mut pin_mgmt_rx) = context.pin_management_rx { - while let Ok(msg) = pin_mgmt_rx.try_recv() { - if let streamkit_core::pins::PinManagementMessage::OutputHintChannel { - pin_name: ref pn, - hint_rx, - } = msg - { - tracing::info!(node = %node_name, pin = %pn, "Received OutputHintChannel from engine"); - hint_receivers.push((pn.clone(), hint_rx)); + // Non-blocking drain of pending control messages. + let mut shutdown_requested = false; + while let Ok(ctrl) = context.control_rx.try_recv() { + match ctrl { + NodeControlMessage::Shutdown => { + tracing::info!(node = %node_name, "Source plugin received shutdown"); + if let Err(e) = context + .state_tx + .send(NodeStateUpdate::new( + node_name.clone(), + NodeState::Stopped { reason: StopReason::Completed }, + )) + .await + { + warn!(error = %e, node = %node_name, "Failed to send stopped state"); + } + final_state_emitted = true; + shutdown_requested = true; + break; + }, + NodeControlMessage::UpdateParams(params_value) => { + self.apply_params_update( + &node_name, + ¶ms_value, + worker_tx, + &context.state_tx, + Some(&telemetry), + ) + .await?; + }, + NodeControlMessage::Start => { + // Already started — ignore duplicate. + }, + } + } + if shutdown_requested { + break; + } + + // Non-blocking drain of pin management messages to pick up + // OutputHintChannel deliveries from the engine. + if let Some(ref mut pin_mgmt_rx) = context.pin_management_rx { + while let Ok(msg) = pin_mgmt_rx.try_recv() { + if let streamkit_core::pins::PinManagementMessage::OutputHintChannel { + pin_name: ref pn, + hint_rx, + } = msg + { + tracing::info!(node = %node_name, pin = %pn, "Received OutputHintChannel from engine"); + hint_receivers.push((pn.clone(), hint_rx)); + } } } - } - // Drain all hint receivers and deliver to plugin via the worker. - if !hint_receivers.is_empty() && self.state.api().on_upstream_hint.is_some() { - let mut pending_hints: Vec = Vec::new(); - hint_receivers.retain_mut(|(_pin, rx)| loop { + // Drain all hint receivers and deliver to plugin via the worker. + if !hint_receivers.is_empty() && self.state.api().on_upstream_hint.is_some() { + let mut pending_hints: Vec = Vec::new(); + hint_receivers.retain_mut(|(_pin, rx)| loop { match rx.try_recv() { Ok(hint) => { tracing::info!(node = %node_name, ?hint, "Delivering upstream hint to plugin"); @@ -1914,121 +1946,126 @@ impl NativeNodeWrapper { }, } }); - if !pending_hints.is_empty() { - let (hint_reply_tx, _hint_reply_rx) = tokio::sync::oneshot::channel(); - // Use try_send: if the worker is busy (channel full), drop - // the hints rather than blocking. This prevents a wedged - // on_upstream_hint from stalling the tick loop — the - // capacity-1 channel would otherwise block the send until - // the worker drains the previous request. - match worker_tx.try_send(WorkerRequest::OnUpstreamHint { - hints: pending_hints, - reply: hint_reply_tx, - }) { - Ok(()) => { - // Hint enqueued; we don't await the reply — the - // worker will process it before the next Tick. - }, - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - warn!(node = %node_name, "Dropping upstream hints: worker busy"); - }, - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - warn!(node = %node_name, "Dropping upstream hints: worker died"); - }, + if !pending_hints.is_empty() { + let (hint_reply_tx, _hint_reply_rx) = tokio::sync::oneshot::channel(); + // Use try_send: if the worker is busy (channel full), drop + // the hints rather than blocking. This prevents a wedged + // on_upstream_hint from stalling the tick loop — the + // capacity-1 channel would otherwise block the send until + // the worker drains the previous request. + match worker_tx.try_send(WorkerRequest::OnUpstreamHint { + hints: pending_hints, + reply: hint_reply_tx, + }) { + Ok(()) => { + // Hint enqueued; we don't await the reply — the + // worker will process it before the next Tick. + }, + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + warn!(node = %node_name, "Dropping upstream hints: worker busy"); + }, + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + warn!(node = %node_name, "Dropping upstream hints: worker died"); + }, + } } } - } - let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); - self.send_to_worker( - WorkerCallContext { - op: "tick", - node: &node_name, - state_tx: Some(&context.state_tx), - telemetry: Some(&telemetry), - metric_labels: &self.state.labels_tick, - }, - worker_tx, - WorkerRequest::Tick { reply: reply_tx }, - ) - .await?; - let reply = self - .await_reply( - "tick", - &node_name, - reply_rx, - Some(&context.state_tx), - Some(&telemetry), - &self.state.labels_tick, + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + self.send_to_worker( + WorkerCallContext { + op: "tick", + node: &node_name, + state_tx: Some(&context.state_tx), + telemetry: Some(&telemetry), + metric_labels: &self.state.labels_tick, + }, + worker_tx, + WorkerRequest::Tick { reply: reply_tx }, ) .await?; - - // Send outputs produced by tick. If the output channel is closed, - // stop ticking — source nodes have no input-close backstop so we must - // detect consumer disconnect here. - let mut output_closed = false; - for (pin, pkt) in reply.outputs { - if context.output_sender.send(&pin, pkt).await.is_err() { - tracing::debug!(node = %node_name, "Output channel closed during tick"); - output_closed = true; + let reply = self + .await_reply( + "tick", + &node_name, + reply_rx, + Some(&context.state_tx), + Some(&telemetry), + &self.state.labels_tick, + ) + .await?; + + // Send outputs produced by tick. If the output channel is closed, + // stop ticking — source nodes have no input-close backstop so we must + // detect consumer disconnect here. + let mut output_closed = false; + for (pin, pkt) in reply.outputs { + if context.output_sender.send(&pin, pkt).await.is_err() { + tracing::debug!(node = %node_name, "Output channel closed during tick"); + output_closed = true; + break; + } + } + if output_closed { break; } - } - if output_closed { - break; - } - tick_count += 1; + tick_count += 1; - // Check tick result - if let Some(error_msg) = reply.error { - error!(node = %node_name, error = %error_msg, "Source tick error"); - if let Err(e) = context - .state_tx - .send(NodeStateUpdate::new( - node_name.clone(), - NodeState::Failed { reason: error_msg.clone() }, - )) - .await - { - warn!(error = %e, node = %node_name, "Failed to send failed state"); + // Check tick result + if let Some(error_msg) = reply.error { + error!(node = %node_name, error = %error_msg, "Source tick error"); + if let Err(e) = context + .state_tx + .send(NodeStateUpdate::new( + node_name.clone(), + NodeState::Failed { reason: error_msg.clone() }, + )) + .await + { + warn!(error = %e, node = %node_name, "Failed to send failed state"); + } + final_state_emitted = true; + tick_result = Err(StreamKitError::Runtime(error_msg)); + break; } - return Err(StreamKitError::Runtime(error_msg)); - } - if reply.done { - tracing::info!(node = %node_name, ticks = tick_count, "Source signalled done"); - break; - } + if reply.done { + tracing::info!(node = %node_name, ticks = tick_count, "Source signalled done"); + break; + } - // Wait for next tick — cancellation-aware so shutdown is responsive. - tokio::select! { - biased; - () = async { - match &context.cancellation_token { - Some(token) => token.cancelled().await, - None => std::future::pending().await, + // Wait for next tick — cancellation-aware so shutdown is responsive. + tokio::select! { + biased; + () = async { + match &context.cancellation_token { + Some(token) => token.cancelled().await, + None => std::future::pending().await, + } + } => { + tracing::info!(node = %node_name, "Source plugin cancelled during tick wait"); + break; } - } => { - tracing::info!(node = %node_name, "Source plugin cancelled during tick wait"); - break; + _ = interval.tick() => {} } - _ = interval.tick() => {} } - } + } // end borrow scope for worker_tx - // Emit stopped state - if let Err(e) = context - .state_tx - .send(NodeStateUpdate::new( - node_name.clone(), - NodeState::Stopped { reason: StopReason::Completed }, - )) - .await - { - warn!(error = %e, node = %node_name, "Failed to send stopped state"); + if !final_state_emitted { + if let Err(e) = context + .state_tx + .send(NodeStateUpdate::new( + node_name.clone(), + NodeState::Stopped { reason: StopReason::Completed }, + )) + .await + { + warn!(error = %e, node = %node_name, "Failed to send stopped state"); + } } - Ok(()) + worker.shutdown().await; + tick_result } /// Helper to apply a parameter update via the worker thread. diff --git a/crates/plugin-native/tests/source_plugin.rs b/crates/plugin-native/tests/source_plugin.rs index 05c5c16d..fe17fde6 100644 --- a/crates/plugin-native/tests/source_plugin.rs +++ b/crates/plugin-native/tests/source_plugin.rs @@ -95,21 +95,6 @@ async fn await_state_matching bool>( } } -/// Briefly yield to let the detached worker thread drain its channel and -/// run `InstanceState::drop` (which calls back into the loaded `.so` via -/// `destroy_instance`) before the test subprocess starts unloading the -/// dlopen'd library. -/// -/// Under coverage instrumentation the first call into a freshly-loaded -/// `.so` is noticeably slower, exposing a teardown race in -/// `NativeNodeWrapper::run_source`: `run()` returns without joining or -/// signalling the worker, so the detached worker can still be inside the -/// `.so` when the subprocess exits. Tracked as a follow-up issue (see -/// the PR description). -async fn drain_detached_worker() { - tokio::time::sleep(std::time::Duration::from_millis(200)).await; -} - #[tokio::test] async fn source_plugin_metadata_reports_is_source_after_probe() { let plugin = load_source_plugin(); @@ -187,7 +172,6 @@ async fn source_plugin_ticks_and_emits_then_completes_after_max_ticks() { run_result.is_ok(), "source plugin run loop must exit Ok after tick returns completion signal, got: {run_result:?}" ); - drain_detached_worker().await; } #[tokio::test] @@ -230,7 +214,6 @@ async fn source_plugin_tick_error_marks_node_failed_with_plugin_message() { run_result.is_err(), "source plugin run loop must return Err after tick error, got: {run_result:?}" ); - drain_detached_worker().await; } #[tokio::test] @@ -264,7 +247,6 @@ async fn source_plugin_shutdown_control_terminates_cleanly_before_max_ticks() { run_result.is_ok(), "shutdown control must produce Ok exit even mid-stream, got: {run_result:?}" ); - drain_detached_worker().await; } #[tokio::test] @@ -291,7 +273,6 @@ async fn source_plugin_shutdown_before_start_exits_cleanly() { run_result.is_ok(), "Shutdown received before Start must yield clean Ok exit, got: {run_result:?}" ); - drain_detached_worker().await; } #[tokio::test] @@ -318,7 +299,6 @@ async fn source_plugin_control_channel_close_before_start_exits_cleanly() { run_result.is_ok(), "control channel close before Start must yield clean Ok exit, got: {run_result:?}" ); - drain_detached_worker().await; } /// Pins the host contract that an `UpdateParams` arriving in the @@ -372,5 +352,4 @@ async fn source_plugin_update_params_before_start_is_accepted_without_failing_th run_result.is_ok(), "UpdateParams before Start must not fail the node, got: {run_result:?}" ); - drain_detached_worker().await; }