Skip to content

arro3-core carrier for the Lambda worker; drop pyarrow + arrow-kernel (#130)#131

Merged
espg merged 13 commits into
mainfrom
claude/130-pyarrow-layer-bench
Jun 30, 2026
Merged

arro3-core carrier for the Lambda worker; drop pyarrow + arrow-kernel (#130)#131
espg merged 13 commits into
mainfrom
claude/130-pyarrow-layer-bench

Conversation

@espg

@espg espg commented Jun 29, 2026

Copy link
Copy Markdown
Member

Closes #130

What this does

Migrates the deployed Lambda worker's Arrow carrier from pyarrow to arro3-core (path C, per @espg's Do (C) decision), and makes the arro3 / arrow carrier the default for production and the benchmark matrix.

pyarrow 24.0.0's bindings hard-link a ~100 MB unstrippable C++ core that can't fit the 250 MB combined Lambda gate while remaining importable; arro3-core is ~7 MB, zero required runtime deps, importable inside the gate. The deployed worker + layer are now pyarrow-free — pyarrow survives only in the off-Lambda catalog extra (stac-geoparquet hard-requires it), which never runs on the worker.

The carriers

  • Vector WRITE carrier → arro3-core — verified live (gain_bias → 200,676 obs on process-shard-test). On-disk numpy blocks are byte-for-byte identical to the prior pyarrow output (FixedSizeList via fixed_size_list_array + list_flatten(col).to_numpy().reshape(n, width)).
  • Read handoff carrier (handoff="arrow") → arro3-core — diagnosed, fixed, and now the default. (Story below.)
  • arrow-kernel reducer DROPPED — arro3 has no hash-aggregate. Removed _kernel_*/KERNEL_RTOL, the worker branch, the scalar_kernel target, and the kernel tests.

The read-carrier failure: not a native crash — a memory-lifetime bug (fixed)

The arro3 read carrier first appeared to crash the Lambda worker (scalar_arrow), so it was dropped (96dfa09). Re-investigating with the actual CloudWatch logs showed it was not a native crash — it was memory exhaustion:

carrier (same dense o11 shard, 200,676 photons) result max mem
gain_bias (arro3 vector write) 1278 MB
scalar_pandas 1725 MB
scalar_arrow (arro3 read handoff, unfixed) ❌ timeout 2048 MB (cap)

REPORT … Duration: 720000 ms Max Memory Used: 2048 MB Status: timeout — the worker pegged the 2 GB cap and thrashed to the 720 s wall, surfacing to the orchestrator as ConnectionClosedError ("connection closed, no response").

Root cause (measured, not inferred). Array.from_numpy/from_pydict/from_batches are zero-copy; the single forced copy is combine_chunks().to_numpy(). _concat_and_group kept the chunked Arrow table (and the per-read tables in all_reads) alive through _group_columns, so the pooled data was held twice. Fix — free the Arrow buffers the instant the columns are numpy:

cols = {n: table.column(n).combine_chunks().to_numpy() for n in table.column_names}
del table, batches
all_reads.clear()        # unused after this call (worker.py)

Proof: peak RSS 1004 → 524 MB; under a 2 GB cap the unfixed run OOMs (exit 137) and the fixed run survives (exit 0) with byte-identical output; on the real NEON median shard handoff=arrow is byte-identical to pandas through process_shard; at matched scale fixed-arro3 (1019 MB) is lighter than pandas (1456 MB, which holds 3× vs arrow's 2×). Full diagnosis: #130 comment; fresh-context adversarial review (no blocking issues): #131 review.

Default switch → arro3 / arrow

  • Production: agg() and the runner dispatch layer default handoff="arrow". The wire protocol is unchanged ("absent event key = pandas"): arrow runs inject the key; an explicit handoff="pandas" omits it (byte-identical legacy path). process_shard's library default and the handler's absent-default stay "pandas" (legacy-safe — explicit pandas keeps working).
  • Benchmarks: all 8 committed matrix targets set "handoff": "arrow", so the retained series tracks the carrier production actually runs.
  • Follow-on (Make the worker carrier (handoff) a config-driven aggregation-YAML field #132): a cleaner, config-driven handoff (an aggregation-YAML field, so other sensors / nullable sources can choose pandas) — the read carrier requires dense, null-free columns, so a global default isn't right for every sensor. Not blocking this PR.

Phases

  • Phase 1 — vector carrier → arro3-core (processing/write.py); byte-identical output.
  • Phase 2 — drop arrow-kernel reducer (arro3 has no hash-aggregate).
  • Phase 3 — deps + layer: pyarrow core→catalog (out of lambda); arro3-core in core + lambda; build_layer.sh swap + strip-block removal.
  • Phase 4 — benchmark matrix (scalar_pandas/scalar_arrow A/B baseline) + pyarrow-free worker guard test.
  • Phase 5 — read carrier handoff="arrow": diagnosed (CloudWatch: OOM→timeout), fixed (free Arrow buffers before grouping), revived; opt-in ZAGG_PROFILE_RSS per-stage RSS trace added.
  • Phase 6arrow made the default carrier (production agg()/runner + the 8 benchmark targets); docstrings/comments + the two default-assertion tests updated to arrow.

How tested

  • Full suite green locally (incl. test_runner/test_lambda_handler/test_benchmark after the default flip; the two test_default_handoff_is_* tests updated to assert arrow); ruff + ruff format clean.
  • Carrier byte-identity: vector + scalar columns round-trip through arro3 identically to the prior pyarrow blocks; handoff=arrow output == pandas on the real median shard.
  • Memory fix A/B: unfixed read carrier OOM-kills under a 2 GB cap, fixed survives, identical checksum (see above).
  • pyarrow-free guard: the default + vector write carrier run in a subinterpreter that blocks every pyarrow import, asserting pyarrow never enters sys.modules.
  • Live Lambda (process-shard-test): gain_bias (arro3 write) ✅ 200,676 obs; scalar_pandas ✅ 200,676 obs. The fixed arro3 read carrier will be exercised by the next merge's arrow benchmark across all 8 targets.

Questions for review

  • Validate the fixed read carrier on the dense shard on real Lambda before fully trusting the default — the next merge runs the 8-target arrow matrix; watch the dense o11/o10 REPORT mem stays well under 2048 MB.
  • The read carrier requires dense, null-free columns (guard raises on nulls). Per-sensor pandas/arrow choice belongs in the config-driven follow-on (Make the worker carrier (handoff) a config-driven aggregation-YAML field #132).
  • arro3 pin is ==0.8.1 (lambda) / >=0.8.1 (core) — bump policy is yours.

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Adversarial review of Phase 1 only (commit 8bc813d, parent bcd98e4) — touches deployment/aws/build_layer.sh and pyproject.toml.

Verdict: looks correct, no blocking findings. Each acceptance criterion from #130 checks out:

  • pyarrow added + pinned, no drift. pyarrow==24.0.0 is added to the build_layer.sh install line (build_layer.sh:62) and to the lambda extra (pyproject.toml:49), at the same pin — the extra and the build agree, satisfying the "no drift" criterion. The pin is consistent in spirit with the existing numpy==2.2.6/pandas==2.2.3 pins.
  • Bulk strip fixed. pyarrow* is removed from the rm -rf bloat line, and the now-false "pyarrow stays stripped" comment is replaced with an accurate one describing the kept-but-component-stripped state.
  • Component-strip set is coherent. Removed: libarrow_flight*, libarrow_substrait*, libgandiva*, libarrow_dataset* + _flight*.so/_substrait*.so/_gandiva*.so/_dataset*.so. I checked the globs against pyarrow's actual library naming — none of the libarrow_* removal globs collide with the KEEP set (libarrow.so* is libarrow. not libarrow_; libarrow_acero*/libarrow_python* have distinct suffixes from _flight/_substrait/_dataset), and the _*.so cython globs don't catch _acero*.so/_compute*.so/_parquet*.so. KEEP items (core, python, parquet, acero+compute for the arrow-kernel reducer per #130 (2)) are all preserved.
  • 250 MB gate intact. The size check at build_layer.sh:98-105 is untouched. Per the PR body, actual fit is verified by the CI lambda-build Size Report, not locally — correct call given arm64 can't be built here.
  • Style/scope. The new rm -f block mirrors the existing strip-block style (multi-line \-continued, 2>/dev/null || true), terse, no dead code. No CI-workflow edits, no live-AWS calls.

Non-blocking notes (no action required for phase 1):

  1. _dataset*.so also strips _dataset_parquet*.so / _dataset_orc*.so (the datasets-API parquet/orc integration). This is intended and safe — catalog parquet I/O goes through pyarrow.parquet (_parquet*.so, kept), not the datasets layer. Flagging only so it's a conscious decision on record.
  2. As the PR body itself notes, the issue body attributes the crash to tdigest but the vector (pyarrow) path is actually gain_bias. Phase 1's fix is target-agnostic, so this doesn't affect the diff — just confirming the grounding correction is acknowledged.

Generated by Claude Code

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Adversarial fresh-context review of phase 2a only (commit 4e0d6ca, parent 8bc813d): threading handoff into the Lambda backend, default "pandas", byte-identical default event.

Verdict: correct on every acceptance criterion of #130 that this phase covers. The single most important property holds: in _invoke_lambda_cell the "handoff" event key is added only via if handoff and handoff != "pandas":, exactly mirroring the if profile: opt-in, so a default run's payload is byte-identical to the pre-handoff path. Confirmed:

  • Threading is complete and defaults are consistent at every hop: agg(handoff="pandas") -> _run_lambda(handoff="pandas") -> _invoke_lambda_cell(handoff="pandas") -> event["handoff"] (only if non-default) -> lambda_handler event.get("handoff", "pandas") -> process_shard(..., handoff=...). process_shard (worker.py:48) accepts handoff: str = "pandas" keyword-only, so an absent key consumes the default.
  • The handler/runner tests are meaningful, not tautological, and cover both directions: explicit "arrow"/"arrow-kernel" is forwarded into the event and reaches process_shard; default adds no key and the consumed value stays "pandas".
  • Docstring corrected — the misleading "Only honored by the "local" backend" note is gone and now accurately describes lambda forwarding + the byte-identical default.
  • No .github/workflows/ edits, no live-AWS calls, no broad # noqa/# type: ignore, terse and matches the surrounding profile pattern, modules well under ~1000 lines.

One substantive finding (non-blocking): the run_benchmark.py additions — all_target_names (run-all excludes provisional_targets) and _resolve_target (provisional fallback) — are new behavioral logic that ships untested at this commit, and the handoff thread into agg in run_target isn't exercised by dry-run (dry-run skips agg). The PR body's test_provisional_targets_excluded_from_merge_matrix and the provisional_targets manifest data land in phase 2b, not here. Per CLAUDE.md §4 that's the same PR, so it's a phasing observation rather than a blocker — just confirm 2b covers all three new functions. Inline notes detail this plus a minor dead KeyError branch in _resolve_target (unreachable from the main() CLI path, which pre-validates and raises SystemExit first).

Nothing here blocks advancing to 2b.


Generated by Claude Code

Comment thread src/zagg/runner.py Outdated
# Only add the key when profiling, so default runs stay byte-identical (#100).
if profile:
event["profile"] = True
# Only add the key for a non-default carrier, so default (pandas) runs stay

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

The byte-identical-default guard is correct and is the most important property of this phase: if handoff and handoff != "pandas" adds the "handoff" key only for a non-default carrier, exactly mirroring the if profile: opt-in two lines up. _captured_event(..., handoff="pandas") -> "handoff" not in event confirms it. No notes here, just confirming the load-bearing invariant holds.


Generated by Claude Code

if name in provisional:
return provisional[name]
known = list(targets) + list(provisional)
raise KeyError(f"unknown target '{name}'; have {known}")

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

This KeyError branch is effectively unreachable from the only in-tree caller. main() pre-validates every name against known = targets | provisional_targets and raises SystemExit (line 212) before run_target -> _resolve_target ever runs, so _resolve_target only sees names already known to be in one block or the other. The KeyError path can only fire via a direct API call to run_target/_resolve_target. That's fine as a defensive guard, but note it's dead for the CLI path and therefore untested — see the summary's coverage point.


Generated by Claude Code

return (base / rel).resolve()


def all_target_names(manifest: dict) -> list[str]:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

all_target_names (run-all excludes provisional_targets) and _resolve_target (run a provisional by explicit --target) are new behavioral logic, but this commit adds no test for either. The runner-side handoff plumbing (target.get("handoff", "pandas") -> agg(handoff=...)) is also untested here — and unlike _invoke_lambda_cell, it isn't exercised by the dry-run path (dry-run skips the agg call entirely, so the new handoff line in run_target runs but its value is discarded). The PR body says test_provisional_targets_excluded_from_merge_matrix pins the exclusion, but that test (and the provisional_targets manifest data it needs) lands in phase 2b, not here — at this commit tests/test_benchmark.py has no provisional/handoff reference. Per CLAUDE.md §4 it's the same PR, so this is acceptable phasing rather than a blocker, but flagging that as of 2a these branches ship untested. The existing test_main_dry_run_writes_outputs harness makes a provisional_targets-resolution test cheap; consider landing it alongside 2a or confirming 2b covers all three new functions (all_target_names, _resolve_target, the handoff thread into agg).


Generated by Claude Code

Comment thread deployment/aws/lambda_handler.py Outdated
# read/index/aggregate deltas; the write phase runs here, so we bracket it
# below and merge it in. Default (no key) leaves the worker path unchanged.
profile = event.get("profile", False)
# Per-cell carrier/reducer (issue #130). Absent key -> "pandas", the

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Handler read/forward is correct: event.get("handoff", "pandas") -> process_shard(..., handoff=handoff), and process_shard's signature (worker.py:48, handoff: str = "pandas") accepts it keyword-only, so an absent key consumes the byte-identical default. The two new handler tests assert both directions (explicit "arrow-kernel" reaches process_shard; no key -> captured["handoff"] == "pandas") and are meaningful, not tautological. No change requested.


Generated by Claude Code

Comment thread src/zagg/runner.py Outdated
and ``"arrow"`` produce byte-for-byte identical scalar outputs (#30);
``"arrow-kernel"`` agrees within ``KERNEL_RTOL`` (#33). Opt-in for
benchmarking. Honored by both the ``"local"`` and ``"lambda"`` backends
(issue #130): the lambda backend forwards it into each cell event, and the

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Docstring fix is good: the previously misleading "Only honored by the "local" backend" line is gone, replaced by an accurate "Honored by both "local" and "lambda" backends ... default "pandas" keeps the event payload byte-identical (no key)" note. The threading through agg -> _run_lambda -> _invoke_lambda_cell is complete and the defaults (handoff="pandas") are consistent at every hop, matching the existing profile parameter's shape. The two TestProfilePlumbing additions assert agg(handoff=...) reaches _run_lambda in both directions. Clean.


Generated by Claude Code

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Adversarial fresh-context review of phase 2b only (commit 88f3dba: scalar config + provisional_targets block + tests). The 2a runner plumbing (all_target_names, _resolve_target, handoff threading) is present at this commit and the four new tests pass against a clean checkout. No blocking findings; everything below is confirmation plus one nit.

Verified against issue #130 acceptance + the (4) decision + CLAUDE.md:

  • Config is genuinely scalar + kernel-able. Loaded atl03_scalar_healpix_o11.yaml: fields count (len), h_min (min), h_max (max), h_variance (var) — all kind == "scalar", none with expression/weights, no kind: keys at all. Every function is in _KERNEL_FUNCS and passes _kernel_able, so handoff="arrow-kernel" runs it end-to-end (no silent fallback). test_scalar_config_is_kernel_able asserts both kind=="scalar" and _kernel_able(meta) per field — meaningful, not tautological (it would catch a vector/ragged or expression field).
  • Three provisional targets, one config + one shard, correct handoffs. scalar_pandas→pandas, scalar_arrow→arrow, scalar_kernel→arrow-kernel, all on healpix_o11. The arrow-kernel target points at the scalar config, not a ragged one — tdigest stays pandas-only, as required.
  • Decision (4) — PR-tree-only — holds. They live under a sibling provisional_targets key, not targets. all_target_names() returns only targets, and test_provisional_targets_excluded_from_merge_matrix pins disjointness and targets ⊆ all_names — so "run all" can't iterate them and the test is meaningful (not just asserting the key is absent).
  • Committed targets matrix unchanged. Diff vs parent 4e0d6ca: identical 8-target matrix; no arrow target snuck in.
  • Shardmap reuse is sound. Config's data_source/read_plan and the output: grid block are byte-identical to atl03_tdigest_healpix_o11.yaml, so the densest-shard healpix_o11 map (static shard_key, aggregator-independent) pairs correctly; the grid output signature only differs in agg fields.
  • Accuracy coverage exists (pre-existing). pandas==arrow byte-identical and kernel-within-KERNEL_RTOL are covered by tests/test_processing.py (kernel-vs-numpy, arrow-matches-pandas) and benchmarks/handoff_bench.py. Phase 2b's tests correctly assert structure/wiring rather than re-deriving accuracy — no fabricated coverage.
  • JSON valid; tests terse; no weakened/broad-ignore; no CI-workflow or live-AWS changes in this commit. ruff check tests/test_benchmark.py clean.

One nit inline.


Generated by Claude Code

@espg espg added the implement label Jun 29, 2026
@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

Status: all four phases landed; the layer-size gate is confirmed green by CI on both architectures (build / build-x86_64 and build / build-arm64 both pass with pyarrow shipped). ruff is green; the pytest suite passed locally and on the prior CI run (test (3.12)/(3.13) are re-running on the latest commit and mirror that result).

Notes for review:

  • Gate fix worth flagging. The first CI build came in ~12 MB over (combined 252M / 250M on x86_64) after keeping Acero+compute for the kept arrow-kernel reducer (decision (2)). I recovered the headroom by also stripping pyarrow's Parquet engine (libparquet* + _parquet*.so) from the layer — verified safe because pyarrow.parquet is used only by zagg.catalog (STAC/geoparquet fetch+build), which runs off-Lambda with the catalog extra and never on the worker (the worker's parquet stack, if any, is fastparquet+cramjam, which stay). Both arches now pass. This is one escalation beyond the plan's strip list; called out here so it's an explicit review point.
  • All three per-phase adversarial self-reviews are posted above (phases 1 / 2a / 2b), no blocking findings.
  • The two open "Questions for review" in the PR body are the place for your input: the (4) provisional_targets PR-tree-only handling, and confirmation you're comfortable with the Parquet strip given the gate.

Leaving this waiting for your review.


Generated by Claude Code

@espg espg added the waiting label Jun 29, 2026
@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

Parquet strip OK?

this is fine.

test_runner.py formatting

don't care

(4) committed-vs-PR-tree handling

... does this let us run benchmarks here, in this thread??

@espg

espg commented Jun 29, 2026

Copy link
Copy Markdown
Member Author

/benchmark

@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

The pyarrow strip can't work for 24.0.0 — import pyarrow needs the whole core set (path A is a dead end)

A local run of the deployed process-shard-test layer (orchestrator run from a network-capable machine, since CI couldn't reach urs.earthdata.nasa.gov) reproduced a new runtime failure — not auth, not network:

[gain_bias_healpix_o11] Unhandled exception: libarrow_substrait.so.2400: cannot open shared object file

The deployed layer is non-importable. The build-time size gate "passed" only because the layer build never imports pyarrow.

Why — the Python bindings hard-link everything

readelf -d on the pinned wheel (pyarrow-24.0.0-cp312-cp312-manylinux_2_28_aarch64). The binding loaded on every import pyarrowlibarrow_python.so and the lib.cpython-…so core extension — declares:

libarrow_python.so.2400  NEEDED: libarrow, libarrow_acero, libarrow_compute,
                                  libarrow_dataset, libarrow_substrait, libparquet
lib.cpython-…-aarch64.so  NEEDED: (same full set)

So removing any of {libarrow, acero, compute, dataset, substrait, parquet, python} breaks import pyarrow outright — even the plain vector-carrier path (pa.FixedSizeListArray / pa.table) that gain_bias uses, which never touches compute. This PR stripped substrait/dataset/parquet, which is exactly what blew up. (Flight is the only true leaf — only _flight / libarrow_python_flight reference libarrow_flight.)

Quantified (aarch64 wheel, 136 MB unzipped)

set size strippable?
mandatory core — libarrow 45.9, compute 14.5, parquet 11.3, substrait 4.9, dataset 2.2, python 2.1, acero 1.7 (+ lib/_compute cython) ~88 MB no — import needs all
Flight (libarrow_flight 23.4 + python_flight + _flight) ~25 MB yes (leaf)
unused fs/format cython (_azurefs/_gcsfs/_hdfs/_orc/_s3fs/encryption/cpp_tests/_substrait) ~3 MB yes
include/ + tests/ ~8 MB yes

Importable minimum is ~100 MB unzipped, dominated by the 88 MB core. That's ~18 MB more than the current (broken) layer, which only "fit" by deleting required libs.

Gate math — it doesn't close on either arch

  • x86_64 was 252 MB / 250 with substrait+dataset+parquet already deleted (and Flight already stripped). Restoring those three to make it importable adds ~18 MB → ~270 MB (~20 MB over).
  • arm64 was 234 MB under the same broken strip; +~18 MB → ~252 MB — also over 250.
  • Flight is already out of those numbers, so there's no ~20 MB left to reclaim — the rest of the layer (numpy/pandas/pyproj/shapely/odc-geo/h5coro/mortie + the 29 MB function) is all required.

Conclusion: pyarrow 24.0.0 cannot be slimmed to fit the 250 MB combined Lambda gate while remaining importable. Path (A) — ship pyarrow, strip to fit — is not viable.

Decision is forced back to (B) or (C)

  • (B) Drop pyarrow from the deployed worker. Write vectors straight from numpy (the write path already round-trips to_numpy(zero_copy_only=False), so the Table is a pure intermediate), and import-gate pyarrow.compute so it's needed only when handoff=arrow-kernel is explicitly set — and don't bundle pyarrow in the layer at all. Removes the entire ~100 MB problem.
  • (C) arro3-core (~7 MB) for the carrier + drop pyarrow — same outcome for the layer; can't cover pyarrow.compute (the kernel reducer).

Both have one consequence worth weighing against the earlier "keep arrow-kernel, benchmark-to-decide" call: with pyarrow out of the layer, the arrow-kernel reducer can't run on Lambda — only locally. Given a 250 MB gate vs. an unstrippable ~100 MB pyarrow, that looks unavoidable.

Recommendation: pivot this PR to (B) — numpy vector writes + pyarrow.compute import-gated-and-unbundled, keeping the handoff plumbing + scalar configs so arrow/arrow-kernel can still be benchmarked locally. The size gate then passes for real (pyarrow leaves the layer entirely), and gain_bias stops crashing. Flagging for your call before I change the approach — this reopens the (B)-vs-(C) question from #130 with the hard numbers that now make (A) impossible.

(Aside: the CI benchmark separately failed at earthaccess.login() with ENETUNREACH to urs.earthdata.nasa.gov — a runner→NASA reachability issue, likely IPv6 or runner egress, not the token. Local orchestration against the deployed function is the workaround that surfaced the bug above.)


Generated by Claude Code

@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

Do (C) -- arro3-core

arro3 (arro3-core) — minimal Rust/pyo3 Arrow (repo). ~7 MB on disk, zero required deps (vs the README's own "pyarrow is 100 MB on disk plus 35 MB numpy"). Ships abi3 wheels (cp311-abi3 manylinux_2_24_aarch64 ≈ 3 MB zipped), which load on 3.12 — on PyPI now (v0.8.1, Jun 2026). It has Array/Table/Schema incl. nested types and FixedSizeList. Crucial gap: compute lives in a separate package, arro3-compute (README), and it provides streaming compute functions, not the Acero TableGroupBy.aggregate hash-aggregate the kernel reducer uses. So arro3 can replace the carrier (vector write / arrow read) but cannot drop-in replace pyarrow.compute for handoff="arrow-kernel".

What (if anything) can we use out of arro3-compute ? Or are we just using arro3-core given that the Acero TableGroupBy.aggregate hash-aggregate isn't in either (correct me if I'm wrong and it is in arro3-compute).

@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

arro3-compute answer + the (C) shape I'll implement

Short answer to your question: you're right — the Acero TableGroupBy.aggregate hash-aggregate is not in arro3-compute, nor in arro3-core, nor anywhere in the arro3 family. For the carrier we use arro3-core only; there is nothing in arro3-compute we can use for the arrow-kernel reducer.

arro3-compute, verified against the installed wheel (v0.8.1, Jun 2026)

I installed arro3-compute==0.8.1 and enumerated it — it exposes exactly 24 functions:

add, add_wrapping, can_cast_types, cast, concat, date_part, dictionary_encode,
div, enums, filter, is_not_null, is_null, max, min, mul, mul_wrapping, neg,
neg_wrapping, rem, sub, sub_wrapping, sum, take, types

These are element-wise arithmetic (add/sub/mul/div/rem/neg), cast/concat/filter/take/dictionary, and whole-array reductions (min/max/sum). Crucially:

  • No group-by / hash-aggregate — nothing keyed on a group column (no group_by, no TableGroupBy). min/max/sum reduce a whole array, not per-group.
  • No variance, no mean, no count kernels at all.

The arrow-kernel reducer (_kernel_aggregate in src/zagg/processing/aggregate.py) needs table.group_by("_cell").aggregate([(src, "min"/"max"/"variance"/"mean"/"count")]) — a per-cell hash-aggregate plus variance/mean/count. None of that exists in arro3. So arro3-compute cannot replace pyarrow.compute for the kernel path, even partially: every kernel-able stat we use (count/variance/mean and the grouped min/max) is absent. (arro3 is built on the Rust arrow crate, whose arrow-row/kernel set is element-wise + global reductions; the grouped hash-aggregate lives in DataFusion, which arro3 does not wrap. Tracked upstream: arro3 README describes arro3-compute as "streaming compute functions", not aggregation.)

Disposition: per your "Do (C)" call, arrow-kernel becomes a local-only / dev benchmark path — it imports pyarrow.compute only when handoff="arrow-kernel" is explicitly set, and pyarrow is not in the Lambda layer, so it can't run on Lambda. We keep the handoff plumbing + scalar configs (phase 2a/2b) for local head-to-head benchmarking; that's the only place that touches pyarrow.compute.

What (C) looks like in code — verified empirically

I installed arro3-core==0.8.1 and confirmed the carrier seam maps cleanly (numpy round-trip byte-identical):

  • Carrier types: arro3-core has Array, Table, RecordBatch, Field, Schema, DataType, ChunkedArray — full set the vector seam needs.
  • FixedSizeList construction from flat buffer + width: fixed_size_list_array(Array.from_numpy(flat), width)DataType<FixedSizeList(width x dtype)>. Direct replacement for pa.FixedSizeListArray.from_arrays(pa.array(flat), width) in _arrow_column.
  • Scalar column: Array.from_numpy(block) replaces pa.array(block); Table.from_pydict({...}) replaces pa.table({...}).
  • The numpy round-trip without pyarrow (the crux — write.py's _iter_carrier_columns currently does col.values.to_numpy(zero_copy_only=False).reshape(n_rows, width)):
    • scalar column → Array.to_numpy() (arro3's own helper; no pyarrow). Verified dtype/values identical to the pandas/pyarrow block.
    • vector column → list_flatten(col).to_numpy().reshape(n_rows, width), with col.type.is_fixed_size_list and col.type.list_size for the type check + width (mirrors pa.types.is_fixed_size_list / type.list_size). Verified reshape(n_cells, width) is byte-identical to what pyarrow produced.
    • arro3 Array also implements __array__ and __arrow_c_array__ (PyCapsule), so np.asarray(arr) works too — but to_numpy()/list_flatten is the direct, explicit path I'll use.

So the on-disk numpy blocks are unchanged; only the intermediate Arrow library changes. No carrier-seam rework, no design fork — proceeding with (C).

Plan (phased, one commit each, self-review per phase)

  1. Carrier swapprocessing/write.py (_arrow_column/_build_output/_iter_carrier_columns) and the arrow read path (processing/read.py's pa.table(...)) to arro3-core. Output byte-identical.
  2. Layer/depsbuild_layer.sh: drop the pyarrow install + the whole pyarrow component-strip block, add arro3-core. pyproject.toml: drop pyarrow==24.0.0 from the lambda extra, add arro3-core (==0.8.1). Keep pyarrow as a core dep (used off-Lambda by zagg.catalog STAC/geoparquet and by the local arrow-kernel path).
  3. Import-gate pyarrow.computearrow-kernel imports pyarrow only when explicitly selected; the default pandas and the arro3 vector-carrier paths import no pyarrow.
  4. Tests — swap pyarrow-carrier assertions to arro3; add a guard that the default + vector-write worker path doesn't import pyarrow; keep byte-identity/handoff tests green.

New dependency note: arro3-core (~7 MB, zero required runtime deps on 3.12) is authorized by your Do (C) comment. It replaces pyarrow in the deployed layer only.

I'll update the PR body to the (C) pivot as I land the phases.


Generated by Claude Code

@espg espg changed the title pyarrow into the Lambda layer + handoff benchmark matrix (#130) arro3-core carrier for the Lambda worker; drop pyarrow + arrow-kernel (#130) Jun 30, 2026

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 from Claude (review)

Fresh-context adversarial review of the pyarrow→arro3-core pivot (commits 984f9bd, 6a3c19c, 0192fce, 3913a0b; diff a5012d4..HEAD). Reviewed against the 7 intent points plus the arro3-vs-pyarrow API-divergence and empty-carrier concerns. Overall this is clean and correct — carrier byte-identity holds, the worker imports no pyarrow on default/vector/arrow paths, arrow-kernel is fully excised with no orphan imports, and deps↔layer agree. Verified locally in the worktree venv (arro3 0.8.1): the 13 arrow/vector carrier tests pass, runner/lambda/benchmark suites pass (131 passed / 10 skipped), import zagg.processing has no orphaned names, the synthetic benchmark runs (parity: OK (pandas == arrow)), and ruff is clean on the diff (the one N818 in registry.py is pre-existing, outside this PR).

Findings, severity-ranked:

LOW-1 — Latent edge: arro3 0.8.1 cannot build a 0-length array, but write.py's empty-arrow path can never reach it (so it's safe, but the dead guard is now misleading).
Confirmed locally: Table.from_pydict / fixed_size_list_array with a 0-length input raises ValueError: 0-length dimension not currently supported. pyarrow built 0-row tables fine. I traced every construction site and the vector/arrow carrier is never built with 0 rows in practice:

  • _build_output sizes columns to n_cells = len(children) = prod(chunk_shape) > 0 (a chunk is a fixed cell grid; empty cells get fill values, never a 0-row block).
  • _read_group returns None on an emptied read (read.py:521), and process_shard early-returns an empty pandas DataFrame when not all_reads (worker.py:260) before any arro3 concat, so _concat_and_group's Table.from_batches(..., schema=all_reads[0].schema) and _read_group's Table.from_pydict never see an empty input.

So no live bug. But the consequence is that _carrier_empty(carrier) -> carrier.num_rows == 0 (write.py:91) and the num_rows == 0 skip in the write loop are now unreachable for the arrow carrier — an arro3 table with 0 rows can't be constructed to be tested. Not blocking; worth either a one-line comment that the arro3 carrier can't be empty by construction, or leaving as-is for the pandas path (which still can be empty). Flagging so the dead branch isn't mistaken for live empty-arrow handling later.

LOW-2 — Doc nit (non-blocking): aggregate.py still imports get_agg_fields/get_output_signature etc. correctly and get_data_vars was removed cleanly — verified no orphan. No action; just confirming the __init__.py/__all__ cleanup is complete (KERNEL_RTOL, _KERNEL_FUNCS, _kernel_able, _kernel_aggregate all gone from imports AND __all__).

Point-by-point confirmations:

  1. Carrier byte-identity — PASS. Array.from_numpy(np.ascontiguousarray(...)), fixed_size_list_array(...), and list_flatten(col).to_numpy().reshape(n, width) round-trip exactly. FSL detection via type.list_size is robust: verified list_size is 3 for an FSL and None for a scalar Array, so the width is not None test is correct. uint64 morton survives to_numpy() as uint64 (checked). Contiguity is forced via np.ascontiguousarray at every from_numpy; no endianness risk (numpy-native).
  2. Worker pyarrow-free — PASS. worker.py/aggregate.py have no import pyarrow; the arrow handoff uses arro3.core (_concat_and_group), and read.py's arrow=True branch builds arro3.core.Table.from_pydict (NOT pa.table) — lazy import, arro3 only. The new test_default_and_vector_write_path_works_without_pyarrow blocks pyarrow via a subprocess meta-path finder and exercises both carriers — a genuine guard, passes.
  3. arrow-kernel removed — PASS. No arrow-kernel / _kernel_* / KERNEL_RTOL / scalar_kernel in src/tests/benchmarks/targets (only accurate historical-context comments remain). process_shard validation is now pandas|arrow; scalar_kernel_healpix_o11 dropped from targets.json. Stale .pyc files in __pycache__ still match but are build artifacts, not source.
  4. lambda extra ↔ build_layer.sh — PASS. Both pin arro3-core==0.8.1, no pyarrow; the pyarrow component-strip block is fully removed; core dep is arro3-core>=0.8.1 (floor consistent with the == pins). pyarrow is in the catalog extra.
  5. pyarrow retained for catalog only — PASS. catalog/sources.py is the sole live pyarrow user; pyarrow moved to the catalog extra; core deps no longer list it.
  6. Tests meaningful — PASS. Migrated TestArrowHandoff/TestVectorCarrier assert real round-trip behavior and pandas↔arrow byte-equality; the null-guard test still trips on an Arrow null; no broad # noqa/# type: ignore; deleted assertions are all kernel-path-specific (legitimately gone).
  7. Terseness / line limit — PASS. aggregate.py shrank to 650 lines. runner.py is 1162 lines but this PR only edits a docstring there (pre-existing size, not introduced here). Comments are accurate post-pivot.

No approve/request-changes per routine policy — COMMENT only. Nothing here blocks the next phase; LOW-1 is the only item worth a follow-up note.


Generated by Claude Code

@espg espg removed the waiting label Jun 30, 2026
@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

Path-C pivot landed — deployed worker is now pyarrow-free (arro3-core)

All phases of the (C) pivot are pushed (commits 984f9bd63cd564). Summary:

What changed

  • Carrier → arro3-core (write: processing/write.py; read: processing/read.py's arrow=True path now builds arro3.core.Table, not pyarrow). On-disk numpy blocks are byte-for-byte identical to the pyarrow output (FixedSizeList via fixed_size_list_array + list_flatten(...).to_numpy().reshape(n, width); FSL detected via type.list_size).
  • handoff="arrow" kept, on arro3 — the arro3→numpy round-trip is a clean carrier swap, so the pandas-vs-arrow carrier comparison (scalar_pandas vs scalar_arrow) is preserved. arro3 has no whole-table concat, so _concat_and_group collects record batches via Table.from_batches.
  • arrow-kernel reducer dropped entirely_kernel_aggregate/_KERNEL_FUNCS/_kernel_able/KERNEL_RTOL, the worker branch, the scalar_kernel target, and all kernel tests. Confirmed against arro3-core/arro3-compute 0.8.1 that there is no hash-aggregate anywhere in the arro3 family, so with pyarrow out of the layer the kernel path can't deploy. handoff is now pandas|arrow only.
  • Deps: pyarrow demoted core → catalog extra (and removed from the lambda extra); arro3-core added to core (>=0.8.1) + lambda extra (==0.8.1, in sync with build_layer.sh). build_layer.sh: pyarrow install + the whole pyarrow component-strip block removed, arro3-core==0.8.1 installed instead.
  • catalog stays on pyarrow (off-Lambda; stac-geoparquet hard-requires pyarrow>=16 and catalog/sources.py uses it directly). Not rewritten.

Verification

  • Full suite 920 passed, 15 skipped locally; ruff check clean; touched files ruff format-clean.
  • New pyarrow-free guard test runs the default + vector arro3 write carrier in a subinterpreter that blocks every pyarrow import via a meta-path finder, asserting the carrier works and pyarrow never enters sys.modules — reproducing the layer's pyarrow-free closure. I also verified the handoff="arrow" read→concat→group chain runs with pyarrow blocked.
  • Per-phase fresh-context adversarial self-review posted; its one substantive finding (LOW: _carrier_empty's num_rows==0 arm is unreachable for the arro3 carrier since arro3 can't build 0-length arrays) was folded as a clarifying docstring in 63cd564. It independently confirmed the arrow read path builds arro3 Tables and the kernel removal left no orphans.

Two items for your call (in the PR body's "Questions for review"):

  1. scalar_pandas 0-obs needs a real re-run. The prior 0-obs was against the broken (A) layer; I diffed the scalar config's data_source/read_plan against the working tdigest config — byte-identical (only the aggregation differs, scalar-only), so it's not a config bug. Needs a genuine re-run on the arro3 (now-importable) layer to confirm.
  2. arro3 pin is ==0.8.1 in the lambda extra / >=0.8.1 in core — bump policy is yours.

Only CI confirmation remains (the lambda-build Size Report should pass comfortably now that pyarrow's ~100 MB is gone). I can't run CI myself, so leaving implement (no waiting) with this status note; will mark ready-for-review once CI is green and you've had a look.


Generated by Claude Code

@espg espg left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted (out of date).

incorrect assumptions from agent; deleting these comments and reverting the commits by force push

@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude

handoff='arrow' is salvageable — the OOM was a one-line lifetime bug

Full diagnosis in #130 (comment). Short version: the arro3 read carrier crash that motivated dropping it (96dfa09) was not a segfault or an arro3/numpy ABI issue — it was memory exhaustion that manifested as a 720 s timeout.

CloudWatch ground truth (same dense o11 shard, 200,676 photons):

carrier result max mem
gain_bias (arro3 vector write) 1278 MB
scalar_pandas 1725 MB
scalar_arrow (arro3 read handoff) ✗ timeout 2048 MB (cap)

Root cause (measured, not inferred). RSS-traced on Linux/arm64 with the layer's exact numpy==2.2.6 + arro3-core==0.8.1:

  • from_numpy/from_pydict/from_batches are zero-copy (Arrow adopts the numpy buffer — confirmed by mutation visibility + flat RSS).
  • The single forced copy is combine_chunks().to_numpy() in _concat_and_group (contiguous materialization — unavoidable).
  • The bug: the chunked Arrow table (and the per-read tables in all_reads) are kept alive through _group_columns, so the pooled data is held twice during grouping. Pandas avoids this for free (pd.concat, then inputs fall out of scope).

The fix — free the Arrow buffers the instant the columns are numpy, before grouping:

cols = {n: table.column(n).combine_chunks().to_numpy() for n in table.column_names}
del table, batches
all_reads.clear()          # not used after this call (worker.py)

Proof:

  • Correct: on the real NEON median shard through the actual process_shard, handoff=arrow (fixed) is byte-identical to pandas — equal total_obs/cells_with_data and equal output-column checksums.
  • Survives the cap: same workload under --memory=1600m: unfixed → OOM-kill (exit 137), fixed → exit 0, identical checksum.
  • Lighter than pandas: matched scale (6×3M photons) peak RSS — pandas 1456 MB vs fixed-arro3 1019 MB (~30% lower), because pandas holds 3× (all_reads + concat + grouped) while fixed-arro3 holds 2×. (The pandas branch could take the same del all_reads hygiene and match.)

Also added opt-in per-stage RSS logging (ZAGG_PROFILE_RSS=1) in _concat_and_group so a run prints exactly where it peaks, for both carriers.

Landing note

Reviving handoff='arrow' means undoing the two transition commits — 96dfa09 (drop arrow / pandas-only) and 2155fc4 (region_timing pandas-only) — then applying the lifetime fix on top. The fix is staged; the branch will be updated to restore the arrow path + fix. The arro3 vector write carrier (gain_bias) was never affected and stays as-is.

Net: the pyarrow→arro3 migration stands; the arro3 read carrier is revivable as the lighter, faster option, with the lifetime fix.

@espg espg force-pushed the claude/130-pyarrow-layer-bench branch from 2155fc4 to b57c3f0 Compare June 30, 2026 03:27
@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude (review)

Fresh-context adversarial review of the arro3-core migration + the memory-lifetime fix (b57c3f0), with the hardest scrutiny on the fix. No blocking issues found.

Memory-lifetime fix (aggregate.py _concat_and_group arrow branch) — clean

  • all_reads is provably unused after the call. In worker.py it's referenced only at lines 192/221/242/260/307 — all before _concat_and_group (line 307). The K>1 iter_chunks seam works off col_arrays/cell_to_slice, never all_reads; the function is called once per process_shard. So all_reads.clear() mutating the caller's list is safe.
  • del table, batches cannot corrupt cols. Verified empirically against arro3-core 0.8.1: combine_chunks().to_numpy() returns arrays with owndata=True / base is None (independent copies) for every dtype in play — uint64 (leaf_id/morton), float32 (h), int32 (count) — in both single-read and multi-read (from_batches) cases. Data in cols survives intact after del ...; all_reads.clear(); gc.collect(). No use-after-free.
  • Edge cases hold: empty all_reads is short-circuited in worker.py before this branch; 0-row reads never reach Table.from_pydict (every read.py filter returns None when it empties), which matters because arro3 0.8.1 raises on a 0-length Array.from_numpy. The null_cols guard works.

Carrier migration — byte-faithful

  • Read (read.py Table.from_pydict): post-mask columns are fresh numpy copies that own their data independent of the h5coro cache; from_batches(..., schema=all_reads[0].schema) is safe (non-empty, shared schema).
  • Vector write (write.py): fixed_size_list_array(...)list_flatten(...).to_numpy().reshape(n, width) round-trips byte-identically (float32 verified, uint64 morton preserved); col.type.list_size returns None for scalar types (doesn't raise), so _iter_carrier_columns is correct.
  • No pyarrow leftovers on the worker path — the only import pyarrow is in catalog/sources.py, correctly confined to the off-Lambda catalog extra (not in the lambda import closure). pyproject.toml/build_layer.sh pins are consistent (arro3-core==0.8.1); dropped kernel symbols are gone consistently.

Tests / lint

  • test_processing + test_runner + test_benchmark: 279 passed; test_lambda_handler: 23 passed; arrow/handoff subset: 8 passed.
  • ruff check src tests: one N818 on registry.py:64pre-existing on main, not this PR, and the lint bot's --select=E,F,W,I doesn't flag N, so leaving it (per repo conventions, don't fix unrelated failures here).

Two nits — folded

  1. The fix comment said the freed all_reads tables were "zero-copy views onto the source read buffers" — they're actually fresh numpy copies the arro3 arrays own. Reworded; the memory conclusion is unchanged.
  2. _rss_mb()'s import resource fallback was uncaught (would raise on a /proc-less, resource-less platform only when ZAGG_PROFILE_RSS is set). Made it fully non-raising (returns 0.0 as last resort), since it's diagnostic-only.

Both folded as a follow-up commit.

@espg

espg commented Jun 30, 2026

Copy link
Copy Markdown
Member Author

🤖 from Claude (review)

Final pre-merge adversarial review at e9b2d49 (focused on the arrow default flip — the new surface since the prior review). No blocking issues; nothing to fold.

  • Wire protocol round-trips both carriers. agg() default arrowrunner.py injects event["handoff"]="arrow" (guard != "pandas") → lambda_handler reads it. Explicit agg(handoff="pandas") → key omitted → handler default "pandas" → byte-identical legacy path. No path runs the wrong carrier.
  • Local backend consistent. _process_and_write (default arrow) threads one handoff into both the read (read.py arrow=True → arro3 tables) and _concat_and_group's arrow branch — no path defaults to arrow but feeds pandas reads.
  • No stale tests. The two retained pandas-asserting tests (test_default_handoff_is_pandas in the handler test; test_default_handoff_event_has_no_handoff_key) are intentional — they test the handler absent-default and the byte-identity guard (explicit pandas), both unchanged. Correct, not drift.
  • targets.json valid; all 8 matrix targets carry "handoff": "arrow"; provisional scalar A/B untouched.
  • null_cols guard won't fire on the 8 configs — raw h5coro reads carry no null mask (null_count == 0); the guard is a forward-looking precondition for nullable sources (→ Make the worker carrier (handoff) a config-driven aggregation-YAML field #132).
  • Docstrings/comments accurate; remaining "pandas" defaults (process_shard, read.py arrow=False, handler absent-default) are the intentionally-retained library defaults, not stale.
  • Folds (8c04fe1) verified: buffer-free comment matches the code; _rss_mb() never raises.

Tests: test_runner+test_lambda_handler+test_benchmark 141 passed; test_processing 161 passed; ruff check clean (only the pre-existing registry.py N818, unrelated). PR description spot-checked against code — accurate.

Verdict: ready to merge. The two "Questions for review" items (validate the fixed read carrier on the dense shard on real Lambda; config-driven handoff in #132) are post-merge follow-ons, not blockers.

@espg espg marked this pull request as ready for review June 30, 2026 04:41
@espg espg merged commit 29b392e into main Jun 30, 2026
8 checks passed
@espg espg deleted the claude/130-pyarrow-layer-bench branch June 30, 2026 04:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Lambda layer omits pyarrow → vector aggregators (tdigest) crash with "No module named 'pyarrow'"

2 participants