Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 36 additions & 21 deletions execution_graph/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ use crate::report::RunDetailReport;
pub(crate) trait Dispatcher<H: Host> {
/// Executes `plan` without producing traced reporting.
///
/// Returns the drained scheduling buffer so callers can reuse its capacity.
fn dispatch(
&mut self,
graph: &mut ExecutionGraph<H>,
plan: RunPlan,
) -> Result<Vec<NodeId>, GraphError>;
/// The drained scheduling buffer is returned to the graph's scratch workspace (for capacity
/// reuse on the next planning pass) on every exit path, success or error.
fn dispatch(&mut self, graph: &mut ExecutionGraph<H>, plan: RunPlan) -> Result<(), GraphError>;

/// Executes `plan` and returns traced reporting if available.
///
/// Returns both the drained scheduling buffer (for capacity reuse) and the assembled report.
/// Like [`Dispatcher::dispatch`], the scheduling buffer is reclaimed on every exit path.
fn dispatch_with_report(
&mut self,
graph: &mut ExecutionGraph<H>,
plan: RunPlan,
) -> Result<(Vec<NodeId>, RunDetailReport), GraphError>;
) -> Result<RunDetailReport, GraphError>;
}

/// Serial in-thread dispatcher used by default.
Expand All @@ -52,44 +49,62 @@ impl<H: Host> Dispatcher<H> for InlineDispatcher {
&mut self,
graph: &mut ExecutionGraph<H>,
mut plan: RunPlan,
) -> Result<Vec<NodeId>, GraphError> {
) -> Result<(), GraphError> {
// Keep scope as part of the dispatch contract even before scope-specific strategies exist.
match plan.scope() {
PlanScope::All | PlanScope::WithinDependenciesOf(_) => {}
}

let mut to_run: Vec<NodeId> = plan.take_nodes();
for node in to_run.drain(..) {
graph.execute_scheduled_node(node)?;
let to_run: Vec<NodeId> = plan.take_nodes();
for i in 0..to_run.len() {
if let Err(e) = graph.execute_scheduled_node(to_run[i]) {
// Fail-fast: this node errored and `to_run[i + 1..]` never ran. Their dirty marks
// were cleared when the plan was drained, so re-mark them to keep that pending
// work recoverable on the next run instead of silently dropping it.
graph.remark_scheduled_dirty(&to_run[i..]);
graph.reclaim_schedule_buffer(to_run);
return Err(e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, because of this error path, we aren't restoring the buffer for to_run back into the scratch workspace. Might not be that serious a thing, but maybe this is a sign we need a better API or design here overall.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True! I made it so the dispatcher now hands the buffer back to scratch itself via a reclaim_schedule_buffer hook on every exit path, rather than returning it for the caller to stash on the Ok arm only. Dispatcher is pub(crate), so no public API change - clean on that end.

}
}
Ok(to_run)
graph.reclaim_schedule_buffer(to_run);
Ok(())
}

#[inline]
fn dispatch_with_report(
&mut self,
graph: &mut ExecutionGraph<H>,
mut plan: RunPlan,
) -> Result<(Vec<NodeId>, RunDetailReport), GraphError> {
) -> Result<RunDetailReport, GraphError> {
// Keep scope as part of the dispatch contract even before scope-specific strategies exist.
match plan.scope() {
PlanScope::All | PlanScope::WithinDependenciesOf(_) => {}
}

let mut trace = plan.take_trace();
let mut report = RunDetailReport::default();
let mut to_run: Vec<NodeId> = plan.take_nodes();

for node in to_run.drain(..) {
graph.execute_scheduled_node(node)?;
let to_run: Vec<NodeId> = plan.take_nodes();

for i in 0..to_run.len() {
let node = to_run[i];
if let Err(e) = graph.execute_scheduled_node(node) {
// Fail-fast: this node errored and `to_run[i + 1..]` never ran. Re-mark them so
// their drained dirty state is not silently lost (see `dispatch`). NOTE: the
// partial report accumulated so far is dropped here; surfacing it on error is a
// follow-up.
graph.remark_scheduled_dirty(&to_run[i..]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the error gets returned here, the report that has been accumulated so far is dropped ... so the Report handling here could use improvement (but maybe as a follow up?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that this should be taken as a follow-up. Surfacing the partial report on error is an API decision (richer return, or an error variant carrying the partial RunDetailReport, etc).

Also it predates this PR: the old ?-based loop dropped it too.

For now I've left the behavior and added an inline note at the drop site.

graph.reclaim_schedule_buffer(to_run);
return Err(e);
}
if let Some(t) = trace.as_mut()
&& let Some(r) = t.take_report_for(node)
{
report.executed.push(r);
}
}

Ok((to_run, report))
graph.reclaim_schedule_buffer(to_run);
Ok(report)
}
}

Expand Down Expand Up @@ -222,7 +237,7 @@ mod tests {
let plan =
RunPlan::all(vec![n1, n0]).with_trace(RunPlanTrace::from_node_reports(node_reports));
let mut dispatcher = InlineDispatcher;
let (_buf, report) = dispatcher
let report = dispatcher
.dispatch_with_report(&mut g, plan)
.expect("dispatch should succeed");

Expand All @@ -241,7 +256,7 @@ mod tests {
let trace = RunPlanTrace::from_node_reports(vec![]);

let mut dispatcher = InlineDispatcher;
let (_buf, out) = dispatcher
let out = dispatcher
.dispatch_with_report(&mut g, RunPlan::all(vec![node]).with_trace(trace))
.expect("dispatch should succeed");

Expand Down
219 changes: 195 additions & 24 deletions execution_graph/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,23 +881,22 @@ impl<H: Host> ExecutionGraph<H> {
fn run_plan(&mut self, plan: RunPlan) -> Result<RunSummary, GraphError> {
let executed_nodes = plan.node_count();
let mut dispatcher = InlineDispatcher;
let to_run = dispatcher.dispatch(self, plan)?;
// Reclaim the drained schedule buffer to reuse its capacity on the next planning pass.
self.scratch.to_run = to_run;
dispatcher.dispatch(self, plan)?;
Ok(RunSummary { executed_nodes })
}

/// Executes a pre-built run plan and returns traced reporting data if attached.
#[inline]
fn run_plan_with_report(&mut self, plan: RunPlan) -> Result<RunDetailReport, GraphError> {
let mut dispatcher = InlineDispatcher;
let (to_run, report) = dispatcher.dispatch_with_report(self, plan)?;
// Reclaim the drained schedule buffer to reuse its capacity on the next planning pass.
self.scratch.to_run = to_run;
Ok(report)
dispatcher.dispatch_with_report(self, plan)
}

/// Runs all currently dirty work in dependency order and returns a cheap summary.
///
/// Execution is fail-fast: if a node errors, the run stops and returns that error, but the
/// dirty state of any not-yet-executed scheduled work is preserved so a subsequent run
/// re-attempts it.
pub fn run_all(&mut self) -> Result<RunSummary, GraphError> {
let plan = self.plan_all();
self.run_plan(plan)
Expand All @@ -919,6 +918,9 @@ impl<H: Host> ExecutionGraph<H> {
///
/// This drains only dirty keys that are within the dependency closure of `node`'s outputs.
/// Unrelated dirty work remains dirty and is not drained.
///
/// Execution is fail-fast: if a node errors, the run stops and returns that error, but the
/// dirty state of not-yet-executed work in the closure is preserved for a subsequent run.
pub fn run_node(&mut self, node: NodeId) -> Result<RunSummary, GraphError> {
let plan = self.plan_within_dependencies_of(node)?;
self.run_plan(plan)
Expand All @@ -943,6 +945,37 @@ impl<H: Host> ExecutionGraph<H> {
self.run_node_internal(node)
}

/// Internal dispatch hook: re-marks the output keys of `nodes` dirty.
///
/// Planning drains (and clears) the scheduled dirty set up front, so when dispatch stops
/// fail-fast on an error the un-run nodes would otherwise be left permanently clean and their
/// pending work silently dropped. Re-marking their outputs keeps that work recoverable on the
/// next run.
#[inline]
pub(crate) fn remark_scheduled_dirty(&mut self, nodes: &[NodeId]) {
for &node in nodes {
let Ok(index) = usize::try_from(node.as_u64()) else {
continue;
};
if index >= self.nodes.len() {
continue;
}
for &out_id in self.nodes[index].output_ids.iter() {
self.dirty.mark_dirty(out_id);
}
}
}

/// Internal dispatch hook: returns a spent scheduling buffer to the scratch workspace.
///
/// Dispatch takes the schedule out of the plan to execute it; handing the (cleared) buffer
/// back here on every exit path lets the next planning pass reuse its capacity.
#[inline]
pub(crate) fn reclaim_schedule_buffer(&mut self, mut buf: Vec<NodeId>) {
buf.clear();
self.scratch.to_run = buf;
}

fn execute_kind(
node: NodeId,
kind: &mut NodeKind,
Expand Down Expand Up @@ -1173,6 +1206,46 @@ mod tests {
}
}

/// A no-input program that traps at runtime (divide-by-zero).
fn trap_program() -> (Arc<VerifiedProgram>, FuncId) {
let mut pb = ProgramBuilder::new();
let mut a = Asm::new();
a.const_i64(1, 1);
a.const_i64(2, 0);
a.i64_div(3, 1, 2);
a.ret(0, &[3]);
let f = pb
.push_function_checked(
a,
FunctionSig {
arg_types: vec![],
ret_types: vec![ValueType::I64],
},
)
.unwrap();
pb.set_function_output_name(f, 0, "value").unwrap();
(Arc::new(pb.build_verified().unwrap()), f)
}

/// A no-input program returning the constant `v` as output "value".
fn const_program(v: i64) -> (Arc<VerifiedProgram>, FuncId) {
let mut pb = ProgramBuilder::new();
let mut a = Asm::new();
a.const_i64(1, v);
a.ret(0, &[1]);
let f = pb
.push_function_checked(
a,
FunctionSig {
arg_types: vec![],
ret_types: vec![ValueType::I64],
},
)
.unwrap();
pb.set_function_output_name(f, 0, "value").unwrap();
(Arc::new(pb.build_verified().unwrap()), f)
}

#[test]
fn graph_error_display_includes_actionable_context() {
let bad_entry = GraphError::BadEntryFunc { func: FuncId(99) }.to_string();
Expand Down Expand Up @@ -1603,23 +1676,7 @@ mod tests {

#[test]
fn run_all_preserves_vm_trap_info() {
let mut pb = ProgramBuilder::new();
let mut a = Asm::new();
a.const_i64(1, 1);
a.const_i64(2, 0);
a.i64_div(3, 1, 2);
a.ret(0, &[3]);
let f = pb
.push_function_checked(
a,
FunctionSig {
arg_types: vec![],
ret_types: vec![ValueType::I64],
},
)
.unwrap();
pb.set_function_output_name(f, 0, "value").unwrap();
let prog = Arc::new(pb.build_verified().unwrap());
let (prog, f) = trap_program();

let mut g = ExecutionGraph::new(HostNoop, Limits::default());
let n = g.add_node(prog, f, vec![]).unwrap();
Expand All @@ -1632,6 +1689,120 @@ mod tests {
assert_eq!(trap.trap, Trap::DivByZero);
}

#[test]
fn run_all_trap_keeps_independent_node_recoverable() {
// node0 traps; node1 is an independent constant scheduled after it. A trap mid-pass must
// not silently discard node1's pending dirty work.
let (trap_prog, trap_f) = trap_program();
let (const_prog, const_f) = const_program(42);

let mut g = ExecutionGraph::new(HostNoop, Limits::default());
let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap();
let node1 = g.add_node(const_prog, const_f, vec![]).unwrap();

// node0 is scheduled first and traps; fail-fast leaves node1 unrun.
assert!(matches!(g.run_all(), Err(GraphError::Trap { .. })));
assert_eq!(g.node_run_count(node0), Some(0));
assert_eq!(
g.node_run_count(node1),
Some(0),
"precondition: node1 must not have run in the trapping pass"
);

// node1's dirty state survived: a targeted re-run executes it and produces its value.
let summary = g.run_node(node1).unwrap();
assert_eq!(summary.executed_nodes, 1);
assert_eq!(
g.node_outputs(node1).and_then(|o| o.get("value")),
Some(&Value::I64(42))
);
}

#[test]
fn run_all_trap_does_not_remark_already_executed_nodes() {
// Scheduled order is [node_a, node0, node_c]: node_a runs, node0 traps, node_c is unrun.
// The fix must re-mark only the failed node and the unrun tail, never the node that
// already executed successfully.
let (a_prog, a_f) = const_program(7);
let (trap_prog, trap_f) = trap_program();
let (c_prog, c_f) = const_program(99);

let mut g = ExecutionGraph::new(HostNoop, Limits::default());
let node_a = g.add_node(a_prog, a_f, vec![]).unwrap();
let _node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap();
let node_c = g.add_node(c_prog, c_f, vec![]).unwrap();

assert!(matches!(g.run_all(), Err(GraphError::Trap { .. })));
assert_eq!(
g.node_run_count(node_a),
Some(1),
"precondition: node_a must have executed before the trap"
);

// node_a already ran and must not have been re-marked, so a targeted re-run is a no-op.
assert_eq!(g.run_node(node_a).unwrap().executed_nodes, 0);
assert_eq!(g.node_run_count(node_a), Some(1));

// node_c was unrun and re-marked, so it remains recoverable.
assert_eq!(g.run_node(node_c).unwrap().executed_nodes, 1);
assert_eq!(
g.node_outputs(node_c).and_then(|o| o.get("value")),
Some(&Value::I64(99))
);
}

#[test]
fn run_node_trap_keeps_closure_sibling_recoverable() {
// target depends on node0 (traps) and node_sib (independent constant). Running target's
// closure traps on node0; node_sib must remain recoverable rather than being dropped.
fn passthrough2_program() -> (Arc<VerifiedProgram>, FuncId) {
let mut pb = ProgramBuilder::new();
let mut a = Asm::new();
a.i64_add(3, 1, 2);
a.ret(0, &[3]);
let f = pb
.push_function_checked(
a,
FunctionSig {
arg_types: vec![ValueType::I64, ValueType::I64],
ret_types: vec![ValueType::I64],
},
)
.unwrap();
pb.set_function_output_name(f, 0, "value").unwrap();
(Arc::new(pb.build_verified().unwrap()), f)
}

let (trap_prog, trap_f) = trap_program();
let (sib_prog, sib_f) = const_program(42);
let (tgt_prog, tgt_f) = passthrough2_program();

let mut g = ExecutionGraph::new(HostNoop, Limits::default());
let node0 = g.add_node(trap_prog, trap_f, vec![]).unwrap();
let node_sib = g.add_node(sib_prog, sib_f, vec![]).unwrap();
let target = g
.add_node(tgt_prog, tgt_f, vec!["x".into(), "y".into()])
.unwrap();
g.connect(node0, "value", target, "x").unwrap();
g.connect(node_sib, "value", target, "y").unwrap();

// node0 is scheduled before node_sib inside target's closure and traps.
assert!(matches!(g.run_node(target), Err(GraphError::Trap { .. })));
assert_eq!(
g.node_run_count(node_sib),
Some(0),
"precondition: node_sib must not have run in the trapping pass"
);

// node_sib's dirty state survived the closure trap and re-runs cleanly.
let summary = g.run_node(node_sib).unwrap();
assert_eq!(summary.executed_nodes, 1);
assert_eq!(
g.node_outputs(node_sib).and_then(|o| o.get("value")),
Some(&Value::I64(42))
);
}

#[test]
fn graph_builder_errors_on_bad_entry_func() {
let mut pb = ProgramBuilder::new();
Expand Down
Loading