Dispatcher abstraction: Executor protocol + generic dispatch loop#76
Conversation
espg
left a comment
There was a problem hiding this comment.
🤖 from Claude (review)
Fresh-context adversarial self-review of the dispatcher refactor. I checked the byte-identical surface hard: per-cell Lambda event payload, run-summary keys/values, counting rules (incl. the "No granules found" / "No data after filtering" no-data sentinels and the local raised-vs-meta-error asymmetry), the probe→client→setup→fan-out→finalize ordering, max_pool_connections sizing, progress-log cadence, FD-exhaustion re-raise, and pool-shutdown-in-finally. All faithful — except one real finding, now folded in this push:
estimated_cost_usdwas no longer byte-identical — the loop accumulated per-cellcost_usd, which differs from origin/main's single multiply over summed time in the last FP ULP (~83% of heterogeneous-duration runs). Fixed by computing cost once off the summedcompute_time_sinrunner._run_lambda; added a mixed-duration test that pins it. See the two inline comments + the test comment.
Everything else verified correct; no further diff-scoped changes. The 3 unrelated pre-existing test failures (test_integration.py::{test_full_integration,test_multiple_parent_cells}, test_processing.py::TestWriteDataframeToZarr::test_write_dataframe_to_zarr) are flagged in the PR body's "Questions for review" — they fail on a clean origin/main checkout, so left untouched.
Generated by Claude Code
| # the per-cell CellCost.cost_usd is for the report's structured breakdown. | ||
| total_lambda_time = report.cost.compute_time_s | ||
| memory_gb = LAMBDA_MEMORY_GB | ||
| gb_seconds = total_lambda_time * memory_gb |
There was a problem hiding this comment.
🤖 from Claude (review)
Folded (was the one blocking finding). Earlier in this branch the runner read estimated_cost straight off report.cost.cost_usd, i.e. the loop's sum of per-cell (duration_i · 2.0) · price. That is not bit-equal to origin/main's single multiply over the summed time, (Σ duration · 2.0) · price — IEEE-754 makes the two orders differ in the last ULP for ~83% of heterogeneous-duration runs (verified: 0.00035466843999999996 vs 0.00035466844 for durations [0.1, 0.2, 0.3, 12.7]). Since estimated_cost_usd is a run-summary value and the hard requirement is byte-identical summaries, this line now computes gb_seconds/estimated_cost once off report.cost.compute_time_s, restoring the exact pre-refactor arithmetic order. gb_seconds was always safe (×2.0 is exact in binary FP). The per-cell CellCost.cost_usd stays for the report's structured breakdown only.
Generated by Claude Code
| cost = executor.measure_cost(result) | ||
| report.cost.compute_time_s += cost.compute_time_s | ||
| report.cost.gb_seconds += cost.gb_seconds | ||
| report.cost.cost_usd += cost.cost_usd |
There was a problem hiding this comment.
🤖 from Claude (review)
Note for #20/#12 backends: the loop still accumulates cost.cost_usd per cell here, which is fine for the report's structured per-cell breakdown, but consumers that need a byte-stable aggregate dollar figure should derive it once from the summed compute_time_s (as runner._run_lambda now does), not from this running cost_usd sum — the two differ in the last FP ULP. gb_seconds accumulation is exact and safe to read directly.
Generated by Claude Code
| assert summary["price_per_gb_sec"] == 0.0000133334 | ||
| assert summary["estimated_cost_usd"] == 16.0 * 0.0000133334 | ||
|
|
||
| def test_lambda_cost_byte_identical_with_mixed_durations(self, monkeypatch, atl06_config): |
There was a problem hiding this comment.
🤖 from Claude (review)
Folded (test-gap finding). The other cost test (test_lambda_summary_keys_and_cost) uses uniform per-cell durations (all 2.0), the one case where per-cell-accumulation and multiply-once happen to agree — so it could not have caught the ULP divergence above. This test uses heterogeneous durations [0.1, 0.2, 0.3, 12.7] (with max_workers=1 so completion order is deterministic) and asserts estimated_cost_usd == (Σ duration · 2.0) · price. It fails against the per-cell-accumulation version and passes against the folded fix, pinning the byte-identical requirement.
Generated by Claude Code
Closes #63.
What this does
Extracts the fan-out → retry → measured-cost loop that lived as two bespoke functions in
runner.py(_run_local/_run_lambda) into a clean seam in a newsrc/zagg/dispatch.py, so temporal (#12) and future cluster backends (#20) inherit local + Lambda execution behind one interface.Implements the (B)+(C) design locked with @espg on the issue thread (comment, recap):
Executorprotocol (@runtime_checkable):preflight(n_cells) -> PreflightReport,submit(payload) -> Future,measure_cost(result) -> CellCost,finalize() -> RunReport,shutdown().RetryPolicy(max_attempts, backoff, classify)as a separate strategy object, with defaultsLAMBDA_RETRY/LOCAL_RETRY. The only per-backend variation isclassify— errno-24 / EMFILE is not retryable (run-fatal, re-raised), boto3 throttling is.dispatch()loop returning a structuredRunReport. Per-result counting differs between backends, so it lives in a caller-suppliedaccumulatecallback rather than being baked in.LocalExecutor(thread pool, zero metered cost) +LambdaExecutor(boto3 fan-out; pricing model inmeasure_cost).Approach / what stayed put (per the locked answers)
agg(backend="local"|"lambda")signature is verbatim. The executor is an internal seam selected frombackend, not a new public param.concurrency.pystays a helper module, now called from insideLambdaExecutor.preflight()(via the runner's injected_preflight) rather than inline.runnerkeeps cost presentation (it formatsgb_seconds/estimated_cost_usdoff theRunReport);dispatch()only returns structured data.preflightseam._invoke_lambda_cell/_invoke_lambda_setup/_invoke_lambda_finalize/compute_available_workers/ThreadPoolExecutor) are injected into the executors off therunnermodule namespace, sodispatch.pyneeds no boto3 import and the existing tests that monkeypatch those names keep binding the exact objects in use.Phases
_run_lambda→LambdaExecutor+ genericdispatch()+RetryPolicy._run_local→LocalExecutoron the same loop (validates the protocol on the trivial case).LambdaExecutor.preflight();dispatch()returnsRunReport, runner keeps cost presentation.How it was tested (byte-identical evidence)
The spatial production path must stay byte-identical — Zarr store, run-summary dict keys, and per-cell Lambda event payload all unchanged. Pinned by:
tests/test_runner.py::TestInvokeLambdaCellEvent— per-cell event payload bytes (unchanged on this branch).tests/test_runner.py::TestSummaryKeysByteIdentical— pins the exact summary-dict key set and the data/error counters for both backends through the dispatch refactor, plus the Lambda cost math (4 cells × 2 s × 2 GB = 16 GB-s,× $0.0000133334).tests/test_runner_concurrency.py— the probe-clamp / FD-exhaustion wiring still passes unchanged (pool sized to the clamped worker count; errno-24 surfaces withulimitguidance).tests/test_dispatch.py—RetryPolicydefaults + classifiers (throttling retryable, EMFILE not),Executorprotocol conformance (isinstance(..., Executor)for both),LambdaExecutor.measure_costpricing,LocalExecutorzero-cost, and the generic loop's accumulate/cost/on_submit_errorbehavior.Local green:
ruff check --select=E,F,W,I --ignore=E501on touched files — clean.ruff format --checkon the new files (dispatch.py,test_dispatch.py) — clean. (runner.py/test_runner.pyare not ruff-format-clean onmain, and CI'slint.ymlonly runsruff check— so I deliberately did not whole-file-reformat them, to avoid ~200 lines of unrelated churn; only the logical edits are present.)mypy(pre-commit) — no errors indispatch.py/runner.py/ the new tests.pytest -q— 536 passed, 1 skipped.Questions for review
tests/test_integration.py::{test_full_integration,test_multiple_parent_cells}andtests/test_processing.py::TestWriteDataframeToZarr::test_write_dataframe_to_zarrfail on a cleanorigin/maincheckout (verified by stashing my diff) — they're unrelated to this change, so I left them per the "don't fix unrelated CI failures, flag them" rule. Flagging here.runner.pyleft un-ruff-format'd (see above) to keep the diff reviewable. Say the word if you'd rather take the whole-file reformat in this PR.RetryPolicyis currently advisory — the in-process executors apply retry insidesubmit(Lambda's existing_invoke_lambda_cellretry; local runs once), matching the pre-refactor behavior so the spatial path stays byte-identical. The policy object is threaded throughdispatch()so a future loop-level retry (and the Backend support (i.e., local processing support) #20 cluster backends) can consult one policy without a signature change. OK to land the seam advisory now, or do you want the loop to drive retry centrally in this PR?dispatch.py→ this protocol shape), as agreed — no action here.🤖 Generated with Claude Code
https://claude.ai/code/session_01EN7X53ZAyaCca9rjwv9qZw
Generated by Claude Code