Skip to content

Stream-and-free chunk writes in the multi-chunk worker#124

Draft
espg wants to merge 5 commits into
mainfrom
claude/91-stream-and-free
Draft

Stream-and-free chunk writes in the multi-chunk worker#124
espg wants to merge 5 commits into
mainfrom
claude/91-stream-and-free

Conversation

@espg

@espg espg commented Jun 29, 2026

Copy link
Copy Markdown
Member

Closes #91

What this does

The K>1 multi-chunk worker (process_shard, src/zagg/processing/worker.py) accumulated all K chunk outputs (chunk_results) in memory before any were written. This adds a per-chunk write_chunk callback seam so each chunk is computed → written → freed, capping the output-side footprint at ~1 chunk instead of K.

It is a no-op at K=1 and keeps on-disk output byte-identical (K=1 and K>1). It is complementary to #66 (the read-phase OOM driver), not a substitute — this only helps the K>1 output side.

Approach

  • process_shard(..., write_chunk: Callable | None = None) (phase 1). When provided, each chunk's (block_index, carrier, ragged) is handed to write_chunk(...) the moment it is built and its locals are dropped (del carrier, ragged), instead of appended to chunk_results. When None, the existing chunk_results / ragged_out behavior is unchanged. Passing both chunk_results + write_chunk raises, and (review fold) so does write_chunk + ragged_out (the chunk's ragged goes to the callback, so a ragged_out sink would be silently empty). A K>1 grid still requires one of the two sinks. The streamed return is an empty df_out (carriers already written, nothing stashed), same as the accumulated K>1 path.

    if streaming:
        write_chunk(block_index, carrier, ragged)
        del carrier, ragged
    elif chunk_results is not None:
        chunk_results.append((block_index, carrier, ragged))
    else:  # K==1 stash for the 2-tuple / ragged_out return
        ...
  • Runner _process_and_write (phase 2): the non-sharded path passes the per-chunk write body as write_chunk and no chunk_results; the sharded path (Evaluate zarr ShardingCodec for HEALPix output: decouple large write/dispatch shards from 64×64 read chunks #108) still accumulates via chunk_results + write_shard_to_zarr because it bundles all K inner chunks into one ShardingCodec shard object. The K==1-vs-K>1 ragged-key choice is now derived from grid.chunks_per_shard (fixed by the grid), so streaming needs no materialized count.

  • deployment/aws/lambda_handler.py (phase 3, authorized by the issue naming the file, as in Vector/ragged chunk companions + multi-chunk-per-worker (Closes #82, Refs #30) #84 phase 7): same split. The store is opened + template-validated once on the first chunk write (lazy, so a no-data shard never touches the store, as before). A missing template or a failed write is recorded (not raised) and folded into metadata after the stream so the 500 still carries the worker metadata. After any recorded error, subsequent streamed chunks short-circuit (partial write + 500 + skip rest). The write-phase timing (Possible order driven mismatch? #100) is accumulated across the streamed writes and attached only on a clean write.

Phases

  • Phase 1write_chunk seam in process_shard + free-after-write; chunk_results path preserved when callback is None; both-sinks + write_chunk/ragged_out guards.
  • Phase 2 — rewire runner _process_and_write (non-sharded streams; sharded still accumulates).
  • Phase 3 — rewire lambda_handler.py (lazy store/template, recorded errors with short-circuit, write-timing).
  • Review fold — runner-level streaming tests; lambda later-chunk-failure test; write_chunk+ragged_out guard + test; dropped a redundant = None rebind before del.

How tested

tests/test_processing.py::TestStreamAndFreeChunkWrites:

  • byte-identical: per block index, the carriers handed to write_chunk equal the carriers the accumulated chunk_results path produced (K=4).
  • fires once per chunk + frees as it goes: callback invoked exactly K times at K distinct blocks; a "max live carriers" probe asserts ≤1 chunk is ever held.
  • both-sinks raises; write_chunk+ragged_out raises; K==1 streaming is a true no-op (streamed carrier equals the default 2-tuple return; df_out empty).

tests/test_runner.py::TestProcessAndWriteStreaming (review fold): drives _process_and_write with a fake process_shard that streams 1 and K>1 chunks through the callback and asserts the dense chunk_idx sequence + ragged-key sequence (shard_key at K=1, _block_index_key at K>1), and that the handler passes write_chunk with no chunk_results.

tests/test_lambda_handler.py: the existing K>1 / K=1 write-loop and profile tests were updated to drive process_shard through the streaming contract; added test_non_sharded_streams_via_write_chunk_not_chunk_results (handler passes a callback, no sink) and (review fold) test_streaming_later_chunk_write_failure_partial_writes_and_500 (chunk 2 of 3 raises → earlier write stands, remaining chunk skipped, 500).

Local status: ruff check --select=E,F,W,I --ignore=E501 src tests clean; ruff format --check clean on all touched files; full pytest green (841 passed, 7 skipped; tests/test_shardmap.py only fails collection on the missing stac_geoparquet catalog extra, pre-existing and unrelated — CI installs it and test (3.12)/test (3.13) are green here).

Questions for review

  • Lambda template-missing 500 now falls through to the structured processing_complete log instead of early-returning. The status code and body (now carrying worker phase_timings) are preserved; only the extra structured log line is new. Acceptable, or keep the early-return-without-log exactly?
  • write phase-timing semantics shift (Possible order driven mismatch? #100): the buffered path measured one wall-clock bracket around the whole write loop; the streaming path sums per-chunk write deltas, which excludes the compute process_shard interleaves between chunk writes. This is arguably more accurate ("write-only") and the absent-on-failure/no-data gate is preserved, but a streamed K>1 write value reads smaller than the old buffered one — not comparable across the pre/post-#91 boundary if any rollup/dashboard does that. Acceptable?
  • grid.chunks_per_shard-derived single_chunk replaces the old len(chunk_results) == 1 so the streaming path needs no count. Equivalent on every reachable case (K is fixed by the grid); flagging in case a grid backend ever sets chunks_per_shard inconsistently with the materialized chunk count.
  • Benchmarks are now in place (per the issue thread) — happy to fold in any regression-gate wiring if a specific benchmark hook is expected here.

@espg espg added the implement label Jun 29, 2026

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Adversarial review of issue #91 (stream-and-free chunk writes) across all 3 phases. The core mechanism is sound: the write_chunk seam is a true no-op at K=1 (lone chunk streamed exactly as K>1), the both-sinks guard and the K>1-needs-a-sink guard are correct, the sharded (#108) path is untouched and still bundles via chunk_results, output byte-identity is preserved (ragged keying + status codes match the buffered path), and no new dependency was added (only stdlib typing.Callable). Touched modules stay under the ~1000-line limit (worker.py 487, lambda_handler.py 400; runner.py 1103 but pre-existing and net-shrunk by this PR).

Findings inline. Summary of the actionable ones: (1) the runner's new _write_chunk body has no direct test — its ragged-keying/write-order is only covered transitively; (2) no test exercises a streaming write that raises on a later chunk after an earlier one wrote. The rest are nits (a redundant = None before del, an unguarded write_chunk+ragged_out combo, and a write-timing semantic shift worth a PR-body note). No blocking defects found.


Generated by Claude Code

# harmless — gate on a populated ``phase_timings`` and no error to match the
# old "write absent on failure / no-data" contract.
if profile and not metadata.get("error") and "phase_timings" in metadata and store_box:
metadata["phase_timings"]["write"] = _write_elapsed

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Semantic shift in the write phase-timing value (issue #100), worth flagging even though it's defensible. The buffered path measured one wall-clock bracket around the whole write loop (time.time() - _write_t0), so write included any inter-chunk overhead. The streaming version sums per-chunk deltas (_write_elapsed += time.time() - _t0 inside _write_chunk), which now excludes the compute time that process_shard spends between chunk writes — the two are interleaved. So a streamed K>1 write value will read smaller than the old buffered one for the same shard. This is arguably more accurate ("write-only"), and the gate (not error and "phase_timings" in metadata and store_box) correctly mirrors the old "absent on failure / no-data" contract — but if any downstream rollup or dashboard compares write across the pre/post-#91 boundary, the numbers aren't comparable. Confirm that's acceptable, or note it in the PR body.


Generated by Claude Code

Comment thread src/zagg/runner.py
# Persist this chunk's ragged (CSR) fields — one CSR group per field per
# chunk (issue #48). No-ops when ``ragged`` is empty.
ragged_key = int(shard_key) if single_chunk else _block_index_key(block_index, grid)
write_ragged_to_zarr(ragged, zarr_store, grid=grid, shard_key=ragged_key)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Test gap (diff-scoped): the runner's new _write_chunk body has no direct coverage. The two _process_and_write tests in tests/test_runner.py (test_process_and_write_forwards_handoff, test_default_handoff_is_pandas) stub process_shard to return without ever invoking the write_chunk callback, and test_local_summary_* stubs out _process_and_write entirely. So nothing in the runner suite actually drives this callback — its single_chunk ragged-keying (int(shard_key) at K=1 vs _block_index_key(block_index, grid) at K>1) and write order go untested at the runner level.

The worker-level TestStreamAndFreeChunkWrites and the lambda's test_non_sharded_streams_via_write_chunk_not_chunk_results mitigate this (the seam and an equivalent callback are exercised), but issue #91 phase 2's acceptance ("K=1 byte-identical; multi-chunk output identical") is specifically about this runner path. Consider a small runner test that invokes _process_and_write with a fake process_shard that streams 1 and K>1 chunks through the callback and asserts the dense chunk_idx + ragged-key sequence — symmetric to the lambda test you added.


Generated by Claude Code

# peak output-side memory holds ~1 chunk, not all K. Nothing is stashed.
write_chunk(block_index, carrier, ragged)
carrier = ragged = None
del carrier, ragged

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Nit (terseness): carrier = ragged = None followed by del carrier, ragged is redundant — the rebind to None does nothing useful before the del removes the names. del carrier, ragged alone drops the local refs and makes the chunk collectible (the callback has already been handed the objects on the line above). The = None line can go. (Functionally correct as-is; test_callback_fires_once_per_chunk_and_chunk_results_untouched already asserts max_live == 1, so the free-after-write intent holds either way.)


Generated by Claude Code

if chunks_per_shard > 1 and chunk_results is None:
if chunk_results is not None and write_chunk is not None:
raise ValueError(
"process_shard takes either chunk_results (accumulate) or write_chunk "

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Unguarded ambiguous combination (defensive nit, not a defect for current callers). The new guard rejects chunk_results + write_chunk together, but write_chunk + ragged_out is silently allowed: on the streaming path the chunk's ragged is handed straight to write_chunk and ragged_out is never populated, so a caller passing both would get a silently-empty ragged_out. Neither the runner nor the lambda does this, so it's harmless today — but it's the same class of ambiguity you chose to raise on for chunk_results. Either extend the guard to cover write_chunk + ragged_out, or leave a one-line note in the docstring that ragged_out is ignored when streaming.


Generated by Claude Code

# ShardingCodec shard object — one block selection per dense array (a per-
# inner-chunk loop would read-modify-write the same shard object). This path
# accumulated all K, so it opens + validates + writes here (same recording).
if sharded and chunk_results:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Correctness confirmed, recording it so the byte-identity claim is grounded. The status-code contract is preserved across the rewrite: template-missing and write-failure now go through write_error["msg"]metadata["error"], and the final return {"statusCode": 200 if not metadata.get("error") else 500} turns either into the same 500 the buffered path returned via its early-return / except. Covered by test_profile_missing_template_omits_write (500) and test_profile_failed_write_omits_write (500), both asserting write absent. The sharded branch here (if sharded and chunk_results:) keeps using _get_store(), so its template-missing/failed-write recording matches the streaming path. One thing not exercised: a streaming write that raises on a later chunk after an earlier chunk already wrote (the lambda tests stream chunks that don't raise, and the raise-path test uses a single chunk). The write_error-set → _get_store() returns None short-circuit makes subsequent chunks skip, but a test streaming e.g. 3 chunks where chunk 2 raises would lock in "partial write + 500 + remaining chunks skipped." Worth one test.


Generated by Claude Code

# streaming callback can pick the ragged key without a materialized list: at
# K==1 the chunk IS the shard (keyed by ``shard_key``); at K>1 each finer
# chunk is keyed by its own block index.
single_chunk = int(getattr(grid, "chunks_per_shard", 1)) == 1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

single_chunk derivation is equivalent to the old one — recording for the byte-identity audit. Old: single_chunk = len(chunk_results) == 1; new: int(getattr(grid, "chunks_per_shard", 1)) == 1. These agree on every reachable case (K=1 ⇒ exactly one chunk; K>1 ⇒ >1 chunks), and deriving from the grid is what lets the path stream without a materialized count. The ragged key (int(shard_key) at K=1 vs _block_index_key(block_index, grid) at K>1) therefore matches the buffered path exactly, and shard_key is sourced from event["shard_key"] as before. No defect — just confirming the K=1 no-op guarantee holds at the handler level.


Generated by Claude Code

@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

I expect that the sharding codec will quickly become our default for how we write out (and hence read in) our files. How does this PR and upstream issue relate / interact with that path? Does this bug not exist at all for the sharding codec case? Or does this fix a similar issue in that path as well? or are they completely separate items?

@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

note that we also need to resolve conflicts with main ahead of a merge

@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

On how this relates to the sharding-codec write path — short version: the K>1 accumulate-all-then-write pattern does still exist on the sharded path, but for the dense per-cell arrays it is inherent to the ShardingCodec contract, not a bug we can stream away the way the non-sharded path now does. The non-dense parts (companions + ragged) already stream there. There's a narrower, separable follow-up optimization worth filing.

Why the dense slab is inherent. A sharded array's outer chunk is the whole shard, and set_block_selection is shard-granular — one block selection writes one shard object. The K inner read-chunks all live inside that single object, so the only correct/efficient way to write them is to assemble one shard-wide slab and write it in a single call:

# src/zagg/processing/write.py:201-211 (write_shard_to_zarr docstring)
# set_block_selection is shard-granular (one block == one shard). Writing the K
# inner chunks as K separate block selections would each trigger a
# read-modify-write on the *same* shard object — racy and slow.
# src/zagg/processing/write.py:271-294 — one block selection per dense array == one shard write
for name, slab in slabs.items():
    ...
    array.set_block_selection(block_idx, slab)

The runner makes the dispatch explicit — the sharded branch keeps chunk_results precisely because it needs all K at once, while the non-sharded branch streams via the new write_chunk:

# src/zagg/runner.py:368-387
# a per-inner-chunk loop would read-modify-write the shard object.
# That path needs all K at once, so it accumulates via ``chunk_results``; the
# non-sharded path streams each chunk write-then-free via ``write_chunk`` (#91).
sharded = getattr(grid, "sharded", False)
chunk_results: list | None = [] if sharded else None
...
write_chunk=None if sharded else _write_chunk,
...
if sharded:
    write_shard_to_zarr(chunk_results, zarr_store, grid=grid, shard_key=int(shard_key))

So the answer to "does this bug exist for the sharding case too?": the read-phase OOM driver (#66) is identical regardless of sharding. The output-side accumulation this PR removes can't be removed for the sharded dense arrays — you cannot write a ShardingCodec shard one inner chunk at a time without read-modify-writing the shard object K times. That accumulation is the cost of the codec's single-writer / one-object-per-shard design (docs/sharding.md:30-34, 78-80). They are therefore mostly separate items, not the same fix.

What is not inherent (the separable follow-up). Two points:

  1. Companions (resolution: chunk) and ragged (CSR) are not sharded, and write_shard_to_zarr already writes them per inner chunk inside its loop — i.e. that part already streams:
# src/zagg/processing/write.py:263-269
if chunk_res_fields:
    _write_companion_columns(carrier, store, grid, block_index, chunk_res_fields)
write_ragged_to_zarr(ragged, store, grid=grid, shard_key=_block_index_key(block_index, grid))
  1. For the dense slab, the worker today holds all K full carriers in chunk_results until write_shard_to_zarr runs, but only each carrier's dense columns need to survive into the slab. A streaming variant could fold each chunk's dense columns into the running per-array slab (and write companions/ragged for that chunk) as each chunk is computed, then drop the carrier — capping the sharded output footprint at one carrier + the shard slab instead of K carriers + the slab. The slab itself is unavoidable (it's the shard), but it's sparse-by-fill and bounded by the shard size, not by K full carriers. That would require threading the write_chunk-style seam through to a slab accumulator rather than a plain list — a real but separate optimization, distinct from this PR's non-sharded stream-and-free.

Recommendation: worth a dedicated follow-up issue ("stream dense-column folding into the shard slab on the sharded path, free carriers as K is consumed"), explicitly scoped as reduce the per-carrier multiplier, not eliminate the shard slab (which the codec forbids). I haven't opened it — flagging it here for your call since opening an issue is a side-effecting action.


Generated by Claude Code

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Adversarial review of the origin/main merge into this branch (merge commit 8c86969). Verdict: the merge is clean — both sides' changes are preserved, the #120 intent holds, and nothing was dropped.

What I checked and found:

  1. No conflict markers. No <<<<<<<, 7-char =======, or >>>>>>> anywhere under src/, tests/, deployment/. (The === runs in src/zagg/registry.py are RST table rules, not git markers.)

  2. Main-only files came over verbatim. The delta the merge brought in (8307baf..8c86969) is byte-identical to main's #120 delta (8307baf..cbd4828) for every file the branch never touched: .github/scripts/{bench_metrics,plot_series,run_benchmark}.py, src/zagg/schema.py, tests/test_benchmark.py.

  3. tests/test_runner.py (the one conflicted file) — both sides kept.

    • _LAMBDA_KEYS (test_runner.py:603-626) is the union: the branch's 21 keys including worker_pct_timeout, plus main's new max_memory_mb (line 625). 22 total, none lost; the branch's one-key-per-line formatting was retained.
    • _fake_cell (test_runner.py:875-887) keeps main's if mem_it is not None: body["max_memory_mb"] = next(mem_it) injection (lines 879-880) and the branch's expanded multi-line return-dict literal. The memories=None param, the docstring line, and the whole TestWorkerMemory class also landed. The merge's only effect on this file beyond the branch tip is exactly those #120 additions — nothing else perturbed.
  4. deployment/aws/lambda_handler.py#120 and #91 coexist with correct ordering. _max_memory_mb() (def at line 75) is assigned at metadata["max_memory_mb"] = _max_memory_mb() (line 380), which is after the streamed per-chunk writes inside process_shard(...) (line 334, called with write_chunk=None if sharded else _write_chunk) and after the sharded write_shard_to_zarr(...) (line 354). So the capture still covers the full invocation — the #120 intent is intact under the #91 streaming rewire.

  5. src/zagg/runner.py — both survive. #91 streaming (single_chunk/chunks_per_shard/_write_chunk, lines 357-384) and #120 rollup (worker_memory/max_memory_mb, lines 775-780; summary key line 811; "Worker peak memory" log lines 831-834) are all present.

  6. Lint clean. ruff check --select=E,F,W,I --ignore=E501 src tests deployment → all passed; ruff format --check on the three touched files → already formatted.

No issues found in the merge resolution. (CI green on the merge commit is the remaining gate before ready-for-review, per the usual workflow.)


Generated by Claude Code

@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

Status: merge conflict with main resolved (merged origin/main in as 8c86969, no rebase/force-push; the lone tests/test_runner.py conflict kept both the stream-and-free changes and main's #120 max_memory_mb rollup — affected tests green, 252 passed) and the sharding-codec question answered above. Fresh-context self-review of the merge found it clean (nothing to fold). Applying waiting — awaiting @espg review.


Generated by Claude Code

@espg espg added waiting and removed waiting labels Jun 29, 2026
@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

resolve merge conflicts

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Stream-and-free chunk writes in the multi-chunk worker (don't accumulate all K)

2 participants