
feat(aqe): support executor failure in AdaptiveExecutionGraph #1601

Draft

jja725 wants to merge 14 commits into apache:main from jja725:aqe-executor-failure

Conversation


jja725 commented Apr 27, 2026

Summary

Implements the unchecked "support executor failure" task from the AQE epic #1359. Brings AdaptiveExecutionGraph to parity with StaticExecutionGraph's executor-loss recovery so re-running stages and rolled-back stages work end-to-end under AQE.

Problem

AdaptiveExecutionGraph::reset_stages_on_lost_executor was structurally a copy of the static-graph version, but it was a no-op in practice for AQE because:

  1. AQE's create_resolved_stage initialises stage.inputs to an empty HashMap (intentional — partition locations live in the planner's plan tree under ExchangeExec.shuffle_partitions, not in stage.inputs). The static-graph rollback walk thus found nothing.
  2. The AdaptivePlanner's side state (runnable_stage_cache, runnable_stage_output) was never told about lost executors, so a re-running successful stage failed with "Can't find active stage to update stage outputs".
  3. ExchangeExec / AdaptiveDatafusionExec in the live plan tree retained shuffle_partitions = Some(...) even after their owning stage rolled back, so find_runnable_exchanges would skip them.
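To make point 1 concrete, here is a minimal, self-contained sketch of why the inherited rollback walk finds nothing under AQE. The types (`StageInputs`, the `String` executor ids) are stand-ins for the real Ballista structures, not the actual API:

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-in: under AQE, `inputs` is intentionally
// empty because partition locations live in the planner's plan tree.
#[derive(Default)]
struct StageInputs {
    // input stage_id -> executor ids holding that stage's partitions
    inputs: HashMap<usize, Vec<String>>,
}

// Sketch of the static-graph rollback walk: collect input stages with any
// partition on the lost executor. Over an empty map it finds nothing.
fn stages_to_roll_back(stage: &StageInputs, lost_executor: &str) -> Vec<usize> {
    stage
        .inputs
        .iter()
        .filter(|(_, execs)| execs.iter().any(|e| e == lost_executor))
        .map(|(stage_id, _)| *stage_id)
        .collect()
}

fn main() {
    // AQE case: inputs map is empty, so the walk is a no-op.
    let aqe_stage = StageInputs::default();
    assert!(stages_to_roll_back(&aqe_stage, "executor-1").is_empty());

    // Static-graph case: the same walk finds the affected input stage.
    let mut static_stage = StageInputs::default();
    static_stage.inputs.insert(2, vec!["executor-1".to_string()]);
    assert_eq!(stages_to_roll_back(&static_stage, "executor-1"), vec![2]);
}
```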

Changes

  • aqe/execution_plan.rs — new pub(crate) fn reset_locations_on_lost_executor(&self, executor_id) -> Option<usize> on both ExchangeExec and AdaptiveDatafusionExec. Clears shuffle_partitions back to None if any location matches the lost executor; returns the affected stage_id.
  • aqe/planner.rs — new pub(super) fn reset_on_lost_executor(&mut self, executor_id) -> Result<HashSet<usize>> on AdaptivePlanner. Walks the live plan tree, calls the per-exec reset, restores runnable_stage_cache / runnable_stage_output for affected stages, re-runs replan_stages(), and returns the set of affected stage_ids.
  • aqe/mod.rs — reset_stages_internal now uses the planner's affected set to:
    1. Reset task_infos and transition matching Successful stages back to Running.
    2. Drop any Resolved/Running stages whose embedded plan reads from an affected stage (their ShuffleReaderExec entries hold stale partition locations). The planner regenerates them via actionable_stages once upstream reruns complete.
  • aqe/mod.rs — update_task_status now warns (instead of erroring) when a task status arrives for a stage that's no longer in self.stages. This is expected after the dependent-stage drop above.
  • Tests — port four executor-failure tests from the static-graph suite into a new aqe/test/executor_failure.rs:
    • test_reset_completed_stage_executor_lost
    • test_reset_resolved_stage_executor_lost
    • test_task_update_after_reset_stage (incl. idempotency check)
    • test_long_delayed_failed_task_after_executor_lost
  • Helpers — new test_aqe_aggregation_plan / test_aqe_join_plan helpers in aqe/test/mod.rs that build an AdaptiveExecutionGraph from a SQL-shaped plan, mirroring the static-graph builders.
  • Doc cleanup — drop the stale /// - it does not cover executor failure line from the AdaptiveExecutionGraph docstring.
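The per-exec reset can be sketched roughly as below. This is a hand-rolled illustration, not the PR's code: `ExchangeExec` and `PartitionLocation` here are simplified stand-ins, and the sketch takes `&mut self` for brevity where the real method is described as taking `&self` (presumably via interior mutability):

```rust
// Hypothetical, simplified stand-ins for the real plan-tree node types.
struct PartitionLocation {
    executor_id: String,
}

struct ExchangeExec {
    stage_id: usize,
    shuffle_partitions: Option<Vec<PartitionLocation>>,
}

impl ExchangeExec {
    // Clear resolved locations if any of them lived on the lost executor;
    // return the owning stage_id so the planner can restore cache entries.
    fn reset_locations_on_lost_executor(&mut self, executor_id: &str) -> Option<usize> {
        let affected = self
            .shuffle_partitions
            .as_ref()
            .map(|locs| locs.iter().any(|l| l.executor_id == executor_id))
            .unwrap_or(false);
        if affected {
            // Back to None: find_runnable_exchanges will pick it up again.
            self.shuffle_partitions = None;
            Some(self.stage_id)
        } else {
            None
        }
    }
}

fn main() {
    let mut exec = ExchangeExec {
        stage_id: 3,
        shuffle_partitions: Some(vec![PartitionLocation {
            executor_id: "executor-1".to_string(),
        }]),
    };
    assert_eq!(exec.reset_locations_on_lost_executor("executor-1"), Some(3));
    assert!(exec.shuffle_partitions.is_none());
    // A second call for the same executor is a no-op.
    assert_eq!(exec.reset_locations_on_lost_executor("executor-1"), None);
}
```

Returning the `stage_id` (rather than a bool) is what lets the caller build the affected-stage set in one tree walk.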

Test Plan

  • cargo test -p ballista-scheduler — 76 passed, 0 failed, 1 ignored (pre-existing).
  • cargo clippy -p ballista-scheduler --tests — clean.
  • cargo fmt -p ballista-scheduler -- --check — clean.
  • All four ported tests pass; the static-graph executor-failure tests still pass; existing AQE tests still pass.

Out of Scope

  • AQE-specific re-optimization triggered by failure (e.g., switching join strategy after losing data).
  • Partial per-partition re-execution beyond what static graph already does.
  • Dynamic shuffle coalescing (separate task on the epic).

Refs: #1359

Design and implementation plan committed under docs/superpowers/specs/ and docs/superpowers/plans/.

jja725 and others added 14 commits April 27, 2026 16:05
Spec for issue apache#1359 task "support executor failure" — covers the
AdaptivePlanner state-sync gap that prevents re-running stages from
accepting update_exchange_locations after rollback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Self-review fix: code blocks now match the Migration section
(pub(crate) for exec wrappers, pub(super) for the planner method).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task-by-task TDD plan covering:
- per-exec reset_locations_on_lost_executor (ExchangeExec, AdaptiveDatafusionExec)
- AdaptivePlanner::reset_on_lost_executor + collect_affected_stages
- wire-in at AdaptiveExecutionGraph::reset_stages_internal
- four ported executor-failure tests
- final clippy/fmt verification

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Clears resolved shuffle_partitions when any location references the
lost executor; returns the stage_id so the planner can restore cache
entries downstream.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors ExchangeExec; the final-stage wrapper can also carry resolved
shuffle metadata when the root stage produces shuffled output.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walks the live plan tree, clears resolved shuffle metadata on
ExchangeExec / AdaptiveDatafusionExec nodes that reference the lost
executor, restores runnable_stage_cache / runnable_stage_output
entries for affected stages, and re-runs replan_stages.

Without this the AdaptiveExecutionGraph rolls back stages but the
planner's plan tree still treats them as resolved, and re-running
stages can't accept update_exchange_locations.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
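The planner-level reset described above can be sketched as follows. This is a simplified illustration under assumed types: the real code walks an ExecutionPlan tree of ExchangeExec / AdaptiveDatafusionExec nodes and also restores runnable_stage_output and calls replan_stages(), which this toy `Planner` does not model:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical, simplified planner state: each entry stands in for an
// exchange node's resolved shuffle locations (executor ids), or None.
struct Planner {
    exchanges: HashMap<usize, Option<Vec<String>>>,
    runnable_stage_cache: HashSet<usize>,
}

impl Planner {
    // Reset every exchange that references the lost executor, re-register
    // the affected stages, and return their ids to the caller.
    fn reset_on_lost_executor(&mut self, executor_id: &str) -> HashSet<usize> {
        let mut affected = HashSet::new();
        for (stage_id, locs) in self.exchanges.iter_mut() {
            let hit = locs
                .as_ref()
                .map(|l| l.iter().any(|e| e == executor_id))
                .unwrap_or(false);
            if hit {
                *locs = None; // exchange is unresolved again
                affected.insert(*stage_id);
            }
        }
        // Restore cache entries so re-running stages can accept
        // update_exchange_locations instead of failing.
        self.runnable_stage_cache.extend(affected.iter().copied());
        affected
    }
}

fn main() {
    let mut planner = Planner {
        exchanges: HashMap::from([
            (1, Some(vec!["executor-1".to_string()])),
            (2, Some(vec!["executor-2".to_string()])),
        ]),
        runnable_stage_cache: HashSet::new(),
    };
    let affected = planner.reset_on_lost_executor("executor-1");
    assert_eq!(affected, HashSet::from([1]));
    assert!(planner.exchanges[&1].is_none()); // stage 1 rolled back
    assert!(planner.exchanges[&2].is_some()); // stage 2 untouched
    assert!(planner.runnable_stage_cache.contains(&1));
}
```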
reset_stages_internal now syncs planner state after rolling back
graph-level stages on executor loss; also drop the stale
"does not cover executor failure" doc line.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds test_aqe_aggregation_plan / test_aqe_join_plan that build
AdaptiveExecutionGraph from a SQL-shaped plan, mirroring the static
graph helpers. Registers an empty executor_failure module to be
filled in by subsequent tasks.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Verifies that when an executor is lost mid-stage, the graph rolls
back affected stages, the planner restores its cache, and the
job completes successfully on a surviving executor.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Original spec assumed AQE's reset_stages_internal could rely on the
existing stage.inputs walk to detect lost-executor data. That walk is
a no-op for AQE (create_resolved_stage initialises inputs to an empty
HashMap). Update the contract:

- AdaptivePlanner::reset_on_lost_executor now returns the set of
  stage_ids whose ExchangeExec / AdaptiveDatafusionExec outputs were
  on the lost executor.
- AdaptiveExecutionGraph::reset_stages_internal uses that set to:
  1. Reset task_infos and transition matching Successful stages back
     to Running so they re-execute.
  2. Drop any Resolved/Running stages whose embedded plan reads from
     an affected stage (their ShuffleReaderExec entries hold stale
     locations). The planner regenerates them via actionable_stages
     once the upstream reruns complete.

Add the dependency walker plan_reads_from_any used by step 2.

Also adds test_reset_resolved_stage_executor_lost which covers the
case where both leaf stages have completed before the executor is
lost.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
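The dependency walker mentioned in the commit above is, in essence, a recursive check over the plan tree. The sketch below uses a made-up `PlanNode` enum in place of the real DataFusion ExecutionPlan / ShuffleReaderExec types, but shows the shape of the check:

```rust
use std::collections::HashSet;

// Hypothetical, simplified plan node; the real walker inspects
// ShuffleReaderExec entries inside an ExecutionPlan tree.
enum PlanNode {
    ShuffleReader { source_stage_id: usize },
    Other { children: Vec<PlanNode> },
}

// Does this plan read shuffle output from any of the affected stages?
// If so, its ShuffleReaderExec entries hold stale partition locations
// and the stage must be dropped and regenerated.
fn plan_reads_from_any(plan: &PlanNode, affected: &HashSet<usize>) -> bool {
    match plan {
        PlanNode::ShuffleReader { source_stage_id } => affected.contains(source_stage_id),
        PlanNode::Other { children } => {
            children.iter().any(|c| plan_reads_from_any(c, affected))
        }
    }
}

fn main() {
    let affected: HashSet<usize> = [2].into_iter().collect();

    // A plan that reads from rolled-back stage 2 must be dropped.
    let stale = PlanNode::Other {
        children: vec![PlanNode::ShuffleReader { source_stage_id: 2 }],
    };
    assert!(plan_reads_from_any(&stale, &affected));

    // A plan reading only from unaffected stages is kept.
    let clean = PlanNode::ShuffleReader { source_stage_id: 5 };
    assert!(!plan_reads_from_any(&clean, &affected));
}
```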
Verifies that:
1. A late task status arriving after reset_stages_on_lost_executor
   doesn't corrupt graph state.
2. A second reset call for the same executor is a no-op (idempotent).
3. The job still completes after the reset/late-status churn.

Also softens update_task_status to warn (not error) when a task
status arrives for a stage that's been dropped during recovery —
this is now expected when the dependent-stage drop in
reset_stages_internal removes a Running stage that still has
in-flight tasks.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Verifies that a failed task status arriving long after the executor
was declared lost does not corrupt graph state — the rerun continues
on the surviving executor.

Refs: docs/superpowers/specs/2026-04-27-aqe-executor-failure-design.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Collapse a nested if (clippy::collapsible_if) and apply rustfmt
across the new and modified files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The github-actions bot added the documentation (Improvements or additions to documentation) label on Apr 27, 2026.