Temporal aggregation infrastructure (Refs #12) #70
espg
left a comment
🤖 from Claude (review)
Phase 2 review of 97e916e — fresh-context adversarial pass on src/zagg/registry.py and tests/test_registry.py.
Overall the seam is clean and the eight-registry surface matches the June plan exactly. Tests cover the round-trip, decorator form, duplicate-raise, empty-name reject, lazy discovery trigger via get_* / list_* / registry_snapshot(), and discovery-failure isolation across both ep.load() and register() failure modes. Phase 2 ships the seam alone with empty registries (as the PR body confirms), which is the right scope.
Findings, in priority order:
- Doc rot: the _ensure_discovered docstring says failures log at WARNING but the code uses logger.exception (ERROR). Trivial fix, but the inaccurate contract will mislead the next reader.
- Early-failure trap: _DISCOVERED = True is set before metadata.entry_points(...) is called. If that call itself raises (very rare, but the pragma: no cover exists for a reason), _DISCOVERED stays sticky-True and no plugin ever loads in this interpreter unless someone calls discover_plugins(force=True). Consider resetting in the except branch, or only flipping _DISCOVERED after the EP iteration completes.
- discover_plugins(force=True) semantics are surprising: the docstring advertises it for tests that install a plugin after import, but re-running discovery without clearing registries means every previously-registered plugin name now hits the "already registered" path and is silently logged-and-skipped. Either document the caveat or call out that callers must clear the affected registry first.
- Comment-claim mismatch on reentrancy: the "Mark discovered before loading so a plugin that calls back into a get_* helper during its register() doesn't recurse" comment is correct for get_* reentry, but a plugin that calls discover_plugins() (not get_*) from within its own register() still short-circuits because of the same flag. Worth either testing that path or noting it.
- Thread-safety: no lock around the _DISCOVERED check/set. Lambda single-threaded handlers are fine; ad-hoc multi-threaded callers (notebook, future MCP server) could race and double-register. Either add an RLock or document the single-threaded assumption.
- Missing edge-case tests:
  - replace=True via the decorator form is never tested; only the direct-call form is.
  - No test for discover_plugins() being a no-op after _ensure_discovered() has already run via a lazy trigger (covered indirectly, not explicitly).
  - No assertion that registry_snapshot() returns 8 keys even when all registries are empty (covered by test_snapshot_lists_every_registry, OK on re-read; strike this).
- Caplog assertion is fragile: rec.message is the formatted message after getMessage() runs, but only because pytest's handler calls it; if anyone swaps formatters this silently breaks. rec.getMessage() is the documented-stable accessor. Minor.
- Public API surface (question for @espg, not a defect): zagg/__init__.py uses a _LAZY pattern for agg/auth re-exports but adds nothing for the registry helpers. External plugin authors will need from zagg.registry import register_mask_provider. That's a defensible choice for now but worth a deliberate decision before phase 4 starts pulling built-ins in.
Posting inline at the specific lines.
Generated by Claude Code
espg
left a comment
🤖 from Claude (review)
Inline findings for Phase 2 (97e916e). Summary already posted in the prior review on this PR; this pass attaches the same items inline at the lines they live on, plus one open API-surface question for @espg.
Highest-priority items: (1) _DISCOVERED = True is set before entry_points() is called — early failure leaves the seam permanently degraded; (2) discover_plugins(force=True) re-runs registration without clearing registries, so already-registered names trip the duplicate path and force=True looks like a no-op for the typical use case; (3) docstring on _ensure_discovered claims WARNING but code logs at ERROR; (4) thread-safety on _DISCOVERED + the registry dicts is undocumented. Other items (missing replace=True decorator-form test, fragile rec.message caplog assertion, top-level export decision) are smaller.
No correctness defect blocks the phase; the early-failure trap and force=True foot-gun are the two items I'd most want addressed before the Lambda handler in phase 7 starts calling discover_plugins() for deterministic surfacing.
Generated by Claude Code
    Each entry point must resolve to a callable that, when invoked with no
    arguments, registers its capabilities through the helpers below. A
    failure in any one plugin is logged at WARNING but does not crash the
    discovery pass — the rest still load.
🤖 from Claude (review)
Doc rot: the docstring says "logged at WARNING" but the implementation below uses logger.exception(...) on lines 143, 149, and 154 — those log at ERROR (and include a traceback). The test test_failing_plugin_does_not_crash_discovery also calls caplog.at_level("ERROR"), which is the de-facto contract. Fix is either updating the docstring to "ERROR" or downgrading to logger.warning. The current text will mislead anyone wiring up log filters around "WARNING-or-above".
Generated by Claude Code
    _DISCOVERED = True
    try:
        eps: Iterable[metadata.EntryPoint] = metadata.entry_points(group=_ENTRY_POINT_GROUP)
    except Exception:  # pragma: no cover - importlib.metadata API surface is narrow
        logger.exception("zagg.plugins entry-point lookup failed; no plugins loaded")
        return
🤖 from Claude (review)
Early-failure trap: _DISCOVERED = True is set on line 139 before metadata.entry_points(...) is called on line 141. If that call itself raises (the pragma: no cover branch), _DISCOVERED stays sticky-True forever — the next get_* / list_* short-circuits in _ensure_discovered, and no plugin will ever load in this interpreter without an explicit discover_plugins(force=True). There's no log indication that the system is in a "permanently degraded" state on subsequent calls either; the exception is logged exactly once and then silently swallowed.
Suggested fix: reset _DISCOVERED = False in the except branch (so the next call retries), or restructure to flip _DISCOVERED = True only after the entry_points() call returns successfully. The reentrancy concern that motivates the early-flip only applies to the EP iteration loop, not to the lookup itself.
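A minimal sketch of the second option, under stated assumptions: Discovery is a hypothetical stand-in for the module-level _DISCOVERED flag and its guard, and lookup / load_one are hypothetical parameters standing in for metadata.entry_points(group=...) and the per-plugin load/register step. It is not the code in registry.py.

```python
import logging

logger = logging.getLogger("discovery_sketch")


class Discovery:
    """Hypothetical stand-in for the module-level ``_DISCOVERED`` flag and its guard."""

    def __init__(self, lookup, load_one):
        self.lookup = lookup      # stand-in for metadata.entry_points(group=...)
        self.load_one = load_one  # stand-in for loading and calling one entry point
        self.discovered = False

    def ensure(self):
        if self.discovered:
            return
        try:
            eps = list(self.lookup())
        except Exception:
            logger.exception("entry-point lookup failed; will retry on next call")
            return  # flag stays False, so the next call retries
        # Flip after the lookup but before the loop, so a plugin that calls
        # back into a get_* helper from register() still short-circuits.
        self.discovered = True
        for ep in eps:
            try:
                self.load_one(ep)
            except Exception:
                logger.exception("plugin %r failed to load", ep)
```

With this shape a failed lookup is retried on the next get_* / list_* call, while the reentrancy protection for the iteration loop is unchanged.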
Generated by Claude Code
    def discover_plugins(*, force: bool = False) -> None:
        """Trigger entry-point discovery explicitly.

        Discovery is normally lazy (deferred until the first ``get_*`` / ``list_*``
        call). Tests and the Lambda handler call this at a deterministic point so
        failures surface early.

        Parameters
        ----------
        force : bool
            Re-run discovery even if it has already happened. Useful in tests
            that install a plugin after import.
        """
        global _DISCOVERED
        if force:
            _DISCOVERED = False
        _ensure_discovered()
🤖 from Claude (review)
discover_plugins(force=True) is advertised as "useful in tests that install a plugin after import," but the actual behaviour is surprising: it does NOT clear the registries, so re-discovery against an interpreter that already has plugin-A registered will re-call plugin-A's register(), hit the duplicate-name check in _register (line 92), raise ValueError("already registered"), and that exception gets caught and logged on line 154. Net result: force=True looks like a no-op (modulo an ERROR log line per already-registered name), and only NEW plugins/names are picked up. Either:
- document the caveat explicitly ("does not clear existing registrations; only picks up newly-installed plugins or new names"), or
- have force=True clear the registries first (riskier: built-ins registered via in-tree imports would get wiped too), or
- pass replace=True semantics through the re-discovery path.
Whichever route, the test test_discover_plugins_force_reruns only exercises this with a register_fn that doesn't register anything, so the foot-gun isn't currently caught.
Generated by Claude Code
    # Discovery is idempotent across processes within a single interpreter; we
    # track it so repeated ``get_*`` / ``list_*`` calls don't re-iterate
    # ``importlib.metadata.entry_points`` every time.
    _DISCOVERED = False
🤖 from Claude (review)
Thread-safety: the _DISCOVERED flag plus the eight module-level dicts are mutated without a lock. Two threads concurrently calling get_* on a cold registry can both see _DISCOVERED == False in _ensure_discovered, both flip it to True, and both iterate entry points — the second pass then trips the "already registered" path for every plugin, which is caught/logged but produces noisy ERRORs and obscures real failures. Lambda's single-threaded handler model means this isn't urgent today, but the MCP describe_products use case mentioned in registry_snapshot() and any future notebook / async caller could hit it. Either add an RLock around _ensure_discovered (and ideally around _register too) or document the single-threaded assumption explicitly.
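One possible shape for the lock, as a sketch only: LockedDiscovery and run_pass are hypothetical names, and the module-level _LOCK is an assumption about where the lock would live. threading.RLock is used rather than Lock so that a plugin calling back into the guard from the same thread does not deadlock.

```python
import threading

_LOCK = threading.RLock()  # re-entrant: register() may call back into the guard


class LockedDiscovery:
    """Hypothetical guard: at most one thread runs the discovery pass."""

    def __init__(self, run_pass):
        self.run_pass = run_pass  # stand-in for the entry-point iteration
        self.discovered = False

    def ensure(self):
        with _LOCK:
            if self.discovered:
                return
            self.discovered = True  # set first so re-entrant calls short-circuit
            self.run_pass()
```

Threads that lose the race block until the first pass finishes and then return immediately, so they never observe a half-populated registry.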
Generated by Claude Code
    def test_duplicate_registration_raises(self, kind, reg, get, lst):
        reg("demo", object())
        with pytest.raises(ValueError, match="already registered"):
            reg("demo", object())

    def test_replace_overrides_existing(self, kind, reg, get, lst):
        first = object()
        second = object()
        reg("demo", first)
        reg("demo", second, replace=True)
🤖 from Claude (review)
Missing coverage: replace=True is exercised only via the direct-call form (reg("demo", second, replace=True)). The decorator path @reg("demo", replace=True) is never tested. The decorator and direct-call codepaths thread replace through different surfaces (_decorator(..., replace=replace) vs _register(..., replace=replace)), and a future refactor that drops replace from _decorator's signature wouldn't be caught here. One extra parametrised case would close the gap.
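A sketch of what the extra case could look like. The reg helper below is a hypothetical miniature that supports both the direct-call and decorator forms; it is not the real register_* implementation, and the real test would use the parametrised reg fixture instead.

```python
_REG = {}


def reg(name, obj=None, *, replace=False):
    """Hypothetical register helper supporting direct-call and decorator forms."""
    def _store(target):
        if name in _REG and not replace:
            raise ValueError(f"{name!r} already registered")
        _REG[name] = target
        return target
    # With no object given, act as a decorator factory.
    return _store if obj is None else _store(obj)


def test_replace_via_decorator():
    _REG.clear()
    reg("demo", object())

    @reg("demo", replace=True)
    def second():
        return "second"

    # The decorator form must have overwritten the first registration.
    assert _REG["demo"] is second
```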
Generated by Claude Code
    class TestDiscovery:
        def test_discovery_runs_once_on_get(self, monkeypatch):
            calls = []

            def register_fn():
                calls.append(1)
                register_spatial_func("from_plugin", lambda: None)

            _install_entry_points(monkeypatch, [_FakeEntryPoint("demo", register_fn)])
            registry._DISCOVERED = False

            # First lookup triggers discovery and finds the plugin.
            assert get_spatial_func("from_plugin") is not None
🤖 from Claude (review)
assert calls == [1, 1] confirms the discovery ran twice, but the test never installs a registration in register_fn and so doesn't exercise the actual force=True foot-gun documented above (re-discovery against already-registered names triggers the duplicate path). Suggest the test also include a register_* call inside register_fn so the duplicate behaviour on re-discovery is visible — right now the test would pass even if force=True were a complete no-op for the dispatch table.
Generated by Claude Code
    def bad():
        raise RuntimeError("plugin exploded")

    _install_entry_points(
🤖 from Claude (review)
rec.message is the formatted message only because pytest's LogCaptureHandler happens to call record.getMessage() and stash it on the record — LogRecord.message is officially set by Formatter.format(), not by record construction. If a future pytest swaps handler internals the assertion silently breaks. Use rec.getMessage() for a stable contract. (Same applies to the parallel assertion in test_failing_entry_point_load_does_not_crash.)
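The difference can be seen with the standard library alone. This sketch builds a LogRecord by hand (the name, pathname and message text are illustrative) and shows that the message attribute only appears once a Formatter has processed the record, while getMessage() works from the start.

```python
import logging

rec = logging.LogRecord(
    name="demo", level=logging.ERROR, pathname="demo.py", lineno=1,
    msg="plugin %s failed", args=("bad",), exc_info=None,
)

has_message_before = hasattr(rec, "message")  # nothing has formatted the record yet
text = rec.getMessage()                       # merges msg and args on demand
logging.Formatter().format(rec)               # Formatter.format() sets rec.message
has_message_after = hasattr(rec, "message")
```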
Generated by Claude Code
        and the eventual MCP ``describe_products`` tool (issue #59).
        """
        _ensure_discovered()
        return {kind: sorted(reg) for kind, reg in _REGISTRIES.items()}
🤖 from Claude (review)
Public-API question for @espg (not a defect — please decide before phase 4 lands built-ins):
zagg/__init__.py already has a deliberate surface (the _LAZY pattern for agg/auth to keep earthaccess out of the Lambda layer). Phase 2 adds nothing to it: external plugin authors must use from zagg.registry import register_mask_provider. That's defensible (registration is plugin-internal, not orchestrator-facing), but registry_snapshot and the discover_plugins trigger are exactly the kind of diagnostic helpers a top-level CLI or MCP describe_products will want to import unambiguously. Worth a one-line decision: keep import-as-needed, or surface discover_plugins + registry_snapshot at the package root. Either is fine; the only cost is that of changing the decision later.
Generated by Claude Code
espg
left a comment
🤖 from Claude (review)
Phase 3 adversarial review of commit cb95a76. Diff is a clean, well-scoped extraction; runner.py no longer imports botocore.config.Config or compute_available_workers, and the import surface in dispatch.py is stdlib + zagg.concurrency only. CLAUDE.md adherence is good — dispatch.py is ~280 lines (well under the 1000-line gate), no new deps, title-only commit message.
Two real defects against the byte-identical contract:
1. Failure-path result dict gained a timeout key (dispatch.py:169). Pre-extraction _invoke_lambda_cell's all-attempts-exhausted return had eight fields; the post-extraction layered result has nine. This makes the success/failure paths consistent (probably an intentional cleanup) but it IS a schema delta on the path the PR description claims is byte-identical.
2. wall_time no longer covers event construction (dispatch.py:112 vs runner.py:686-705). Microseconds in practice, but it is observably different from the pre-extraction value. Worth either fixing (thread wall_start in from _invoke_lambda_cell) or noting in the PR body.
Test-coverage gaps that would have caught those: the all-attempts-exhausted test only asserts four of the nine result fields; nothing pins the layered shape returned by the rewritten _invoke_lambda_cell end-to-end (the **result splat is invisible to current tests). A two-line assert set(result) == {...} in either tests/test_dispatch.py or tests/test_runner.py would lock the contract until Question 2's dedicated parity test lands.
Cleanups (not regressions, but the extraction is a natural opportunity):
- invoke_with_retry sleeps after the final attempt's retryable exception (pre-existing, but worth fixing here).
- preflight_concurrency_probe reassigns max_workers and then reuses it; rename to clamped_workers for readability.
- The if not function_error else {} guard at dispatch.py:137 carries a footgun (Payload is single-read) that's no longer self-evident outside the original function; worth an inline comment.
- The cost-table test reaches into _PRICE_PER_GB_SEC; either lift it to public or assert behaviorally.
estimate_cost defaults match the pre-extraction inline math exactly (gb_seconds = total_lambda_time * 2.0, 0.0000133334 arm64 price). preflight_concurrency_probe client-build ordering (lambda probe → cloudwatch → lambda dispatch with max_pool_connections sized to the clamp) matches pre-extraction. Updated monkey-patch signature in tests/test_runner_concurrency.py (session, function_name, *, region, max_workers) matches the actual function. All 60 tests in test_dispatch.py, test_runner.py, and test_runner_concurrency.py pass locally.
No blocking issues; finding (1) is the only one that arguably needs an answer before merge — accept the cleanup or restore strict shape parity.
Generated by Claude Code
    "lambda_duration": 0,
    "error": last_error,
    "retries": max_retries,
    "timeout": is_timeout,
🤖 from Claude (review)
Per-cell result schema delta on the all-attempts-exhausted path. The pre-extraction _invoke_lambda_cell failure-return dict had eight keys: shard_key, status_code, body, wall_time, lambda_duration, error, retries, granule_count — note the absence of timeout. The new failure-return here adds "timeout": is_timeout, which _invoke_lambda_cell then layers shard_key/granule_count over, so every failed cell in summary["results"] now carries a ninth key it never did before.
The success-path return dict (line 143-151) DID always carry timeout, so the change makes the two paths consistent — arguably an improvement — but it is a strict violation of the "the per-cell result dict has the same keys" contract spelled out in the PR description. _run_lambda doesn't read timeout itself, so the only observers are downstream tools iterating summary["results"]; still, this is exactly the kind of silent schema drift Question 2's tests/test_dispatch_parity.py would have caught. Either accept the delta and note it in the PR body, or drop "timeout" from the failure dict to match the pre-extraction shape verbatim.
Generated by Claude Code
            Forwarded to ``raise_for_fd_exhaustion`` so the ulimit guidance can
            recommend a usable cap. Not otherwise consulted.
        """
        wall_start = time.time()
🤖 from Claude (review)
wall_time no longer covers event construction. Pre-extraction, wall_start = time.time() was the very first statement of _invoke_lambda_cell — before the event = {...} dict was built. After Phase 3 the event is built in runner._invoke_lambda_cell (runner.py 686-705) and wall_start here is taken only when invoke_with_retry is entered. Net result: result["wall_time"] in every per-cell record now excludes the spatial event-build (granule URL list copy, s3 cred dict, etc.).
In practice this is microseconds and won't show up in any human-readable summary, but it's a measurable delta against the byte-identical contract, and any future cost/throughput parity harness will flag it. If you want a strict refactor, take wall_start in _invoke_lambda_cell before building the event and thread it into invoke_with_retry (e.g. an optional wall_start kwarg defaulting to time.time()); otherwise call it out in the PR body so it's not surprising later.
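A sketch of the threading option, with trimmed, hypothetical signatures (send, invoke_cell and the one-key event are illustrative, not the real dispatch API). Note that the default has to be a None sentinel resolved inside the function: writing wall_start=time.time() in the signature would freeze the timestamp at import time.

```python
import time


def invoke_with_retry(send, event, *, wall_start=None):
    """Hypothetical, trimmed signature: only the timing plumbing is shown."""
    if wall_start is None:
        wall_start = time.time()  # default keeps existing callers unchanged
    body = send(event)
    return {"body": body, "wall_time": time.time() - wall_start}


def invoke_cell(send, granules):
    wall_start = time.time()  # taken before the event is built, as pre-extraction
    event = {"granules": list(granules)}
    return invoke_with_retry(send, event, wall_start=wall_start)
```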
Generated by Claude Code
    assert result["status_code"] is None
    assert result["body"] == {}
    assert result["retries"] == 2
    assert "never works" in result["error"]
🤖 from Claude (review)
The all-attempts-exhausted test doesn't pin timeout or wall_time. This case is the strongest signal we have that the failure-path dict shape didn't drift, and it only asserts four fields (status_code, body, retries, error). It silently accepts whatever new keys the dispatch decides to emit (see my comment on dispatch.py:169 about the new timeout key). Add assert result["timeout"] is False and assert set(result) == {"status_code", "body", "wall_time", "lambda_duration", "error", "retries", "timeout"} so any future shape change has to be made deliberately.
Same gap applies to test_unretryable_client_exception_breaks_out (line 145-150) — it only checks two fields. These are the two paths most likely to silently grow/lose keys, so pinning the key set here is cheap insurance.
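A sketch of the pinning helper, using the seven-key set quoted above. The helper name assert_failure_shape and the sample values are hypothetical; in the real tests the dict would come from invoke_with_retry.

```python
EXPECTED_FAILURE_KEYS = {
    "status_code", "body", "wall_time", "lambda_duration",
    "error", "retries", "timeout",
}


def assert_failure_shape(result):
    """Fail loudly if the failure-path dict gains or loses a key."""
    # The symmetric difference in the message names the offending keys.
    assert set(result) == EXPECTED_FAILURE_KEYS, set(result) ^ EXPECTED_FAILURE_KEYS
    assert result["timeout"] is False


# Illustrative values only; the shape is what is being pinned.
sample = {
    "status_code": None, "body": {}, "wall_time": 0.01, "lambda_duration": 0,
    "error": "never works", "retries": 2, "timeout": False,
}
assert_failure_shape(sample)
```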
Generated by Claude Code
    "lambda_duration": 0,
    "error": last_error,
    "retries": max_retries,
    **result,
🤖 from Claude (review)
No integration test pins the layered shape returned by the rewritten _invoke_lambda_cell. tests/test_dispatch.py covers invoke_with_retry in isolation and tests/test_runner.py::TestInvokeLambdaCellEvent only pins the event payload, not the result. Nothing asserts that after **result the per-cell dict has shard_key + the generic keys + granule_count in that order, with no collisions and no dropped fields.
A two-line test (mock a successful invoke, call _invoke_lambda_cell, assert set(result) == {"shard_key", "status_code", "body", "wall_time", "lambda_duration", "error", "retries", "timeout", "granule_count"}) would lock the spatial-path contract and bridge the gap until Question 2's dedicated parity test lands. Worth folding into this PR rather than waiting.
Generated by Claude Code
            # Non-timeout FunctionError: retry with a fresh request.
            continue

        result = json.loads(response["Payload"].read()) if not function_error else {}
🤖 from Claude (review)
Minor: the conditional on this line preserves a quirk worth a code comment. On the FunctionError-timeout branch we've already called response["Payload"].read() at line 127, and this else {} guard avoids the second read that would yield empty bytes (since botocore StreamingBody is single-use). Pre-extraction code had the same shape but the locality made it self-evident; in the extracted form, future readers may innocently "simplify" the conditional and break the timeout path silently.
Suggest a brief inline comment along the lines of # Payload was already consumed above for FunctionError; second read would raise / yield b''. Keep the guard. Cheap, and prevents a footgun on the next refactor.
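The single-read behaviour is easy to reproduce with a plain in-memory stream. In this sketch io.BytesIO stands in for response["Payload"] (an assumption for illustration; it is not botocore's StreamingBody) and the payload text is made up.

```python
import io
import json

# Stand-in for response["Payload"]; the real object is a botocore stream.
payload = io.BytesIO(b'{"errorMessage": "Task timed out"}')

first = payload.read()   # consumes the stream
second = payload.read()  # nothing left to read

try:
    json.loads(second)
    second_parse_ok = True
except json.JSONDecodeError:
    second_parse_ok = False  # empty input is not valid JSON
```

A "simplified" unconditional json.loads(response["Payload"].read()) would therefore fail on the branch where the payload has already been read.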
Generated by Claude Code
    # ---------------------------------------------------------------------------


    def _make_session_factory(probe_max: int = 64):
🤖 from Claude (review)
_PRICE_PER_GB_SEC underscore-prefixed import. The test reaches into the private _PRICE_PER_GB_SEC symbol to lock the supported-arch set. That's fine for now, but it leaves an implicit contract: any future refactor that renames or hides the table will break the test in a way that looks like a test problem, not an API one. Either lift the table to a public PRICE_PER_GB_SEC (it's a documented pricing constant, not an implementation detail) or assert behaviorally via estimate_cost(1.0, arch=<arch>)["price_per_gb_sec"] for each expected arch. Behavioral form also catches an accidental price change, which the current set(_PRICE_PER_GB_SEC) == {"arm64", "x86_64"} check does not.
Generated by Claude Code
    raise_for_fd_exhaustion(e, max_workers)
    last_error = str(e)
    if any(sub in last_error for sub in _RETRYABLE_SUBSTRINGS):
        time.sleep((2**attempt) + (time.time() % 1))
🤖 from Claude (review)
Backoff sleeps on the final attempt. time.sleep((2**attempt) + (time.time() % 1)) runs regardless of whether another retry is coming. On attempt == max_retries - 1 (default: attempt 2), a retryable exception still triggers a 4–5 s sleep, then the loop exits and the failure dict is returned — the caller waits ~5 s for nothing. Pre-extraction had the exact same bug, so this is not a regression, but the extraction is a natural opportunity to fix it: if attempt < max_retries - 1: time.sleep(...). Mention it in the PR body if you'd rather defer (would also need a test update for test_retryable_client_exception_sleeps_and_retries to confirm no sleep on final attempt).
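A stripped-down sketch of the guarded loop. The retry function and its injectable sleep parameter are hypothetical (the real invoke_with_retry also filters on retryable substrings and builds a larger result dict); only the backoff guard is the point here.

```python
import time


def retry(call, *, max_retries=3, sleep=time.sleep):
    """Retry ``call``; back off between attempts but not after the last one."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return call()
        except Exception as e:  # the real loop only retries retryable errors
            last_error = str(e)
            if attempt < max_retries - 1:
                sleep((2 ** attempt) + (time.time() % 1))
    return {"error": last_error, "retries": max_retries}
```

Injecting sleep also gives the test a cheap way to assert that exactly max_retries - 1 sleeps happened.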
Generated by Claude Code
    # boto3 / botocore live in the lambda extra; importing locally keeps
    # ``zagg.dispatch`` usable in test environments that haven't installed
    # the AWS deps and mirrors the lazy-import pattern in
    # ``zagg/__init__.py`` for ``runner``.
🤖 from Claude (review)
Reassigning max_workers makes the intent fragile. The parameter is shadowed by the clamped return from compute_available_workers and then fed into Config(max_pool_connections=max_workers). The current call ordering is correct (probe → clamp → size the dispatch client to the clamp), and test_dispatch_client_pool_sized_to_clamp pins it, but the reassignment-then-reuse pattern is exactly the kind of thing a future "just hoist the Config builder" refactor will get wrong by inlining the original max_workers. Rename the clamped value (clamped_workers, report = compute_available_workers(...)) and pass that explicitly — pure readability, no behavior change.
Generated by Claude Code
🤖 from Claude
Phase 3 + fold landed ( The two Questions for review in the PR body are still standing:
Continuing next run unless either question lands first.
Generated by Claude Code
espg
left a comment
🤖 from Claude (review)
Fresh-context adversarial self-review of Phase 4 (52ff1c4, src/zagg/temporal.py). Inline comments above. Triage:
Folding into this phase (diff-scoped):
- spatial_max/spatial_min mask inconsistency on non-binary masks: make both use .where(combined_mask > 0).
- spatial_max_gradient inf at the equator (1/sin(lat)): guard + an equator-spanning regression test.
- Missing-cell_areas opaque TypeError: raise a clear error naming the static field.
- Test gaps: add a batching-invariance test on a non-idempotent reducer (sum/weighted_mean) and direct coverage for spatial_max_gradient and spatial_min_level_then_weighted_mean.
Standing for @espg (semantics / config-contract, not folding):
- The is_anomaly branch can double-apply with a transform: monthly_anomaly spec and hardcodes the transform name; collapsing it into the generic transform hook changes the temporal spec schema, which wants sign-off before Phase 5/7 wire the example config (also under Questions for review).
- Built-ins seed registries as an import-time side effect (per the June plan); a single idempotent register() entry-point would decouple availability from import order if desired later.
Will push the folded fixes as a follow-up phase commit and re-review.
Generated by Claude Code
    def spatial_max(var_t, combined_mask, cell_areas):
        """Max value under the masked footprint for one timestep."""
        vals = (var_t * combined_mask).values[combined_mask.values > 0]
🤖 from Claude (review)
spatial_max and spatial_min disagree on non-binary masks. spatial_max reduces over the product (var_t * combined_mask), while spatial_min (line 167) reduces over the unscaled var_t.where(combined_mask > 0). With a fractional mask cell of 0.5 over var=10, spatial_max returns 5.0 but spatial_min returns 10.0 (verified). This only bites if event masks are ever non-0/1 — the docstring says "Binary" but nothing enforces it. Suggest making spatial_max use the same .where(combined_mask > 0) selection as spatial_min so the two are consistent for any mask. Folding this in.
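The arithmetic behind the 5.0 vs 10.0 example, written with plain Python lists as a stand-in for the xarray expressions (an illustration of the two selection rules, not the code in temporal.py). The second cell is outside the footprint and is included only to show that both rules exclude it.

```python
var = [10.0, 3.0]
mask = [0.5, 0.0]  # fractional first cell; second cell is outside the footprint

# spatial_max as written: reduce over var * mask where mask > 0
max_scaled = max(v * m for v, m in zip(var, mask) if m > 0)

# spatial_min as written, and the proposed spatial_max: reduce over var where mask > 0
selected = [v for v, m in zip(var, mask) if m > 0]
min_selected = min(selected)
max_selected = max(selected)
```

With the scaled form the reported maximum (5.0) ends up below the reported minimum (10.0) for the same cell, which is the inconsistency the shared selection removes.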
Generated by Claude Code
    )
    r = 6378  # Earth radius in km
    lat_partials = rads.differentiate("lat") / r
    lon_partials = rads.differentiate("lon") / (np.sin(rads.lat) * r)
🤖 from Claude (review)
spatial_max_gradient divides by sin(rads.lat), which is 0 at the equator → a divide-by-zero and inf that then propagates through np.nanmax (verified: a grid spanning lat=0 returns inf). This is a faithful port from artools, where Antarctic grids never hit lat=0, but it's registered as a general built-in. Folding a guard that suppresses the 1/sin(lat) blow-up at/near the equator, plus a regression test that spans the equator.
Generated by Claude Code
            static_sub[name] = arr.sel(lat=lats, lon=lons)
        except (KeyError, ValueError):
            static_sub[name] = arr
    cell_areas = static_sub.get("cell_areas")
🤖 from Claude (review)
cell_areas = static_sub.get("cell_areas") is None when the static field is absent; the weighted spatial funcs then do var_t * combined_mask * cell_areas and crash with an opaque TypeError: ... 'float' and 'NoneType'. Folding an up-front check that raises a clear error naming the missing cell_areas static field when any spec uses a weighted/area func.
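A sketch of such an up-front check. The WEIGHTED_FUNCS set, the spatial_func spec key and the require_cell_areas name are all hypothetical; the real spec schema and the list of area-weighted functions may differ.

```python
# Hypothetical: names of spatial funcs that multiply by cell_areas.
WEIGHTED_FUNCS = {"weighted_mean"}


def require_cell_areas(specs, static_fields):
    """Raise a clear error before any weighted func multiplies by ``None``."""
    needs = sorted(
        s["spatial_func"] for s in specs if s["spatial_func"] in WEIGHTED_FUNCS
    )
    if needs and static_fields.get("cell_areas") is None:
        raise ValueError(
            f"static field 'cell_areas' is required by spatial funcs {needs} "
            "but is missing"
        )
```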
Generated by Claude Code
            continue

        var_t = loaded[spec["collection"]][spec["variable"]].sel(time=t)
        if spec.get("is_anomaly"):
🤖 from Claude (review)
The is_anomaly branch hardcodes get_field_transform("monthly_anomaly") and then separately applies spec["transform"] below — a spec with both anomaly: True and transform: monthly_anomaly would subtract the climatology twice, and hardcoding the name partly defeats the "resolve everything by name" design. Leaving this standing for @espg rather than folding: collapsing is_anomaly into the generic transform hook changes the temporal spec schema (anomaly: flag vs transform: monthly_anomaly), which is a config-contract decision that wants sign-off before Phase 5/7 wire the example config. Flagged under Questions for review.
Generated by Claude Code
            return float(self.value)


    registry.register_reducer("max", MaxAccumulator)
🤖 from Claude (review)
Built-ins seed the global registries as an import-time side effect (register_*(..., replace=False)), so importlib.reload(zagg.temporal) — or a plugin re-registering the same name — trips the merged duplicate-name guard. This matches the June plan ("built-ins register themselves"), and the test_registry.py fixture now clears-at-setup so import order can't leak them, so I'm leaving the structure as-is for this phase. Noting it as the root cause behind that fixture change; if we later want capability availability decoupled from import order, the cleaner shape is a single idempotent register() entry-point (matching the plugin discovery contract). Standing for @espg.
Generated by Claude Code
…t, cell_areas guard (#12)
🤖 from Claude
Status: reconciled onto Two items are standing for review (see Questions for review + the inline self-review note): the
Generated by Claude Code
espg
left a comment
🤖 from Claude (review)
Fresh-context adversarial review of Phase 5 (commit 0d20e70, range 4ed4434..0d20e70): the PipelineStrategy seam in agg(). Verdict: the dispatch seam is sound and the byte-identical requirement holds. No blocking findings. A handful of low/medium notes below, plus one finding that I couldn't anchor inline (the file wasn't touched by this PR).
1. Byte-identical guarantee (the hard requirement): PASS. I diffed the old agg() body against SpatialStrategy.run() line-by-line — it is verbatim: same catalog/store resolution order and error strings, same grid_type/child_order, same _maybe_warn_dense, same driver/endpoint/region resolution (incl. the region == "us-west-2" override), same n_cells clamp, same local/lambda/unknown-backend branches and summary dicts. The monkeypatch seams existing tests rely on (runner._run_local, _run_lambda, _invoke_lambda_*, process_shard, open_store, consolidate_metadata) are untouched, and TestSummaryKeysByteIdentical still calls _run_local/_run_lambda directly so its pins remain valid. One new behavior at the seam: agg() now calls get_pipeline_type(config) first, which can raise ValueError for a malformed pipeline block that previously slid into the spatial path — strictly a better guard, and a no-op for valid spatial configs, so the guarantee is intact.
2. TemporalStrategy correctness: solid. Executor usage mirrors the spatial local path (tagged-envelope _event_work catching Exception, LocalExecutor + dispatch + LOCAL_RETRY + executor.shutdown() in finally). Error counting is correct and verified by test_failing_event_counted_as_error (2 errors, 0 with-data). UnknownCapability(KeyError) from the registry is an Exception, so a typo'd capability name is counted as a per-event error rather than crashing the run — consistent. The empty-events clamp min(max_workers, len(event_list)) if event_list else 1 is sane and actually more robust than the spatial path (which would hit ThreadPoolExecutor(max_workers=0) at n_cells==0). dry_run / max_cells / lambda-rejection paths all behave and are tested. All 16 new tests pass locally (uv run --extra test pytest).
3. _validate_temporal_config: correct, deliberately minimal. Checks aggregation + non-empty variables + the four keys specs_from_config indexes directly; correctly leaves mask optional and defers capability-name checks to the registry. Mild under-validation (no data_source check, unlike the spatial branch's three mandatory sections) — defensible since temporal reads only aggregation and gets collections at runtime; flagging only to confirm it's intentional.
4. Tests: good coverage, two gaps. The temporal path is genuinely exercised through agg() over a real LocalExecutor with synthetic xarray events (nice). Gaps: (a) events=[] (distinct from events=None) is untested — worth a one-liner to pin the zero-event summary and the else 1 clamp; (b) no test drives a spatial config through the unmocked agg() end-to-end to a real summary — the routing test mocks SpatialStrategy.run, so the agg -> strategy.run -> _run_local kwarg handoff is only covered in halves. Preventive, since the body is verbatim today.
Could-not-anchor-inline finding (Medium): CLI crashes on temporal dry-run. src/zagg/__main__.py wasn't changed by this PR so I can't pin it, but Phase 5 made it reachable: validate_config now accepts temporal/event configs, so python -m zagg --config temporal.yaml passes load_config. Non-dry-run then fails cleanly (requires events=), but --dry-run returns the temporal dry-run summary (total_events/n_specs) while __main__.py:90 prints results['total_cells'] → KeyError. Either guard the CLI ("temporal not supported via CLI yet") or key the dry-run print on which summary shape came back. For @espg to scope (Phase 6/7 may wire the CLI regardless).
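One way to key the dry-run print on the summary shape, as a minimal sketch. `describe_dry_run` is a hypothetical helper, not code from `__main__.py`; only the `total_cells`, `total_events`, and `n_specs` keys come from the summaries described above, and the message wording is an assumption.

```python
def describe_dry_run(results: dict) -> str:
    """Format a dry-run summary without assuming the spatial key set."""
    if "total_cells" in results:
        # Spatial dry-run summary.
        return f"dry run: {results['total_cells']} cells"
    if "total_events" in results:
        # Temporal dry-run summary (total_events / n_specs).
        n_specs = results.get("n_specs", "?")
        return f"dry run: {results['total_events']} events, {n_specs} specs"
    # Unknown shape: fail with a readable message instead of a bare KeyError.
    raise ValueError(f"unrecognised dry-run summary keys: {sorted(results)}")
```

Branching on the keys that are present keeps the CLI working for both strategies without importing either one.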
The known is_anomaly double-subtract question was explicitly out of scope and I did not evaluate it. Nothing here blocks advancing the phase; all findings are improve-the-PR or confirm-intent.
Generated by Claude Code
| output_credentials=output_credentials, | ||
| output_endpoint_url=resolved_endpoint, | ||
| handoff=handoff, | ||
| max_workers = min(max_workers, len(event_list)) if event_list else 1 |
🤖 from Claude (review)
Low — empty-events clamp is sane; one note on the asymmetry with the spatial path. min(max_workers, len(event_list)) if event_list else 1 correctly avoids ThreadPoolExecutor(max_workers=0) when events=[] (which the spatial path on line 217 does not guard — min(max_workers, n_cells) with n_cells==0 would raise ValueError: max_workers must be greater than 0). So the temporal path is actually the more robust of the two here — good. Two small things:
- The empty case is untested. agg(_temporal_config(), events=[]) should return a zero-event summary; worth a one-line test to pin it (cheap, and it documents that events=[] is distinct from events=None).
- If a caller passes max_workers=0 explicitly with a non-empty list, min(0, n) → 0 → pool blows up. Edge case, low priority, just noting the else 1 only guards the empty-list branch, not an explicit 0.
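A minimal sketch of a clamp that covers both the empty list and an explicit zero. `clamp_workers` and `zero_workers_rejected` are hypothetical names used for illustration, not functions in `runner.py`.

```python
from concurrent.futures import ThreadPoolExecutor


def clamp_workers(max_workers: int, n_items: int) -> int:
    """Never more workers than items, and never fewer than one."""
    return max(1, min(max_workers, n_items))


def zero_workers_rejected() -> bool:
    """Confirm that the standard-library pool refuses max_workers=0."""
    try:
        ThreadPoolExecutor(max_workers=0)
    except ValueError:
        return True
    return False
```

Wrapping the `min` in `max(1, ...)` guards the empty-list case and a caller-supplied `max_workers=0` with one expression.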
Generated by Claude Code
| ``events`` argument until the Phase-6/7 reader + catalog land. | ||
| """ | ||
|
|
||
| def run(self, config, *, catalog, store, backend, driver, max_cells, |
🤖 from Claude (review)
Low — TemporalStrategy silently ignores several kwargs. run() accepts catalog, driver, morton_cell, overwrite, function_name, region, output_credentials, output_endpoint_url, handoff to match the shared strategy signature, but uses none of them. Two are user-facing footguns:
- catalog= is accepted and silently dropped: a user porting a spatial invocation who passes catalog= to a temporal config gets no signal.
- max_cells truncates events (line 281) but morton_cell is ignored: inconsistent selection semantics within the same strategy.
Not byte-identity-relevant and arguably fine while the signature is shared, but a one-line note in the docstring ("temporal ignores catalog/morton_cell/driver/handoff/output_* — these are spatial-only") would save a confused bug report. Optional.
Generated by Claude Code
| summary = { | ||
| "total_events": len(event_list), | ||
| "events_with_data": report.cells_with_data, | ||
| "events_error": report.cells_error, | ||
| "timesteps_processed": report.total_obs, | ||
| "wall_time_s": wall_time, | ||
| "store_path": store_path, | ||
| "backend": "local", | ||
| "results": report.results, |
🤖 from Claude (review)
Info — temporal results are returned in-memory only, never persisted. Unlike the spatial path (which opens the store, emits the template, writes per-cell, and consolidate_metadatas), TemporalStrategy computes report.results and hands them back in the summary dict but never touches store_path beyond echoing it. store_path = store or get_store_path(config) is computed (line 278) and reported, which reads like it's an output target. Assuming this is intentional for Phase 5 (the temporal writer lands later), consider a one-word note in the class docstring that no output is written yet, so the echoed store_path isn't mistaken for "results were written there." No action needed if a later phase wires the writer.
Generated by Claude Code
| def _validate_temporal_config(config: PipelineConfig) -> None: | ||
| """Validate a temporal/event pipeline config (issue #12, Phase 5). | ||
|
|
||
| Temporal pipelines carry no output grid; the only cross-check is that the | ||
| ``aggregation.variables`` block names, per variable, the four keys | ||
| :data:`_TEMPORAL_SPEC_KEYS` that :func:`zagg.temporal.specs_from_config` | ||
| requires (the rest are optional flags with defaults). Raises ``ValueError`` | ||
| on a missing section or key. | ||
| """ | ||
| if not config.aggregation: | ||
| raise ValueError("Missing required section: aggregation") | ||
| variables = config.aggregation.get("variables") | ||
| if not variables: | ||
| raise ValueError("temporal pipeline requires aggregation.variables") | ||
| for name, meta in variables.items(): | ||
| missing = [k for k in _TEMPORAL_SPEC_KEYS if k not in meta] | ||
| if missing: | ||
| raise ValueError( | ||
| f"temporal variable '{name}' is missing required key(s): " | ||
| f"{', '.join(missing)} (need {', '.join(_TEMPORAL_SPEC_KEYS)})" | ||
| ) |
🤖 from Claude (review)
Low — _validate_temporal_config correctness is good; two scope notes.
Correct: it checks aggregation presence, aggregation.variables non-empty, and the four required per-variable keys that specs_from_config indexes with meta[...] (variable, collection, spatial_func, temporal_reducer). mask is rightly optional (defaulted to "ais" in specs_from_config), and deferring capability-name validation to the registry's UnknownCapability is a sound call (avoids guessing installed plugins at load time). The empty-section error strings match the spatial path's "Missing required section: ..." convention.
Under-validation worth a sanity check:
- data_source is not validated at all for temporal. specs_from_config doesn't read it, and collections are supplied at runtime via events=, so this is defensible; but the spatial branch requires data_source as one of three mandatory sections, so a temporal config with a typo'd/absent data_source validates clean. Probably fine; just confirming it's deliberate.
- if not variables rejects an empty variables: {} with "requires aggregation.variables" (good), but it shares the message with the missing-key case only loosely. Minor, the messages are distinct enough.
Generated by Claude Code
| def test_agg_spatial_still_routes_through_spatial_path(self, monkeypatch, atl06_config): | ||
| # Byte-identical guard at the seam: a spatial config dispatches into the | ||
| # unchanged spatial path. We assert agg() delegates to SpatialStrategy | ||
| # (the summary itself is pinned by TestSummaryKeysByteIdentical). | ||
| from zagg import runner | ||
|
|
||
| called = {} | ||
|
|
||
| def fake_run(self, config, **kwargs): | ||
| called["cls"] = type(self).__name__ | ||
| return {"ok": True} | ||
|
|
||
| monkeypatch.setattr(runner.SpatialStrategy, "run", fake_run) | ||
| out = runner.agg(atl06_config, catalog="c.json", store="./out.zarr") | ||
| assert called["cls"] == "SpatialStrategy" | ||
| assert out == {"ok": True} |
🤖 from Claude (review)
Low — the byte-identical guarantee is pinned per-link but not end-to-end through agg(). This test mocks SpatialStrategy.run (good for proving routing), and TestSummaryKeysByteIdentical pins the summary by calling runner._run_local/_run_lambda directly. So the new seam agg() -> SpatialStrategy.run() -> _run_local() is exercised in two halves but never end-to-end: nothing runs a spatial config through the real (unmocked) agg() to a real summary dict. The risk surfaced by the refactor is precisely the glue in SpatialStrategy.run (kwarg names/order, the region == "us-west-2" override, the n_cells clamp) — and that glue is only covered by the existing Test_run_local-style integration tests, not via agg(). Consider one test that drives a stubbed-grid spatial config through the unmocked agg() and asserts the full summary keys/counts, so a future signature drift in the agg -> strategy.run handoff can't pass silently. (The body is verbatim today, so this is preventive, not a current bug.)
Generated by Claude Code
|
new conflicts in main that need to be resolved. What's the status of this? can you describe at a high level what has been added in terms of models, functions, etc? |
…peline # Conflicts: # src/zagg/config.py # src/zagg/runner.py # tests/test_lambda_handler.py # tests/test_processing.py # tests/test_runner.py # tests/test_shardmap.py
|
🤖 from Claude Conflicts resolved — merged What conflicted and how it was reconciledSix files conflicted; each kept main's canonical spatial/production code with the temporal seam re-layered additively on top:
Lint / tests (run locally just now)
High-level map of what this PR adds vs main (your "models / functions" question)Net diff over main is Phases 1, 4, 5 (2 and 3 landed canonically via #73 registry and #76 dispatch; this branch consumes them):
Status: Phases 1/4/5 are landed. Remaining: Phase 6 ( Standing questions (non-blocking, still awaiting your call)
This PR continues next run (Phases 6–7), stays a DRAFT, label unchanged ( Generated by Claude Code |
this is fine, we don't need to split the files
I don't understand; what is the issue / question here that needs answering? Can you explain?
espg
left a comment
🤖 from Claude (review)
Phase 6 adversarial review — output writer abstraction. Verdict: solid and safe on the spatial-byte-identical front, but the diff ships a notable amount of dead/unused machinery and carries two real bugs (S3 path mangling, a fragile test reload) plus a divergence between the registry the package advertises and the suffix check the runner actually uses. Tests are green locally: 78 passed, 1 skipped (h5py absent) on tests/test_output.py tests/test_runner.py. Nothing here threatens TestSummaryKeysByteIdentical or the spatial path. Triage below.
Fold into this phase (diff-scoped)
(1) Path() mangles s3:// URIs — temporal output to S3 is silently corrupted. tabular.py:92 does path = Path(path), and runner.py passes the raw store_path through. str(Path("s3://bucket/x.parquet")) collapses to "s3:/bucket/x.parquet" (one slash). _is_tabular_path returns True for s3://….parquet (suffix matches), so a temporal run with an S3 store enters the tabular branch and then writes to a mangled local-looking path — pandas.to_parquet won't reach S3 and the reported output_path is wrong. Given output.store is routinely an S3 URI elsewhere in zagg, this is a live foot-gun. At minimum: detect a remote scheme and either route through fsspec or raise a clear "tabular S3 output not yet supported" rather than Path()-mangling it. (file: src/zagg/output/tabular.py:92, src/zagg/runner.py:402-405)
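The slash collapse described in (1) can be reproduced with the standard library alone, and a scheme check is one way to raise a clear error instead. This is a sketch under assumptions: `path_roundtrip` and `require_local_path` are hypothetical helpers, not functions in `tabular.py` or `runner.py`, and `PurePosixPath` is used so the result does not depend on the host OS.

```python
from pathlib import PurePosixPath
from urllib.parse import urlsplit


def path_roundtrip(uri: str) -> str:
    """Show what a POSIX path object does to a URI string."""
    # Path normalisation collapses the "//" after the scheme into "/".
    return str(PurePosixPath(uri))


def require_local_path(path: str) -> str:
    """Return path unchanged, or raise if it carries a remote scheme."""
    scheme = urlsplit(path).scheme
    # A one-letter "scheme" is treated as a Windows drive letter, not a URI.
    if len(scheme) > 1:
        raise ValueError(
            f"tabular output to {scheme}:// is not supported yet: {path!r}"
        )
    return path
```

Calling the guard before any `Path(...)` conversion means a remote store fails loudly at the boundary rather than being written to a local-looking path.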
(2) The runner ignores config.output["format"] — the registry it advertises is dead code. The package's headline API is get_writer(output_format(config)) (see __init__.py:18, base.py:62-86), but runner.py never calls it. It branches purely on the path suffix via _is_tabular_path (runner.py:402, _TABULAR_SUFFIXES at :446). Consequences:
- A perfectly valid config output: {format: parquet, store: ./out_dir} (no tabular suffix) writes nothing and reports output_path: None. The user asked for parquet and got silence.
- output_format(), get_writer(), register_writer(), ZarrGridWriter, and the whole format→writer registry are exercised only by test_output.py; they have zero production callers. That's speculative abstraction the conventions (§4 "no dead code, no speculative abstraction") flag.
Either wire the runner to get_writer(output_format(config)) so the registry is the single source of truth, or drop the registry and keep the suffix map; but shipping both, divergent, invites exactly the bug above. Recommend wiring the runner to the registry and deriving the format from config.output["format"] (falling back to suffix), so _TABULAR_SUFFIXES in runner.py and _EXT_FORMAT in tabular.py stop being two copies of the same table. (files: src/zagg/runner.py:402,446, src/zagg/output/base.py:62-86)
(3) test_register_writer_replace_overrides leaves the reloaded base module desynced. tests/test_output.py does importlib.reload(output_base) in the finally, then re-registers only zarr. After reload the reloaded module's _WRITERS contains only zarr — the decorator registrations in tabular.py/zarr_grid.py are not re-run (those modules aren't reloaded). The suite stays green only because zagg.output.get_writer was imported into __init__ bound to the old module object, whose _WRITERS still holds all five names; the reloaded module is now a dangling divergent copy. Confirmed directly: post-reload, output_base._WRITERS == {'zarr'} while zagg.output.get_writer('parquet') still resolves. This is order-dependent fragility — a future test that imports from zagg.output.base import get_writer after this test would see a one-entry registry. Replace the reload with a scoped save/restore of _WRITERS (snapshot the dict, mutate, restore in finally), matching the lighter pattern already used in test_register_writer_decorator_form. (file: tests/test_output.py:55-66)
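The scoped save/restore suggested in (3) could look like the following sketch. `preserved` is a hypothetical context manager, and the registry is assumed to be a plain module-level `dict` like `_WRITERS`; nothing here is taken from `tests/test_output.py`.

```python
from contextlib import contextmanager


@contextmanager
def preserved(registry: dict):
    """Snapshot a registry dict and restore its contents on exit."""
    snapshot = dict(registry)
    try:
        yield registry
    finally:
        # Restore in place so every module holding a reference to the
        # same dict object sees the original entries again.
        registry.clear()
        registry.update(snapshot)
```

Restoring in place, instead of rebinding the name or reloading the module, avoids the divergent-copy problem because the dict object never changes identity.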
(4) Empty/all-error temporal run writes a column-less file. to_frame([]) returns a 0-column DataFrame; to_parquet succeeds but the file has no event_key column, so a downstream read_parquet(...).set_index("event_key") raises KeyError. Not asserted anywhere — test_to_frame_empty_rows only checks .empty, never round-trips an empty write. Either guarantee the schema columns exist for an empty frame or pin the empty-write behavior with a test so it's a decision, not an accident. (file: src/zagg/output/tabular.py:39-53)
Smaller / standing for review
(5) HDF5 string handling drops NaN and mixed types. _write_hdf5 does values.astype(str) for object/U/S columns (tabular.py:130-134). A NaN in a string column becomes the literal "nan", and an object column holding mixed numeric+None becomes stringified — silently lossy versus the parquet/csv paths. Only relevant once h5py is wired, and the path is untested in CI (h5py not installed), so it's a latent gap rather than a live bug. Worth a comment or a guard.
(6) HDF5 path is untested in CI. test_hdf5_round_trip_or_clear_error importorskips h5py, which is absent in the test extra, so the happy path never runs in CI — only the absent-error path (test_hdf5_without_h5py_raises_actionable_error) does. The round-trip assertions are effectively dormant. Fine as a deliberate choice (h5py is optional), but flag: the only HDF5 write coverage is local-only. If HDF5 output matters, consider adding h5py to the test extra (needs sign-off per §4 since it's a new test dep).
(7) output.format is never validated in config. config.py validates output.grid thoroughly but has no format validation, so a typo (format: parqet) silently falls through to the suffix path / default. Low stakes given (2), but once the registry is wired, an unknown format should surface get_writer's nice "known formats: …" error at config-load time.
(8) Writer Protocol is loose to the point of not buying much. write(self, payload, **kwargs) with # pragma: no cover — ZarrGridWriter.write(carrier, store, *, grid, chunk_idx) and TabularWriter.write(rows, path, *, output_format, key) share only the method name, not a callable shape. isinstance(..., Writer) therefore asserts almost nothing (runtime Protocol checks method presence only). It reads more as documentation than a contract; fine, but don't lean on it as a type guarantee.
(9) Nit — minor. from zagg.output import TabularWriter is a lazy import inside TemporalStrategy.run (runner.py:405); the rest of runner.py imports at module top. The lazy form avoids importing pandas/pyarrow on the spatial path — defensible, but worth a one-line comment saying so, since it reads as inconsistent otherwise. Also: the commit is authored Claude <noreply@anthropic.com> — §3/§6 say not to claim authorship in commits; likely a routine git-config issue, not a code concern, noting for completeness.
Bottom line: the byte-identical guarantee holds — ZarrGridWriter forwards verbatim (asserted by the monkeypatch tests at test_output.py:113-167), the spatial runner path is untouched (it still calls write_dataframe_to_zarr directly; zagg.output is imported only lazily inside the temporal branch), and importing zagg.output pulls nothing new (pandas/pyarrow are already core deps; h5py stays gated). No silent dependency was added. The substantive asks are (1)–(4); (2) is the one I'd most want resolved before this lands, since the advertised registry and the runner's actual dispatch disagree. None of these block advancing to the next phase, but (1) and (3) are genuine correctness/fragility issues, not style.
Generated by Claude Code
|
🤖 from Claude Phase 6 landed — What's in it
Self-review fold (
|
|
🤖 from Claude Conflicts resolved. Merged current Three files conflicted (7 hunks); each kept
Green locally: One non-blocking flag: On merge order (the question raised here): recommend landing #111 first, then rebasing this PR on top. Rationale: #111 is feature-complete (all phases done, default-OFF and byte-identical when off, small surface) and only awaits answers to its 4 design questions plus merge; this PR is the larger, still-in-progress change (Phase 7 outstanding, plus an open dependency decision) touching Remaining before this is mergeable on its own merits: Phase 7 ( Generated by Claude Code |
Please continue with phase 7
I'm having trouble seeing why we'd add h5py; the output isn't spatial, so it doesn't map to zarr xarray, and we already have parquet. My lean is option (3), dropping HDF5 entirely and writing temporal output to Parquet. Can you explain your lean on (1)? Where is the issue's "HDF5/DataFrame" output note you're referencing?
Can you give me some details on what this "generic transform hook" would look like? What's the comparison between |
…9842978de74 # Conflicts: # tests/test_runner.py
|
🤖 from Claude Going with your lean — option (3), Parquet-only. HDF5 is dropped this run (Phase 7, commit Answering your two questions: (a) Where is the "HDF5/DataFrame" output note? It's in issue #12's "Common Design Elements" comparison table, the last (
So "HDF5/DataFrame" describes what the AR repo currently emits, not a requirement that zagg adopt HDF5. My earlier lean toward (1) read that row as a soft spec to match; on reflection your point is the right one — that row is descriptive, and the two reasons to drop HDF5 are exactly the ones you gave: temporal output is flat tabular, so it doesn't map onto the Zarr/xarray model the gridded path uses, and Parquet is already core. (Worth noting (b) The generic Today the spec carries both, applied independently in var_t = loaded[spec["collection"]][spec["variable"]].sel(time=t)
if spec.get("is_anomaly"): # from `anomaly: true`
    var_t = plugins.get_field_transform("monthly_anomaly")(var_t, static_sub, spec)
if spec.get("transform"): # the generic hook
    var_t = plugins.get_field_transform(spec["transform"])(var_t, static_sub, spec)
# convenience flag: only ever means monthly_anomaly
anom_tqv:
  variable: TQV
  spatial_func: weighted_mean
  temporal_reducer: weighted_mean
  anomaly: true
# generic hook: names any registered field transform
anom_tqv:
  variable: TQV
  spatial_func: weighted_mean
  temporal_reducer: weighted_mean
  transform: monthly_anomaly # or detrend, log_scale, … once registered
The double-apply: a spec that sets both
My lean is (3) — keeps the ergonomic flag, removes the footgun, no migration. Happy to land whichever you pick as a small follow-on; it's out of Phase 7's scope (Phase 7 is the example config + Parquet output) so I've left it standing rather than change spec semantics unprompted. One scope note for Phase 7: I landed the core (the Generated by Claude Code Generated by Claude Code |
espg
left a comment
🤖 from Claude (review)
Fresh-context adversarial review of the Phase 7 diff (a8b6e2a, Parquet-only). One finding, now folded.
Folded (eca09a7): TestMerra2StormTemplate.test_specs_resolve_registered_capabilities checked spatial_func and mask against the registry but not temporal_reducer — the example uses max/weighted_mean/sum, all registered, so the assertion was a free strengthening of "every capability name resolves." Added assert spec["temporal_reducer"] in registry.list_reducers().
Checked clean:
- No dangling HDF5 references. Grepped src/ + tests/ for hdf5|h5py|he5|HDF5|.h5|key=; every remaining hit is unrelated source-data HDF5 reading (h5coro, granule .h5 URLs, processing/ reads) or the intentional docstring in tabular.py explaining why HDF5 was dropped. The hdf5 alias, key= param, _write_hdf5, the .h5/.hdf5/.he5 _EXT_FORMAT + _TABULAR_SUFFIXES entries, and the ModuleNotFoundError path are all gone; error text + docstrings now say "parquet or csv".
- merra2_storm.yaml internally consistent. load_config → validate_config passes; get_pipeline_type == temporal; specs_from_config resolves all three vars; every capability name (max/weighted_mean/weighted_sum, max/weighted_mean/sum, ais/full/ocean) exists in the live registry; anomaly: true → is_anomaly=True; output.format: parquet.
- Coverage loss acceptable: the two removed tests covered only the deleted HDF5 path; no other test references it.
- No dependency change (removes the optional h5py path, adds nothing). Spatial path untouched: runner change is limited to _TABULAR_SUFFIXES + one comment; no module crosses the ~1000-line concern from this diff. Terseness/style match the surrounding configs and tests; no dead code.
No blocking or non-blocking findings beyond the folded nit.
Generated by Claude Code
yes, option (3) approved |
In scope here , add them as phase 7b please |
Refs #12. Per @espg's direction in #12 (comment) ("Do it, fresh branch of main / fresh PR."), this PR is a fresh start off main, replacing the stale magg_temporal/zagg-temporal branches that diverged before #28–#41 landed.
Implements the temporal pipeline infrastructure (issue #12) in additive phases. Phase 1 lands the smallest independently mergeable seam: a pipeline.type field on PipelineConfig plus a get_pipeline_type(config) helper that the dispatch strategy reads. Subsequent phases layer the worker, dispatch strategy, and output writers on top without changing the spatial production path until the strategy seam itself flips.
Reconcile onto merged infra (#73 registry, #76 dispatch)
Per @espg's rebase-not-restart decision (#64 (comment)), the two pieces this PR originally drafted as its own Phases 2 and 3 have landed canonically on main via dedicated PRs, and this branch reconciles onto them:
- src/zagg/registry.py on main.
- src/zagg/dispatch.py on main.
Phases
- get_pipeline_type (commit b9adafa). pipeline.type defaults to "spatial"; every existing YAML stays valid byte-for-byte.
- src/zagg/temporal.py primitives + process_event (commit 52ff1c4, fold 4ed4434).
- PipelineStrategy dispatch in runner.agg() (commit 0d20e70). SpatialStrategy (byte-identical) + TemporalStrategy.
- src/zagg/output/ writer abstraction (commit c7a149d). ZarrGridWriter keeps the current spatial path byte-identical; TabularWriter for Parquet/CSV temporal output.
- merra2_storm.yaml example + Parquet-only temporal output (commit a8b6e2a). HDF5 dropped per the dependency decision below; Lambda process_event mode + pin changes deferred (see Questions for review).
Phase 7: what / approach (Parquet-only decision)
@espg leaned option (3): drop HDF5 entirely, standardise temporal output on Parquet (#70 (comment): "I'm having trouble seeing why we'd add h5py … we already have parquet"). That removes the open dependency blocker: Phase 7 adds no new dependency; Parquet/CSV ride pyarrow/pandas, already core.
Landed this run:
- output/tabular.py: removed the _write_hdf5 method, the optional h5py import + actionable-error branch, the hdf5 registry alias, the key= param, and the .h5/.hdf5/.he5 entries in _EXT_FORMAT (and the matching _TABULAR_SUFFIXES entries in runner.py). TabularWriter now serialises Parquet (default) or CSV only; docstrings in output/__init__.py, base.py, and tabular.py updated to reflect the Parquet standardisation and cite the issue #12 "HDF5/DataFrame (tabular)" row directly. The two HDF5 tests (round-trip + h5py-absent error) are removed and the hdf5 parametrize entry dropped; there is no longer a skipped test in the suite.
- src/zagg/configs/merra2_storm.yaml: the packaged temporal-pipeline example from issue #12, with pipeline.type: temporal, plural collections, static_data (ais_mask + climatology), three aggregation variables exercising max/weighted_mean/weighted_sum spatial funcs, the ais/full/ocean masks, and anomaly: true. Every capability name resolves through zagg.registry. output.format: parquet.
- TestMerra2StormTemplate in tests/test_config.py pins that the packaged example loads, is a temporal pipeline, validates, selects the parquet output format, and that every spec resolves a registered spatial-func/mask. The existing tests/test_runner.py::TestTemporalStrategy Parquet wiring tests (a .parquet store writes + reports output_path; a directory store stays in-memory; s3:// rejected) continue to pin the runner write path.
Deferred to an @espg decision (see Questions for review): the Lambda handler process_event mode and the "loosened runtime pins". The Lambda mode is not a clean additive seam yet: it depends on a temporal reader (load collections/static_data from S3) that is not implemented, and on the S3 Parquet write path that _write_tabular_output currently rejects pending exactly this handler. Building both now would be net-new infra rather than a mirror of the local TemporalStrategy. No pins were touched: dropping HDF5 means there is no new dep to add, and no existing temporal pin needs loosening for Parquet-only output.
Phase 6: what / approach
New package src/zagg/output/ gives both pipeline cores a single polymorphic Writer seam (issue #12 challenge #5: "the output layer needs to be polymorphic").
- output/base.py: a Writer Protocol, a local output.format → writer registry (register_writer / get_writer, same semantics as zagg.registry: non-empty-name + duplicate-raises-unless-replace), and output_format(config) (defaults to "zarr", so every existing spatial config selects the gridded path unchanged).
- output/zarr_grid.py (ZarrGridWriter): a thin adapter over the functions the spatial runner already calls: write → write_dataframe_to_zarr, write_ragged → write_ragged_to_zarr, finalize → consolidate_metadata(..., zarr_format=3). Every method forwards its arguments verbatim, so a store routed through the writer is byte-for-byte identical. The spatial runner path is deliberately left calling the underlying functions directly. TestSummaryKeysByteIdentical stays green, and tests/test_output.py::TestZarrGridWriter pins the verbatim forwarding.
- output/tabular.py (TabularWriter): flattens the temporal summary["results"] rows ({event_key, results, meta}) into one DataFrame row per event (missing per-event outputs align as NaN; timesteps_processed carried through), then serialises to Parquet (default) or CSV by file suffix or an explicit output_format=. Both use pyarrow/pandas, already core deps; no new dependency.
- TemporalStrategy.run persists its event rows through get_writer(output_format(config)) when output.store names a tabular file (.parquet/.pq/.csv), reporting the written path as summary["output_path"]. A bare-directory store writes nothing and reports output_path: None. An s3:// tabular store is rejected with a clear error (the Lambda write path is deferred). The spatial path is untouched.
How tested
- pytest tests/test_output.py tests/test_runner.py tests/test_config.py tests/test_temporal.py -q → 323 passed, 0 skipped (the prior HDF5 skip is gone with the path removed). TestSummaryKeysByteIdentical, TestStrategyDispatch, TestTemporalStrategy, the new TestMerra2StormTemplate all green.
- ruff check --select=E,F,W,I --ignore=E501 src tests clean; touched files are ruff format-clean. (tests/test_runner.py still shows pre-existing ruff format drift inside main's _run_lambda_with_durations profiling block, lines 1000+; untouched per the "don't fix unrelated CI failures" rule; it is not in the temporal seam.)
Questions for review
- process_event mode + loosened pins (Phase 7, deferred). Wiring a Lambda temporal worker needs two pieces that don't exist yet: a temporal reader to load collections/static_data from S3, and the S3 Parquet write path (_write_tabular_output rejects s3:// today). Both are net-new infra, not a mirror of the local TemporalStrategy, so I deferred them rather than guess at the seam. Want these as a follow-on phase/PR, or in-scope here? (No runtime pins were changed: Parquet-only adds no dep, and nothing temporal needs a pin loosened.)
- is_anomaly double-apply / schema (Phase 4, your #70 (comment)): answered below in a PR comment with the concrete transform-hook-vs-anomaly: true comparison.
Authored by Claude for @espg under the #12 routine.
Generated by Claude Code