diff --git a/execution_graph/src/dispatch.rs b/execution_graph/src/dispatch.rs index ac17d1b..8e1a183 100644 --- a/execution_graph/src/dispatch.rs +++ b/execution_graph/src/dispatch.rs @@ -22,21 +22,18 @@ use crate::report::RunDetailReport; pub(crate) trait Dispatcher { /// Executes `plan` without producing traced reporting. /// - /// Returns the drained scheduling buffer so callers can reuse its capacity. - fn dispatch( - &mut self, - graph: &mut ExecutionGraph, - plan: RunPlan, - ) -> Result, GraphError>; + /// The drained scheduling buffer is returned to the graph's scratch workspace (for capacity + /// reuse on the next planning pass) on every exit path, success or error. + fn dispatch(&mut self, graph: &mut ExecutionGraph, plan: RunPlan) -> Result<(), GraphError>; /// Executes `plan` and returns traced reporting if available. /// - /// Returns both the drained scheduling buffer (for capacity reuse) and the assembled report. + /// Like [`Dispatcher::dispatch`], the scheduling buffer is reclaimed on every exit path. fn dispatch_with_report( &mut self, graph: &mut ExecutionGraph, plan: RunPlan, - ) -> Result<(Vec, RunDetailReport), GraphError>; + ) -> Result; } /// Serial in-thread dispatcher used by default. @@ -52,17 +49,25 @@ impl Dispatcher for InlineDispatcher { &mut self, graph: &mut ExecutionGraph, mut plan: RunPlan, - ) -> Result, GraphError> { + ) -> Result<(), GraphError> { // Keep scope as part of the dispatch contract even before scope-specific strategies exist. match plan.scope() { PlanScope::All | PlanScope::WithinDependenciesOf(_) => {} } - let mut to_run: Vec = plan.take_nodes(); - for node in to_run.drain(..) { - graph.execute_scheduled_node(node)?; + let to_run: Vec = plan.take_nodes(); + for i in 0..to_run.len() { + if let Err(e) = graph.execute_scheduled_node(to_run[i]) { + // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Their dirty marks + // were cleared when the plan was drained, so re-mark them to keep that pending + // work recoverable on the next run instead of silently dropping it. + graph.remark_scheduled_dirty(&to_run[i..]); + graph.reclaim_schedule_buffer(to_run); + return Err(e); + } } - Ok(to_run) + graph.reclaim_schedule_buffer(to_run); + Ok(()) } #[inline] @@ -70,7 +75,7 @@ impl Dispatcher for InlineDispatcher { &mut self, graph: &mut ExecutionGraph, mut plan: RunPlan, - ) -> Result<(Vec, RunDetailReport), GraphError> { + ) -> Result { // Keep scope as part of the dispatch contract even before scope-specific strategies exist. match plan.scope() { PlanScope::All | PlanScope::WithinDependenciesOf(_) => {} @@ -78,10 +83,19 @@ impl Dispatcher for InlineDispatcher { let mut trace = plan.take_trace(); let mut report = RunDetailReport::default(); - let mut to_run: Vec = plan.take_nodes(); - - for node in to_run.drain(..) { - graph.execute_scheduled_node(node)?; + let to_run: Vec = plan.take_nodes(); + + for i in 0..to_run.len() { + let node = to_run[i]; + if let Err(e) = graph.execute_scheduled_node(node) { + // Fail-fast: this node errored and `to_run[i + 1..]` never ran. Re-mark them so + // their drained dirty state is not silently lost (see `dispatch`). NOTE: the + // partial report accumulated so far is dropped here; surfacing it on error is a + // follow-up. + graph.remark_scheduled_dirty(&to_run[i..]); + graph.reclaim_schedule_buffer(to_run); + return Err(e); + } if let Some(t) = trace.as_mut() && let Some(r) = t.take_report_for(node) { @@ -89,7 +103,8 @@ impl Dispatcher for InlineDispatcher { } } - Ok((to_run, report)) + graph.reclaim_schedule_buffer(to_run); + Ok(report) } } @@ -222,7 +237,7 @@ mod tests { let plan = RunPlan::all(vec![n1, n0]).with_trace(RunPlanTrace::from_node_reports(node_reports)); let mut dispatcher = InlineDispatcher; - let (_buf, report) = dispatcher + let report = dispatcher .dispatch_with_report(&mut g, plan) .expect("dispatch should succeed"); @@ -241,7 +256,7 @@ mod tests { let trace = RunPlanTrace::from_node_reports(vec![]); let mut dispatcher = InlineDispatcher; - let (_buf, out) = dispatcher + let out = dispatcher .dispatch_with_report(&mut g, RunPlan::all(vec![node]).with_trace(trace)) .expect("dispatch should succeed"); diff --git a/execution_graph/src/graph.rs b/execution_graph/src/graph.rs index 38f6e6e..5e62f9f 100644 --- a/execution_graph/src/graph.rs +++ b/execution_graph/src/graph.rs @@ -881,9 +881,7 @@ impl ExecutionGraph { fn run_plan(&mut self, plan: RunPlan) -> Result { let executed_nodes = plan.node_count(); let mut dispatcher = InlineDispatcher; - let to_run = dispatcher.dispatch(self, plan)?; - // Reclaim the drained schedule buffer to reuse its capacity on the next planning pass. - self.scratch.to_run = to_run; + dispatcher.dispatch(self, plan)?; Ok(RunSummary { executed_nodes }) } @@ -891,13 +889,14 @@ impl ExecutionGraph { #[inline] fn run_plan_with_report(&mut self, plan: RunPlan) -> Result { let mut dispatcher = InlineDispatcher; - let (to_run, report) = dispatcher.dispatch_with_report(self, plan)?; - // Reclaim the drained schedule buffer to reuse its capacity on the next planning pass. - self.scratch.to_run = to_run; - Ok(report) + dispatcher.dispatch_with_report(self, plan) } /// Runs all currently dirty work in dependency order and returns a cheap summary. + /// + /// Execution is fail-fast: if a node errors, the run stops and returns that error, but the + /// dirty state of any not-yet-executed scheduled work is preserved so a subsequent run + /// re-attempts it. pub fn run_all(&mut self) -> Result { let plan = self.plan_all(); self.run_plan(plan) @@ -919,6 +918,9 @@ impl ExecutionGraph { /// /// This drains only dirty keys that are within the dependency closure of `node`'s outputs. /// Unrelated dirty work remains dirty and is not drained. + /// + /// Execution is fail-fast: if a node errors, the run stops and returns that error, but the + /// dirty state of not-yet-executed work in the closure is preserved for a subsequent run. pub fn run_node(&mut self, node: NodeId) -> Result { let plan = self.plan_within_dependencies_of(node)?; self.run_plan(plan) @@ -943,6 +945,37 @@ impl ExecutionGraph { self.run_node_internal(node) } + /// Internal dispatch hook: re-marks the output keys of `nodes` dirty. + /// + /// Planning drains (and clears) the scheduled dirty set up front, so when dispatch stops + /// fail-fast on an error the un-run nodes would otherwise be left permanently clean and their + /// pending work silently dropped. Re-marking their outputs keeps that work recoverable on the + /// next run. + #[inline] + pub(crate) fn remark_scheduled_dirty(&mut self, nodes: &[NodeId]) { + for &node in nodes { + let Ok(index) = usize::try_from(node.as_u64()) else { + continue; + }; + if index >= self.nodes.len() { + continue; + } + for &out_id in self.nodes[index].output_ids.iter() { + self.dirty.mark_dirty(out_id); + } + } + } + + /// Internal dispatch hook: returns a spent scheduling buffer to the scratch workspace. + /// + /// Dispatch takes the schedule out of the plan to execute it; handing the (cleared) buffer + /// back here on every exit path lets the next planning pass reuse its capacity. + #[inline] + pub(crate) fn reclaim_schedule_buffer(&mut self, mut buf: Vec) { + buf.clear(); + self.scratch.to_run = buf; + } + fn execute_kind( node: NodeId, kind: &mut NodeKind, @@ -1173,6 +1206,46 @@ mod tests { } } + /// A no-input program that traps at runtime (divide-by-zero). + fn trap_program() -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.const_i64(1, 1); + a.const_i64(2, 0); + a.i64_div(3, 1, 2); + a.ret(0, &[3]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + + /// A no-input program returning the constant `v` as output "value". + fn const_program(v: i64) -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.const_i64(1, v); + a.ret(0, &[1]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + #[test] fn graph_error_display_includes_actionable_context() { let bad_entry = GraphError::BadEntryFunc { func: FuncId(99) }.to_string(); @@ -1603,23 +1676,7 @@ mod tests { #[test] fn run_all_preserves_vm_trap_info() { - let mut pb = ProgramBuilder::new(); - let mut a = Asm::new(); - a.const_i64(1, 1); - a.const_i64(2, 0); - a.i64_div(3, 1, 2); - a.ret(0, &[3]); - let f = pb - .push_function_checked( - a, - FunctionSig { - arg_types: vec![], - ret_types: vec![ValueType::I64], - }, - ) - .unwrap(); - pb.set_function_output_name(f, 0, "value").unwrap(); - let prog = Arc::new(pb.build_verified().unwrap()); + let (prog, f) = trap_program(); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); let n = g.add_node(prog, f, vec![]).unwrap(); @@ -1632,6 +1689,120 @@ mod tests { assert_eq!(trap.trap, Trap::DivByZero); } + #[test] + fn run_all_trap_keeps_independent_node_recoverable() { + // node0 traps; node1 is an independent constant scheduled after it. A trap mid-pass must + // not silently discard node1's pending dirty work. + let (trap_prog, trap_f) = trap_program(); + let (const_prog, const_f) = const_program(42); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node1 = g.add_node(const_prog, const_f, vec![]).unwrap(); + + // node0 is scheduled first and traps; fail-fast leaves node1 unrun. + assert!(matches!(g.run_all(), Err(GraphError::Trap { .. }))); + assert_eq!(g.node_run_count(node0), Some(0)); + assert_eq!( + g.node_run_count(node1), + Some(0), + "precondition: node1 must not have run in the trapping pass" + ); + + // node1's dirty state survived: a targeted re-run executes it and produces its value. + let summary = g.run_node(node1).unwrap(); + assert_eq!(summary.executed_nodes, 1); + assert_eq!( + g.node_outputs(node1).and_then(|o| o.get("value")), + Some(&Value::I64(42)) + ); + } + + #[test] + fn run_all_trap_does_not_remark_already_executed_nodes() { + // Scheduled order is [node_a, node0, node_c]: node_a runs, node0 traps, node_c is unrun. + // The fix must re-mark only the failed node and the unrun tail, never the node that + // already executed successfully. + let (a_prog, a_f) = const_program(7); + let (trap_prog, trap_f) = trap_program(); + let (c_prog, c_f) = const_program(99); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node_a = g.add_node(a_prog, a_f, vec![]).unwrap(); + let _node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node_c = g.add_node(c_prog, c_f, vec![]).unwrap(); + + assert!(matches!(g.run_all(), Err(GraphError::Trap { .. }))); + assert_eq!( + g.node_run_count(node_a), + Some(1), + "precondition: node_a must have executed before the trap" + ); + + // node_a already ran and must not have been re-marked, so a targeted re-run is a no-op. + assert_eq!(g.run_node(node_a).unwrap().executed_nodes, 0); + assert_eq!(g.node_run_count(node_a), Some(1)); + + // node_c was unrun and re-marked, so it remains recoverable. + assert_eq!(g.run_node(node_c).unwrap().executed_nodes, 1); + assert_eq!( + g.node_outputs(node_c).and_then(|o| o.get("value")), + Some(&Value::I64(99)) + ); + } + + #[test] + fn run_node_trap_keeps_closure_sibling_recoverable() { + // target depends on node0 (traps) and node_sib (independent constant). Running target's + // closure traps on node0; node_sib must remain recoverable rather than being dropped. + fn passthrough2_program() -> (Arc, FuncId) { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.i64_add(3, 1, 2); + a.ret(0, &[3]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![ValueType::I64, ValueType::I64], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + (Arc::new(pb.build_verified().unwrap()), f) + } + + let (trap_prog, trap_f) = trap_program(); + let (sib_prog, sib_f) = const_program(42); + let (tgt_prog, tgt_f) = passthrough2_program(); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap(); + let node_sib = g.add_node(sib_prog, sib_f, vec![]).unwrap(); + let target = g + .add_node(tgt_prog, tgt_f, vec!["x".into(), "y".into()]) + .unwrap(); + g.connect(node0, "value", target, "x").unwrap(); + g.connect(node_sib, "value", target, "y").unwrap(); + + // node0 is scheduled before node_sib inside target's closure and traps. + assert!(matches!(g.run_node(target), Err(GraphError::Trap { .. }))); + assert_eq!( + g.node_run_count(node_sib), + Some(0), + "precondition: node_sib must not have run in the trapping pass" + ); + + // node_sib's dirty state survived the closure trap and re-runs cleanly. + let summary = g.run_node(node_sib).unwrap(); + assert_eq!(summary.executed_nodes, 1); + assert_eq!( + g.node_outputs(node_sib).and_then(|o| o.get("value")), + Some(&Value::I64(42)) + ); + } + #[test] fn graph_builder_errors_on_bad_entry_func() { let mut pb = ProgramBuilder::new();