From e2ae9fb393714c55a13f22a35c6f937c5a4f4afe Mon Sep 17 00:00:00 2001 From: nomaterials Date: Wed, 3 Jun 2026 14:19:08 +0200 Subject: [PATCH 1/3] execution_graph: preserve unrun dirty work on dispatch error Planning drains and clears the entire in-scope dirty set up front to build the schedule, but InlineDispatcher executes fail-fast: when a node trapped or errored, every node scheduled after it lost its (already drained) dirty mark. A retry then planned from an empty set and reported "nothing to do" while the pending work was silently dropped. On error, re-mark the failed node and the unrun tail dirty (remark_scheduled_dirty) so the work is recoverable on the next run. Applies to both run_all (global) and run_node (closure) paths. Add three regression tests and document the fail-fast recovery contract on run_all/run_node. --- execution_graph/src/dispatch.rs | 22 +++- execution_graph/src/graph.rs | 200 +++++++++++++++++++++++++++++--- 2 files changed, 201 insertions(+), 21 deletions(-) diff --git a/execution_graph/src/dispatch.rs b/execution_graph/src/dispatch.rs index ac17d1b..1a8cf43 100644 --- a/execution_graph/src/dispatch.rs +++ b/execution_graph/src/dispatch.rs @@ -59,9 +59,16 @@ impl Dispatcher for InlineDispatcher { } let mut to_run: Vec = plan.take_nodes(); - for node in to_run.drain(..) { - graph.execute_scheduled_node(node)?; + for i in 0..to_run.len() { + if let Err(e) = graph.execute_scheduled_node(to_run[i]) { + // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Their dirty marks + // were cleared when the plan was drained, so re-mark them to keep that pending + // work recoverable on the next run instead of silently dropping it. + graph.remark_scheduled_dirty(&to_run[i..]); + return Err(e); + } } + to_run.clear(); Ok(to_run) } @@ -80,8 +87,14 @@ impl Dispatcher for InlineDispatcher { let mut report = RunDetailReport::default(); let mut to_run: Vec = plan.take_nodes(); - for node in to_run.drain(..) 
{ - graph.execute_scheduled_node(node)?; + for i in 0..to_run.len() { + let node = to_run[i]; + if let Err(e) = graph.execute_scheduled_node(node) { + // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Re-mark them so + // their drained dirty state is not silently lost (see `dispatch`). + graph.remark_scheduled_dirty(&to_run[i..]); + return Err(e); + } if let Some(t) = trace.as_mut() && let Some(r) = t.take_report_for(node) { @@ -89,6 +102,7 @@ impl Dispatcher for InlineDispatcher { } } + to_run.clear(); Ok((to_run, report)) } } diff --git a/execution_graph/src/graph.rs b/execution_graph/src/graph.rs index 38f6e6e..a4c281f 100644 --- a/execution_graph/src/graph.rs +++ b/execution_graph/src/graph.rs @@ -898,6 +898,10 @@ impl ExecutionGraph { } /// Runs all currently dirty work in dependency order and returns a cheap summary. + /// + /// Execution is fail-fast: if a node errors, the run stops and returns that error, but the + /// dirty state of any not-yet-executed scheduled work is preserved so a subsequent run + /// re-attempts it. pub fn run_all(&mut self) -> Result { let plan = self.plan_all(); self.run_plan(plan) @@ -919,6 +923,9 @@ impl ExecutionGraph { /// /// This drains only dirty keys that are within the dependency closure of `node`'s outputs. /// Unrelated dirty work remains dirty and is not drained. + /// + /// Execution is fail-fast: if a node errors, the run stops and returns that error, but the + /// dirty state of not-yet-executed work in the closure is preserved for a subsequent run. pub fn run_node(&mut self, node: NodeId) -> Result { let plan = self.plan_within_dependencies_of(node)?; self.run_plan(plan) @@ -943,6 +950,27 @@ impl ExecutionGraph { self.run_node_internal(node) } + /// Internal dispatch hook: re-marks the output keys of `nodes` dirty. 
+ /// + /// Planning drains (and clears) the scheduled dirty set up front, so when dispatch stops + /// fail-fast on an error the un-run nodes would otherwise be left permanently clean and their + /// pending work silently dropped. Re-marking their outputs keeps that work recoverable on the + /// next run. + #[inline] + pub(crate) fn remark_scheduled_dirty(&mut self, nodes: &[NodeId]) { + for &node in nodes { + let Ok(index) = usize::try_from(node.as_u64()) else { + continue; + }; + if index >= self.nodes.len() { + continue; + } + for &out_id in self.nodes[index].output_ids.iter() { + self.dirty.mark_dirty(out_id); + } + } + } + fn execute_kind( node: NodeId, kind: &mut NodeKind, @@ -1173,6 +1201,46 @@ mod tests { } } + /// A no-input program that traps at runtime (divide-by-zero). + fn trap_program() -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.const_i64(1, 1); + a.const_i64(2, 0); + a.i64_div(3, 1, 2); + a.ret(0, &[3]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + + /// A no-input program returning the constant `v` as output "value". 
+ fn const_program(v: i64) -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.const_i64(1, v); + a.ret(0, &[1]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + #[test] fn graph_error_display_includes_actionable_context() { let bad_entry = GraphError::BadEntryFunc { func: FuncId(99) }.to_string(); @@ -1603,23 +1671,7 @@ mod tests { #[test] fn run_all_preserves_vm_trap_info() { - let mut pb = ProgramBuilder::new(); - let mut a = Asm::new(); - a.const_i64(1, 1); - a.const_i64(2, 0); - a.i64_div(3, 1, 2); - a.ret(0, &[3]); - let f = pb - .push_function_checked( - a, - FunctionSig { - arg_types: vec![], - ret_types: vec![ValueType::I64], - }, - ) - .unwrap(); - pb.set_function_output_name(f, 0, "value").unwrap(); - let prog = Arc::new(pb.build_verified().unwrap()); + let (prog, f) = trap_program(); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); let n = g.add_node(prog, f, vec![]).unwrap(); @@ -1632,6 +1684,120 @@ mod tests { assert_eq!(trap.trap, Trap::DivByZero); } + #[test] + fn run_all_trap_keeps_independent_node_recoverable() { + // node0 traps; node1 is an independent constant scheduled after it. A trap mid-pass must + // not silently discard node1's pending dirty work. + let (trap_prog, trap_f) = trap_program(); + let (const_prog, const_f) = const_program(42); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node1 = g.add_node(const_prog, const_f, vec![]).unwrap(); + + // node0 is scheduled first and traps; fail-fast leaves node1 unrun. + assert!(matches!(g.run_all(), Err(GraphError::Trap { .. 
}))); + assert_eq!(g.node_run_count(node0), Some(0)); + assert_eq!( + g.node_run_count(node1), + Some(0), + "precondition: node1 must not have run in the trapping pass" + ); + + // node1's dirty state survived: a targeted re-run executes it and produces its value. + let summary = g.run_node(node1).unwrap(); + assert_eq!(summary.executed_nodes, 1); + assert_eq!( + g.node_outputs(node1).and_then(|o| o.get("value")), + Some(&Value::I64(42)) + ); + } + + #[test] + fn run_all_trap_does_not_remark_already_executed_nodes() { + // Scheduled order is [node_a, node0, node_c]: node_a runs, node0 traps, node_c is unrun. + // The fix must re-mark only the failed node and the unrun tail, never the node that + // already executed successfully. + let (a_prog, a_f) = const_program(7); + let (trap_prog, trap_f) = trap_program(); + let (c_prog, c_f) = const_program(99); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node_a = g.add_node(a_prog, a_f, vec![]).unwrap(); + let _node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node_c = g.add_node(c_prog, c_f, vec![]).unwrap(); + + assert!(matches!(g.run_all(), Err(GraphError::Trap { .. }))); + assert_eq!( + g.node_run_count(node_a), + Some(1), + "precondition: node_a must have executed before the trap" + ); + + // node_a already ran and must not have been re-marked, so a targeted re-run is a no-op. + assert_eq!(g.run_node(node_a).unwrap().executed_nodes, 0); + assert_eq!(g.node_run_count(node_a), Some(1)); + + // node_c was unrun and re-marked, so it remains recoverable. + assert_eq!(g.run_node(node_c).unwrap().executed_nodes, 1); + assert_eq!( + g.node_outputs(node_c).and_then(|o| o.get("value")), + Some(&Value::I64(99)) + ); + } + + #[test] + fn run_node_trap_keeps_closure_sibling_recoverable() { + // target depends on node0 (traps) and node_sib (independent constant). Running target's + // closure traps on node0; node_sib must remain recoverable rather than being dropped. 
+ fn passthrough2_program() -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.i64_add(3, 1, 2); + a.ret(0, &[3]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![ValueType::I64, ValueType::I64], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + + let (trap_prog, trap_f) = trap_program(); + let (sib_prog, sib_f) = const_program(42); + let (tgt_prog, tgt_f) = passthrough2_program(); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node_sib = g.add_node(sib_prog, sib_f, vec![]).unwrap(); + let target = g + .add_node(tgt_prog, tgt_f, vec!["x".into(), "y".into()]) + .unwrap(); + g.connect(node0, "value", target, "x").unwrap(); + g.connect(node_sib, "value", target, "y").unwrap(); + + // node0 is scheduled before node_sib inside target's closure and traps. + assert!(matches!(g.run_node(target), Err(GraphError::Trap { .. }))); + assert_eq!( + g.node_run_count(node_sib), + Some(0), + "precondition: node_sib must not have run in the trapping pass" + ); + + // node_sib's dirty state survived the closure trap and re-runs cleanly. 
+ let summary = g.run_node(node_sib).unwrap(); + assert_eq!(summary.executed_nodes, 1); + assert_eq!( + g.node_outputs(node_sib).and_then(|o| o.get("value")), + Some(&Value::I64(42)) + ); + } + #[test] fn graph_builder_errors_on_bad_entry_func() { let mut pb = ProgramBuilder::new(); From 47308464e9dbd5335dd1de11383acb53bf185648 Mon Sep 17 00:00:00 2001 From: nomaterials Date: Wed, 3 Jun 2026 16:50:01 +0200 Subject: [PATCH 2/3] execution_graph: reclaim schedule buffer on every dispatch path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The InlineDispatcher returned the drained schedule buffer for the caller to stash back into scratch, but only on the Ok arm — on the fail-fast error path the buffer was dropped, so the next planning pass reallocated. Have the dispatcher reclaim the buffer itself via a new reclaim_schedule_buffer hook on every exit path (success and error). dispatch now returns Result<(), GraphError> and dispatch_with_report returns Result<RunDetailReport, GraphError> — the buffer no longer rides on the return type and can't be lost on error. Dispatcher is pub(crate), so no public API change. --- execution_graph/src/dispatch.rs | 35 +++++++++++++++++++-------------- execution_graph/src/graph.rs | 19 +++++++++++------- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/execution_graph/src/dispatch.rs b/execution_graph/src/dispatch.rs index 1a8cf43..18bd497 100644 --- a/execution_graph/src/dispatch.rs +++ b/execution_graph/src/dispatch.rs @@ -22,21 +22,22 @@ use crate::report::RunDetailReport; pub(crate) trait Dispatcher { /// Executes `plan` without producing traced reporting. /// - /// Returns the drained scheduling buffer so callers can reuse its capacity. + /// The drained scheduling buffer is returned to the graph's scratch workspace (for capacity + /// reuse on the next planning pass) on every exit path, success or error.
fn dispatch( &mut self, graph: &mut ExecutionGraph, plan: RunPlan, - ) -> Result, GraphError>; + ) -> Result<(), GraphError>; /// Executes `plan` and returns traced reporting if available. /// - /// Returns both the drained scheduling buffer (for capacity reuse) and the assembled report. + /// Like [`Dispatcher::dispatch`], the scheduling buffer is reclaimed on every exit path. fn dispatch_with_report( &mut self, graph: &mut ExecutionGraph, plan: RunPlan, - ) -> Result<(Vec, RunDetailReport), GraphError>; + ) -> Result; } /// Serial in-thread dispatcher used by default. @@ -52,24 +53,25 @@ impl Dispatcher for InlineDispatcher { &mut self, graph: &mut ExecutionGraph, mut plan: RunPlan, - ) -> Result, GraphError> { + ) -> Result<(), GraphError> { // Keep scope as part of the dispatch contract even before scope-specific strategies exist. match plan.scope() { PlanScope::All | PlanScope::WithinDependenciesOf(_) => {} } - let mut to_run: Vec = plan.take_nodes(); + let to_run: Vec = plan.take_nodes(); for i in 0..to_run.len() { if let Err(e) = graph.execute_scheduled_node(to_run[i]) { // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Their dirty marks // were cleared when the plan was drained, so re-mark them to keep that pending // work recoverable on the next run instead of silently dropping it. graph.remark_scheduled_dirty(&to_run[i..]); + graph.reclaim_schedule_buffer(to_run); return Err(e); } } - to_run.clear(); - Ok(to_run) + graph.reclaim_schedule_buffer(to_run); + Ok(()) } #[inline] @@ -77,7 +79,7 @@ impl Dispatcher for InlineDispatcher { &mut self, graph: &mut ExecutionGraph, mut plan: RunPlan, - ) -> Result<(Vec, RunDetailReport), GraphError> { + ) -> Result { // Keep scope as part of the dispatch contract even before scope-specific strategies exist. 
match plan.scope() { PlanScope::All | PlanScope::WithinDependenciesOf(_) => {} @@ -85,14 +87,17 @@ impl Dispatcher for InlineDispatcher { let mut trace = plan.take_trace(); let mut report = RunDetailReport::default(); - let mut to_run: Vec = plan.take_nodes(); + let to_run: Vec = plan.take_nodes(); for i in 0..to_run.len() { let node = to_run[i]; if let Err(e) = graph.execute_scheduled_node(node) { // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Re-mark them so - // their drained dirty state is not silently lost (see `dispatch`). + // their drained dirty state is not silently lost (see `dispatch`). NOTE: the + // partial report accumulated so far is dropped here; surfacing it on error is a + // follow-up. graph.remark_scheduled_dirty(&to_run[i..]); + graph.reclaim_schedule_buffer(to_run); return Err(e); } if let Some(t) = trace.as_mut() @@ -102,8 +107,8 @@ impl Dispatcher for InlineDispatcher { } } - to_run.clear(); - Ok((to_run, report)) + graph.reclaim_schedule_buffer(to_run); + Ok(report) } } @@ -236,7 +241,7 @@ mod tests { let plan = RunPlan::all(vec![n1, n0]).with_trace(RunPlanTrace::from_node_reports(node_reports)); let mut dispatcher = InlineDispatcher; - let (_buf, report) = dispatcher + let report = dispatcher .dispatch_with_report(&mut g, plan) .expect("dispatch should succeed"); @@ -255,7 +260,7 @@ mod tests { let trace = RunPlanTrace::from_node_reports(vec![]); let mut dispatcher = InlineDispatcher; - let (_buf, out) = dispatcher + let out = dispatcher .dispatch_with_report(&mut g, RunPlan::all(vec![node]).with_trace(trace)) .expect("dispatch should succeed"); diff --git a/execution_graph/src/graph.rs b/execution_graph/src/graph.rs index a4c281f..5e62f9f 100644 --- a/execution_graph/src/graph.rs +++ b/execution_graph/src/graph.rs @@ -881,9 +881,7 @@ impl ExecutionGraph { fn run_plan(&mut self, plan: RunPlan) -> Result { let executed_nodes = plan.node_count(); let mut dispatcher = InlineDispatcher; - let to_run = 
dispatcher.dispatch(self, plan)?; - // Reclaim the drained schedule buffer to reuse its capacity on the next planning pass. - self.scratch.to_run = to_run; + dispatcher.dispatch(self, plan)?; Ok(RunSummary { executed_nodes }) } @@ -891,10 +889,7 @@ impl ExecutionGraph { #[inline] fn run_plan_with_report(&mut self, plan: RunPlan) -> Result { let mut dispatcher = InlineDispatcher; - let (to_run, report) = dispatcher.dispatch_with_report(self, plan)?; - // Reclaim the drained schedule buffer to reuse its capacity on the next planning pass. - self.scratch.to_run = to_run; - Ok(report) + dispatcher.dispatch_with_report(self, plan) } /// Runs all currently dirty work in dependency order and returns a cheap summary. @@ -971,6 +966,16 @@ impl ExecutionGraph { } } + /// Internal dispatch hook: returns a spent scheduling buffer to the scratch workspace. + /// + /// Dispatch takes the schedule out of the plan to execute it; handing the (cleared) buffer + /// back here on every exit path lets the next planning pass reuse its capacity. + #[inline] + pub(crate) fn reclaim_schedule_buffer(&mut self, mut buf: Vec) { + buf.clear(); + self.scratch.to_run = buf; + } + fn execute_kind( node: NodeId, kind: &mut NodeKind, From 7cdb29e7f3b5eeba56940b07766bd87dfa8bdb16 Mon Sep 17 00:00:00 2001 From: nomaterials Date: Wed, 3 Jun 2026 16:56:34 +0200 Subject: [PATCH 3/3] fmt --- execution_graph/src/dispatch.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/execution_graph/src/dispatch.rs b/execution_graph/src/dispatch.rs index 18bd497..8e1a183 100644 --- a/execution_graph/src/dispatch.rs +++ b/execution_graph/src/dispatch.rs @@ -24,11 +24,7 @@ pub(crate) trait Dispatcher { /// /// The drained scheduling buffer is returned to the graph's scratch workspace (for capacity /// reuse on the next planning pass) on every exit path, success or error. 
- fn dispatch( - &mut self, - graph: &mut ExecutionGraph, - plan: RunPlan, - ) -> Result<(), GraphError>; + fn dispatch(&mut self, graph: &mut ExecutionGraph, plan: RunPlan) -> Result<(), GraphError>; /// Executes `plan` and returns traced reporting if available. ///