From 3ea2977d38b52e794a8271b8955c8b249de2413d Mon Sep 17 00:00:00 2001
From: Keeeeeeeks
Date: Thu, 25 Jun 2026 11:48:40 -0400
Subject: [PATCH] Add pipeline-spec: v5 cross-match dataset methodology + script spec

Specification for building a cluster-scale, cross-matched multimodal dataset
(stars + galaxies + AGN; images + spectra + light curves + tabular) to extend
the Platonic Universe program, by extending and hardening the existing
v4/OmniSky pipeline. Covers: global HEALPix-order-29 object identity; a
multi-submission consistency model for shared-filesystem clusters (disjoint
partitions, atomic writes, optional claim ledger, global per-service rate
limits, single HF uploader); lsdb/HATS access over HuggingFace for
MultimodalUniverse sources; a motion-aware false-match protocol that accounts
for real (proper-motion) and apparent (parallax) displacement; a
CPU/GPU/network resource model; integrity-checked DONE-marker resume with a
release audit; and a per-source data access matrix. Drafts for review.
---
 pipeline-spec/01-methodology.md               | 350 ++++++++++++++++++
 pipeline-spec/02-script-spec.md               | 193 ++++++++++
 pipeline-spec/03-v4-walkthrough.md            | 236 ++++++++++++
 .../04-downstream-analysis-interface.md       |  78 ++++
 pipeline-spec/README.md                       |  37 ++
 5 files changed, 894 insertions(+)
 create mode 100644 pipeline-spec/01-methodology.md
 create mode 100644 pipeline-spec/02-script-spec.md
 create mode 100644 pipeline-spec/03-v4-walkthrough.md
 create mode 100644 pipeline-spec/04-downstream-analysis-interface.md
 create mode 100644 pipeline-spec/README.md

diff --git a/pipeline-spec/01-methodology.md b/pipeline-spec/01-methodology.md
new file mode 100644
index 0000000..691c0d6
--- /dev/null
+++ b/pipeline-spec/01-methodology.md
@@ -0,0 +1,350 @@
+# Methodology: A Cluster-Scale Cross-Matched Multimodal Dataset for the Platonic Universe
+
+> **Status:** draft for PI review (v5).
Companion docs: +> [`02-script-spec.md`](02-script-spec.md) (the scripts), +> [`03-v4-walkthrough.md`](03-v4-walkthrough.md) (how the existing code works), +> [`04-downstream-analysis-interface.md`](04-downstream-analysis-interface.md) (what +> the dataset must support for the PRH analysis). +> +> **v5 changes (this revision)** address PI feedback: a real *false-match protocol* +> that accounts for real+apparent motion (§3.4.1); a *distributed execution & +> consistency model* for concurrent multi-user jobs on a shared filesystem (§3.2); +> a *resource & throughput model* that reasons about CPU/GPU/network (§3.8); +> *hardened DONE markers* with integrity verification (§3.2); and a *Data Access +> Matrix* answering "stream from afar vs. S3/HF" per source (Appendix A). + +## 1. Goal & scientific framing +Build the largest possible **clean, cross-matched, multimodal** catalog of celestial +objects (stars, galaxies, AGN), where **each retained object is observed by ≥2 +instruments** and carries, where available, **images + spectra + light curves + +tabular** features. Upload to HuggingFace. The dataset extends the *Platonic Universe* +program (arXiv:2509.19453, Duraphe, Smith, Sourav, Wu — "Do Foundation Models See the +Same Sky?"), which tests whether models trained on different astronomical +modalities/instruments converge to a shared representation (Platonic Representation +Hypothesis, Huh et al. 2024, arXiv:2405.07987). + +**Why this dataset shape is the right one for the science** (from the downstream +analysis, see doc 04): the convergence metrics (**mutual k-NN alignment** and **CKA**) +require the **same N physical objects encoded by both models**. Precedent: AstroCLIP +(arXiv:2310.03024) aligned image↔spectrum on **197,632** galaxies matched by shared +TARGETID; the PRH paper got stable trends with **N≈1,000–1,024 paired objects** at +k=10. 
**Scale target (PI decision): maximize raw object count** — match or beat v4's +1.58M-object build — while every design choice below still optimizes for +**trustworthy pairs** (match *quality* gates inclusion; volume is the objective). + +## 2. Starting point: extend & harden `v4/OmniSky`, don't rebuild +The `layerwise-analysis/v4` pipeline ("OmniSky") already produced a 1.58M-object, +12-survey dataset and is the right backbone (see doc 03). The science logic +(seed-and-match, HEALPix pre-filter, epoch propagation, streaming shards, the +shifted-catalog false-match test) is **sound and worth keeping**. What is "scratch" +and must be re-architected: **cluster concurrency across concurrent submissions, +resumability, global object identity, the (missing) HF upload, array storage/typing, +the false-match test (which today tests a *different matcher* than production), and +remote data access (today raw HTTP; v5 uses lsdb/HATS over HF + S3).** This document +specifies the hardened design; doc 02 specifies the scripts. + +## 3. Core architectural decisions + +### 3.1 Identity: one `global_object_id`, assigned once +- **Canonical frame/epoch:** ICRS, **J2016.0** (Gaia DR3 reference). +- **ID:** **nested HEALPix index at order 29 (`nside = 2^29`)** of the seed position, + packed as signed `int64` → `global_object_id`. Deterministic, idempotent, compact, + fast integer joins; sub-milliarcsec cell size makes collisions negligible (assert + uniqueness at build time and **fail the build** on any collision). **This is not an + arbitrary choice:** MMU v1 catalogs already carry a native `_healpix_29` column + (order-29 NESTED), so our global ID is continuous with the ecosystem we draw from. +- **Assignment rule (critical):** compute the ID **once, from the seed catalog**, at + J2016.0 (stars propagated there via `apply_space_motion`; galaxies/AGN at fixed + ICRS). 
**Never recompute it from a downstream survey's coordinates** — two surveys + report slightly different positions for the same object (and `_healpix_29` will + therefore differ between catalogs), so recomputing would split identity. All source + matches attach to the seed's ID. +- **Provenance:** keep every survey-native ID as its own column (`seed_native_id`, + `gaia_source_id`, `apogee_id`, `sdss_specobjid`, …). Global ID = join key; + native IDs = provenance. Persist `seed_ra_deg_j2016`, `seed_dec_deg_j2016`, + `seed_epoch_jyear`. + +### 3.2 Distributed execution & consistency model (multi-submission, shared FS) +**The PI's question:** "I submit a job, you submit a job — do they run in parallel and +write to the same database safely?" **Operating assumption (confirmed):** one cluster +(NCSA DeltaAI), **multiple independent SLURM submissions by multiple users sharing one +Lustre filesystem**, writing to one release tree. We design for that explicitly. + +**Substrate guarantees we rely on (and ones we don't).** On a single Lustre filesystem, +`os.replace()` (rename) is atomic and `open(O_CREAT|O_EXCL)` is atomic for exclusive +creation — we build on these. We **do not** rely on `flock`/`fcntl` locks (Lustre +needs the `-o flock` mount option, it is advisory, and it is fragile across nodes). + +**Ownership model — primary: disjoint partitioning by construction (no runtime locks).** +A single `plan_work_units.py` builds the authoritative manifest *once*. Each submission +is launched with `--partition-id k --num-partitions K`; a job processes only manifest +rows where `row_index % K == k` (or a contiguous block range). Two submissions with +different `k` **never target the same output path** ⇒ zero coordination at runtime, +embarrassingly parallel, and trivially correct even if more people join later. This is +the recommended default for v1. 
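
The disjoint-partition rule above can be sketched in a few lines. This is a minimal illustration, assuming the modulo variant (`row_index % K == k`); the function name `owned_rows` is hypothetical and not part of the spec.

```python
def owned_rows(num_rows: int, partition_id: int, num_partitions: int) -> list[int]:
    """Manifest row indices owned by one submission (row_index % K == k).

    Hypothetical helper: the spec only fixes the rule, not this signature.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    if not 0 <= partition_id < num_partitions:
        raise ValueError("partition_id must satisfy 0 <= k < K")
    # Stepping by K from k yields exactly the rows with row_index % K == k.
    return list(range(partition_id, num_rows, num_partitions))


# Two submitters (K=2) over a 10-row manifest.
user_a = owned_rows(10, partition_id=0, num_partitions=2)
user_b = owned_rows(10, partition_id=1, num_partitions=2)

# The two sets never overlap and together cover every manifest row.
assert set(user_a).isdisjoint(user_b)
assert sorted(user_a + user_b) == list(range(10))
```

One consequence worth noting: every submitter must use the same `K` for the lifetime of a manifest, because changing `K` mid-run reassigns rows between partitions.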
+ +**Ownership model — optional: a claim ledger (for dynamic load-balancing).** If static +partitions become load-imbalanced, switch to atomic claiming: to process row *i*, a +worker does `os.open(claims//.claim, O_CREAT|O_EXCL)`; success = +ownership (write `{jobid, array_task, host, pid, start_ts, heartbeat_ts}`), failure +(`FileExistsError`) = already owned, skip. A periodic heartbeat refreshes `heartbeat_ts`. +A reaper reclaims a claim only if its SLURM job is absent from `squeue` **and** the +heartbeat is older than a TTL (e.g. 2× interval), by deleting the stale claim. Use this +**only if** disjoint partitioning proves too rigid — it adds a ledger to reason about. + +**Atomic, verified writes (every work unit).** Write +`shard.parquet.tmp...` → `fsync(file)` → `os.replace()` to the +deterministic final path → `fsync(dir)` → **then** write the DONE marker atomically +(tmp → fsync → `os.replace` → `fsync(dir)`), **strictly ordered after** the data is +durable. A crash between the two leaves "final, no marker" = a *suspicious* state that +re-runs safely (idempotent overwrite). + +**DONE markers prove the bytes are correct and current** (the PI's "barebones" fix). A +marker (`shard.done.json`) records: manifest hash, seed version, source release, schema +version, **code commit (git SHA)**, **row count**, **output byte size**, and an +**output content checksum** (xxh3/sha256 of the file, or of a canonical per-column +digest), plus host/jobid/timestamp. 
Resume is a **state machine**, not "directory
+exists":
+
+| State | Detection | Action |
+|---|---|---|
+| pending | no final, no marker | run |
+| interrupted | tmp only (no final) | GC tmp if owner dead; run |
+| suspicious | final, no marker | re-run (overwrite) |
+| stale | final+marker, but marker hash ≠ current manifest/schema/code | re-run |
+| corrupt | final+marker, hashes match, but file checksum ≠ marker | re-run |
+| complete | final+marker, hashes & checksum match | **skip (no-op)** |
+
+A release-level audit (`verify_markers.py`, gates upload) independently re-derives that
+every manifest row is `complete`, with no orphan tmp files and no suspicious/stale/
+corrupt states. **This is the "is *done* actually done?" test** — and it doubles as the
+shared completion ledger across submissions.
+
+**Global per-service rate limiting (the consequence of multi-submission + max-volume).**
+Remote services (PS1, Legacy cutouts, CDS XMatch, GALAH, IRSA) limit the **sum** of
+requests across *all* concurrent jobs and *all* users — per-job caps are insufficient,
+and two users hammering CDS/PS1 at once risks throttling or an IP ban. Two options:
+- **v1 (recommended): partition *sources* across submitters** (you own cutouts, PI owns
+  spectra) so no two submitters hit the same service simultaneously — no shared state.
+- **Upgrade: a shared token-bucket per service**, persisted on Lustre (atomic update via
+  `O_EXCL` temp + rename; avoid sqlite-on-Lustre locking). Each worker acquires a token
+  before a remote call. Rates are seeded from `probe_sources.py` and published limits.
+
+**HF write model (the other "database").** Concurrent `upload_folder`/`create_commit`
+calls to one HF repo cause git/LFS commit races (HTTP 412, non-fast-forward). Rule: a **single
+designated uploader job** runs `upload_hf.py` *after* all builds finish and the audit
+passes; it is never run by two submitters at once. (If incremental upload is ever
+needed, each writer commits **disjoint path-prefixes** via `create_commit` with
+backoff on 412 — but v1 uses the single-uploader model.)
+
+**RAM/memory budget (the literal "memory model").** Each array task owns one seed
+row-shard (≤ `--shard-size`, default 50k objects) × one source; peak working set =
+the shard's positions + one source partition + matched payloads, bounded to a few GB.
+`finalize_shard.py` joins **per shard** (no global shuffle) in 50k slabs. Use narrow dtypes
+(`float32` payloads, `int64`/`float64` for ids/coords) and Arrow nested types (never pandas
+object columns). A GH200 node's ~480 GB unified memory dwarfs a single task, so we pack
+**many array tasks per node**.
+
+### 3.3 HEALPix + lsdb/HATS as the access & match layer (PI decision: adopt lsdb)
+**For MMU sources, read and match with lsdb/HATS, not raw HTTP.** MMU v1 publishes full
+catalogs as HATS under `hf://datasets/UniverseTBD/mmu_{catalog}_{subcatalog}`,
+partitioned at HEALPix **order-4 / nside-16 (NESTED)** with finer data-dependent leaves.
+Access pattern (validated by the `TobiasPitters/mmu-crossmatch` Space):
+```python
+import lsdb
+
+# (order, pix) is the HEALPix partition to read; the values here are placeholders,
+# the caller supplies the pixel(s) covering its seed shard.
+order, pix = 4, 0
+cat = lsdb.open_catalog(
+    "hf://datasets/UniverseTBD/mmu_gaia_gaia",
+    search_filter=lsdb.PixelSearch([(order, pix)]),  # partition pushdown
+    columns=["ra", "dec"],  # column pushdown; add only the payload columns needed
+)
+```
+This reads **only the needed partitions and columns** from HF's S3/CDN — the modern
+replacement for v4's HTTP reads off `users.flatironinstitute.org`. Two critical notes:
+- **lsdb crossmatch is positional only** (default `n_neighbors=1`, `radius_arcsec`). So
+  **we own epoch propagation**: propagate the seed to each source's epoch *before*
+  handing coordinates to lsdb (§3.4), and set **`n_neighbors>1`** to surface ambiguity
+  (the "all-within-radius" behavior of §3.4), then adjudicate ourselves.
+- **Margin cache fixes boundary loss:** load each cell **plus a 1–2′ margin** (lsdb's + margin cache, or neighbor cells via `astropy-healpix`) so an object near a partition + edge isn't dropped when its counterpart sits in the adjacent cell. This is exactly the + bug in MMU's naive per-cell `cross_match_datasets` and the demo's "do not overlap" + sub-pixel case. + +**Fail fast, never silently degrade.** v4's `compute_catalog_healpix` returns `None` +(→ full all-sky scan) if `healpy` is missing. v5 makes HEALPix/lsdb a **hard dependency** +and aborts if unavailable. Standardize: `lsdb`/`hats` for partitioned catalog access, +`astropy-healpix` for ad-hoc indexing/neighbors, `healpy` only for map utilities. +Non-MMU sources (ZTF on S3, cutout/VO services) keep purpose-built readers but reuse the +same pre-filter + column-projection discipline (doc 03 §3). + +### 3.4 Matching: `apply_space_motion` + radius search + explicit adjudication +- **Proper motion:** replace the hand-rolled propagation with astropy + `SkyCoord.apply_space_motion()` (rigorous spherical motion). Document edge cases: + missing PM/RV, extragalactic `pm=0` *by assumption* (flag it), sign of Δt, the + `cos(δ)` convention, near-pole instability, and long-time-span surveys vs a single + mean epoch (use a catalog reference epoch, or per-detection times, where available). +- **Candidate retrieval:** use **`search_around_sky`** (all within radius) for custom + readers, or **lsdb `crossmatch(..., n_neighbors>1)`** for MMU sources — **not** + nearest-only — so ambiguity is visible. Then adjudicate: within source-specific radius + → prefer unambiguous → prefer best `sep / positional_uncertainty` → use quality flags → + smallest separation only as final tie-break. Emit `match_ambiguous: bool` and + `n_candidates_within_radius: int`. +- **Per-source radii**, not one global 3″: tight for precise catalogs + (Gaia/SDSS/DESI/Legacy/PS1), looser only where justified (GALEX, 2MASS, unWISE). 
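
The adjudication order in §3.4 can be sketched on an already-retrieved candidate list. This is a minimal illustration, assuming one reading of the preference chain (normalized separation first, then quality flags, then raw separation); the `Candidate` fields and the `adjudicate` name are hypothetical, and retrieval via `search_around_sky` or lsdb is assumed to have happened upstream.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Candidate:
    # Hypothetical fields, for illustration only.
    source_id: str
    sep_arcsec: float        # separation from the epoch-propagated seed position
    pos_unc_arcsec: float    # combined positional uncertainty, assumed > 0
    quality_ok: bool = True  # True when the source's own quality flags are clean


def adjudicate(
    candidates: list[Candidate], radius_arcsec: float
) -> tuple[Optional[Candidate], bool, int]:
    """Return (chosen, match_ambiguous, n_candidates_within_radius)."""
    in_radius = [c for c in candidates if c.sep_arcsec <= radius_arcsec]
    n = len(in_radius)
    if n == 0:
        return None, False, 0
    if n == 1:
        # Unambiguous: a single candidate inside the source-specific radius.
        return in_radius[0], False, 1
    # Ambiguous: rank by sep / uncertainty, then clean flags, then raw separation.
    best = min(
        in_radius,
        key=lambda c: (c.sep_arcsec / c.pos_unc_arcsec, not c.quality_ok, c.sep_arcsec),
    )
    return best, True, n


# "a" is closer in arcsec, but "b" is closer in units of its own uncertainty.
a = Candidate("a", sep_arcsec=0.5, pos_unc_arcsec=0.1)   # 5.0 sigma
b = Candidate("b", sep_arcsec=0.8, pos_unc_arcsec=0.4)   # 2.0 sigma
chosen, ambiguous, n = adjudicate([a, b], radius_arcsec=1.0)
assert chosen is b and ambiguous and n == 2
```

The example shows why nearest-only matching is not enough: the smallest raw separation and the best normalized separation can disagree, and the ambiguity flag records that a choice was made.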
+ +#### 3.4.1 Proving matches: the hardened false-match protocol (the PI's #1 point) +**Why v4's test is not enough.** `false_match_test.py` shifts the seed **+30″ in RA +only** and matches with **nearest-only on *un-propagated* positions** — i.e. it +benchmarks a *different, simpler matcher than production*, and a fixed 30″ is comparable +to the multi-decade proper-motion displacement of high-PM stars. So it (a) fails to +exercise the real failure mode, and (b) for a high-PM star, a 30″ shift can land on that +star's *true* counterpart at a different epoch, contaminating the "false" estimate. 30″ +is **not** "clearly beyond any real displacement" once you admit proper motion and +parallax. The reported 0.02%/0.06% numbers therefore don't characterize the real pipeline. + +**The v5 protocol (account for real *and* apparent motion):** +1. **Mirror production exactly** — same `apply_space_motion` to the source epoch, same + radius search / lsdb match, same adjudication. The FMR must be the FMR of the *actual* + matcher. +2. **Monte-Carlo random-direction offsets** — apply N≈100 offsets at random position + angles (not one fixed RA shift); report the *distribution* (mean / 95th / worst), not + a point estimate. +3. **Offset scaled to the physical confusion scale** — draw offset magnitudes from an + annulus with inner radius `r_min ≳ match_radius ⊕ (μ_max · Δt_baseline) ⊕ ϖ_max ⊕ + σ_astrometry`: combine the match radius, **real motion** (proper motion × the + survey's actual epoch baseline), **apparent motion** (parallax amplitude ϖ), and + astrometric error. For nearby/high-PM stars this is tens of arcsec, so `r_min` is set + per population/PM-bin — not a blanket 30″. +4. **PM-direction-scramble null (the most severe, most physical test)** — keep each + star's PM *magnitude*, randomize its *direction*, re-propagate, re-match. This + directly measures "if my epoch correction points the wrong way, how often do I grab a + false neighbor?" — exactly the PI's worry. +5. 
**Stratify** by **PM-magnitude bin × local source density × |b| (galactic latitude)** — + the high-PM tail and crowded low-|b| fields are where matches break; an aggregate + number hides them. +6. **Analytic cross-check** — fit the match-separation histogram as real (Rayleigh core + from astrometric error) + background (rising ∝ r from field density; Sutherland & + Saunders 1992 / Budavári & Szalay 2008); integrate the background within the radius. + Two independent estimators agreeing ⇒ trustworthy. +7. **Gate the upload** on a per source × population × PM-bin × |b|-bin threshold. + +### 3.5 Storage: raw values, typed schema, heavy payloads as shards +- **Store raw** pixel/flux/time values. Normalization is a documented **train-time** + transform per modality (§4), never baked in. +- **Explicit versioned `pyarrow.Schema`** (don't let pandas infer): + `global_object_id:int64`, seed coords/epoch, per-source presence flags, native IDs, + per-source `match_sep_arcsec` + ambiguity, modality/instrument counts, split, + provenance/version. +- **Fixed-length arrays → Arrow `fixed_size_list`** (64×64→4096-flat; + APOGEE 7514). **Variable/heavy payloads** (images, variable spectra, light curves) → + **WebDataset tar shards** referenced by key, OR Arrow `list>` if inline. + Avoid pandas object-list columns. `float32` payloads/separations; `float64` only for + coordinates/times. + +### 3.6 "≥2 instruments" is an explicit, enforced rule +After the per-shard merge, count **distinct instruments/surveys with valid raw +payloads**; the seed row alone does **not** count, and metadata-only attachments do +**not** count as an instrument. Persist `n_instruments_present` and an +`instrument_presence_mask`. Drop (or route to a "singles" split) objects with <2. 
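
The ≥2-instrument rule of §3.6 reduces to a bitmask and a population count. The sketch below is illustrative only: the instrument names, their bit order, and the function names are assumptions, and the caller is assumed to pass `True` only for instruments with a valid raw payload (the seed row and metadata-only attachments are passed as absent).

```python
# Hypothetical bit order; the real list would come from the versioned schema.
INSTRUMENTS = ("gaia_bprp", "desi", "legacy", "ztf", "tess")


def instrument_presence_mask(valid_payload: dict[str, bool]) -> int:
    """Pack per-instrument payload presence into an integer bitmask."""
    mask = 0
    for bit, name in enumerate(INSTRUMENTS):
        if valid_payload.get(name, False):
            mask |= 1 << bit
    return mask


def n_instruments_present(mask: int) -> int:
    """Number of set bits, i.e. distinct instruments with valid payloads."""
    return bin(mask).count("1")


def route(mask: int) -> str:
    """Keep objects with >=2 instruments; send the rest to a 'singles' split."""
    return "main" if n_instruments_present(mask) >= 2 else "singles"


# DESI spectrum plus ZTF light curve: bits 1 and 3 are set.
mask = instrument_presence_mask({"desi": True, "ztf": True})
assert mask == 0b01010
assert n_instruments_present(mask) == 2
assert route(mask) == "main"
```

Storing the mask alongside the count keeps the rule auditable: a reviewer can recompute `n_instruments_present` from the mask and confirm which instruments satisfied it.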
+ +### 3.7 Upload is a first-class, verified phase +A dedicated `upload_hf.py` uses `HfApi.create_repo` + **`upload_folder`** for the release +tree, writes the dataset card, then **verifies** with a clean +`load_dataset(..., streaming=True)` / `hf_hub_download` round-trip. Run by a **single +designated uploader** after the release audit passes (§3.2). If single-repo size hurts, +split into a manifest repo + per-modality shard repos. (v4 only ever called +`create_repo` — the data was **never actually uploaded**; this phase fixes that.) + +### 3.8 Resource & throughput model (the PI's #3 point: CPU vs GPU) +**Cross-matching is network- and CPU-bound; GPUs do not help the bottleneck.** The +angular match itself is trivial — astropy's (and lsdb's) k-d tree matches 1.5M × millions +of sources in seconds–minutes. Wall-clock is dominated by **fetching data** and the +**latency/rate limits of per-object services**. Therefore: +- **Run data-gen on CPU resources, not GPU.** v4's SLURM script requests + `--gpus=1 --partition=ghx4` (a GH200) for a job that never touches the GPU — it sits + idle for ~2 days. On DeltaAI (GH200 / **ARM aarch64**), prefer a CPU partition; if + confined to GH200 nodes, leave the Hopper GPU unallocated and pack many array tasks + onto the 72-core Grace CPU to overlap network waits. **Reserve GPUs for the downstream + PRH side-car** (doc 04): embedding extraction, CKA's O(N²) Gram matrices, mutual-kNN + (FAISS-GPU genuinely shines there). +- **Did we look for GPU cross-match? Yes — it exists but won't help us.** GPU spatial NN + is real (FAISS-GPU, cuML `NearestNeighbors`, RAPIDS cuSpatial; map RA/Dec→unit vectors, + Euclidean NN). By **Amdahl's law** it accelerates <1% of our wall-clock; the ~99% is + network. lsdb's parallelism is **Dask (CPU)**, which is the right model here. 
The honest + line for the paper: *"GPU xmatch exists; it doesn't move a network-bound workload."* +- **aarch64 caveat:** verify `healpy`/`pyarrow`/`lsdb`/`hats`/`astropy-healpix` wheels on + ARM (most via conda-forge; flag any needing source builds). +- **Throughput budget (sizing).** Long poles: per-object cutouts (PS1, Legacy), CDS + XMatch (France; star-seed Gaia verification), GALAH (Australia). Estimate per service + `wall ≈ N / effective_req_per_sec`, `effective = min(service_limit, concurrency / + latency)`, capped **globally** across submitters (§3.2). S3/HATS sources (ZTF, MMU-on-HF) + are bandwidth-bound (tens of MB/s) and parallelize freely. `probe_sources.py` measures + these **before** we commit cluster hours (Appendix A). + +## 4. Normalization recipes (documented, applied at train time) +- **Images (HDR):** raw counts/flux stored. Recommended: per-band `asinh` stretch or + `astropy.visualization` `ZScaleInterval`; never naive 0–1 min–max (it crushes faint + structure against bright cores — the HDR failure flagged). Example code in the card. +- **Spectra:** raw flux + inverse-variance/mask stored; continuum-normalization optional + downstream (APOGEE is already continuum-normalized; document the inconsistency). +- **Light curves:** raw times/flux(mag)/errors; standardize per object/band downstream; + document time systems (ZTF HMJD vs TESS BTJD). + +## 5. Correctness & bias checklist (the "don't ruin the experiment" list) +1. **Epoch propagation present and correct** for every source (per-survey epoch table; + `apply_space_motion`). High-PM stars are the main hazard; galaxies pm=0. +2. **False-match rate via the §3.4.1 protocol** — production-mirroring, MC random + directions, PM-scramble null, stratified by PM × density × |b|, gated at upload. +3. **Ambiguity surfaced** (`match_ambiguous`, `n_candidates_within_radius`). +4. **Missingness is typed:** *outside footprint* vs *queried, no detection* vs *failed QC*. +5. 
**Selection biases inherited** from parent surveys (APOGEE → bright giants; PROVABGS →
+   DESI footprint, z<0.6; DR16Q → spectroscopic). Document them.
+6. **Spatial (HEALPix) train/val/test split**, never per-object random.
+7. **Uniqueness of `global_object_id` asserted**; **≥2-instrument rule enforced**.
+8. **Determinism:** fixed seeds, pinned survey releases, manifest/schema/code hashes +
+   output checksums in every DONE marker.
+
+## 6. What "done" looks like
+A versioned release on HuggingFace: a Parquet **manifest** + (WebDataset) payload shards,
+an explicit schema, a dataset card with normalization recipes and per-source coverage/
+false-match tables, and a reproducible pipeline (seed → plan → run → finalize → validate
+→ upload) runnable as SLURM **arrays** with full resume, **verified by a release-level
+marker audit before upload** (§3.2).
+
+## 7. Open questions for the PI
+- **Resolved (this revision):** execution model = one cluster, concurrent multi-user
+  submissions on shared Lustre (disjoint partitions by construction); scale target =
+  maximize object count; MMU access/match layer = lsdb/HATS over HF.
+- **Global rate-limit policy:** partition sources across submitters (v1) vs a shared
+  token-bucket (upgrade) — which for the first real run?
+- **New surveys/modalities** beyond v4's 12 — add HSC/JWST imaging (both on MMU/HF) to
+  strengthen the galaxy image↔spectrum pairing the PRH analysis most needs?
+- **False-match gate thresholds** per population/PM-bin/|b|-bin.
+- **Payload storage** — WebDataset shards vs everything-in-Parquet.
+- **Singles handling** — drop <2-instrument objects, or publish as a separate config?
+
+---
+
+## Appendix A — Data Access Matrix (which sources stream from afar vs. S3/HF)
+*"How we process it" follows directly from where the data lives. Confirm volumes/limits
+empirically with `probe_sources.py` before the first full run.*
+
+| Source (modality) | Population | Host & location | Protocol / partitioning | Rate limit | v5 strategy |
+|---|---|---|---|---|---|
+| MMU Gaia BP/RP (spectra) | star | `hf://UniverseTBD/mmu_gaia*` (HF S3/CDN) | HATS order-4; lsdb PixelSearch+cols | none | **lsdb stream** |
+| MMU TESS (light curves) | star/agn | HF (UniverseTBD) | HATS; lsdb | none | **lsdb stream** |
+| MMU DESI (spectra) | galaxy/agn | HF (UniverseTBD) | HATS; lsdb | none | **lsdb stream** |
+| MMU Legacy Survey (images) | all | HF (UniverseTBD) | HATS; lsdb | none | **lsdb stream** (replaces v4 cutout svc) |
+| MMU HSC / JWST (images) | galaxy | HF (UniverseTBD) | HATS; lsdb | none | optional add for image↔spectrum |
+| ZTF (light curves) | star/agn | `s3://ipac-irsa-ztf` (AWS us-east-1) | HATS; pyarrow+S3 anon | none | **S3 stream** (~75 MB/s from Delta) |
+| PS1 (images) | all | ps1images.stsci.edu (STScI) | per-object cutout HTTP | yes | thread pool + **global cap**; prefer MMU image where covered |
+| unWISE (images) | all | unwise.me | coadd tiles HTTP | mild | HEALPix pre-filter |
+| GALEX / 2MASS (images) | all/star | IRSA (Caltech) | HTTP | mild | HEALPix pre-filter |
+| GALAH (spectra) | star | datacentral.org.au (**Australia**) | HTTP + VO SSA, per-object | yes (far) | small N; throttle; **global cap** |
+| APOGEE allStar (seed+spectra) | star | MAST + **pre-staged local** (Globus) | local FITS | n/a | local read (fast) |
+| SDSS spectra | galaxy/agn | **pre-staged local** dir | local FITS | n/a | local read (fast) |
+| CDS XMatch (Gaia verify of star seed) | star | CDS Strasbourg (**France**) | VO XMatch service | yes (strict) | batch; **never fan across array**; global cap |
+
+**Key consequence:** several v4 *remote cutout* sources — Legacy Survey images
+especially — are **also** available as MMU HATS on HF, so v5 replaces rate-limited cutout
+calls with `lsdb`-over-`hf://` reads wherever MMU coverage suffices, collapsing the
+network long-pole. `probe_sources.py` confirms per-source coverage and throughput.
diff --git a/pipeline-spec/02-script-spec.md b/pipeline-spec/02-script-spec.md
new file mode 100644
index 0000000..ecad0bd
--- /dev/null
+++ b/pipeline-spec/02-script-spec.md
@@ -0,0 +1,193 @@
+# Script Spec: Hardened Cross-Match Pipeline (v5)
+
+> Implements [`01-methodology.md`](01-methodology.md). Keeps v4's survey logic
+> (doc 03), rewrites the fragile architecture layers. Target: SLURM **array** jobs on
+> CPU resources, **multi-submission-safe** on a shared filesystem, full resume with
+> integrity-verified DONE markers, lsdb/HATS access for MMU sources, verified HF upload.
+
+## 1. Repository layout
+```
+mmu/                      # importable core library (type-checked, mypy-strict)
+  config.py               # frozen dataclass config; env parsing; validate once
+  schemas.py              # versioned pyarrow.Schema; column names; feature defs
+  ids.py                  # canonical J2016.0 coords + global_object_id (healpix-29 int64)
+  matching.py             # apply_space_motion, radius search / lsdb n_neighbors>1, adjudication, per-source radii
+  coordination.py         # NEW: manifest partition assignment, optional claim ledger, global per-service token bucket
+  io.py                   # atomic write (tmp->fsync->os.replace->fsync dir), DONE markers w/ checksum, shard paths
+  healpix.py              # astropy-healpix + lsdb/hats indexing + neighbor/margin queries (fail-fast)
+  rate_limit.py           # per-source concurrency caps + backoff + jitter (wraps coordination's global bucket)
+  normalize.py            # (doc only) reference asinh/zscale/continuum recipes for the card
+  sources/
+    base.py               # DataSource ABC: load_targets -> fetch_candidates -> match -> emit
+    lsdb_mmu.py           # NEW: generic lsdb/HATS reader for hf://UniverseTBD/mmu_* (margin cache, col pushdown)
+    ztf.py                # S3 HATS (pyarrow + anonymous S3)
+    ps1.py legacy.py galex.py twomass.py unwise.py galah.py   # custom cutout/VO/HTTP readers (kept)
+    apogee.py sdss.py     # local pre-staged FITS readers (kept)
+scripts/                  # thin CLI entrypoints (one responsibility each)
+  build_seed_catalogs.py
+  plan_work_units.py
+  probe_sources.py        # NEW: measure latency/throughput/rate-limits/coverage per source
+  run_source_shard.py
+  finalize_shard.py
+  finalize_release.py
+  validate_release.py
+  verify_markers.py       # NEW: release-level DONE-marker audit (gates upload)
+  false_match_report.py
+  upload_hf.py
+slurm/                    # *.sbatch array wrappers (CPU partitions)
+  build_seeds.sbatch run_source_array.sbatch finalize_array.sbatch
+  validate_release.sbatch upload_hf.sbatch probe_sources.sbatch
+tests/                    # unit tests incl. TEST_MODE end-to-end on ~5 objects/pop
+```
+Output tree (deterministic; content-hash in metadata, not paths):
+```
+release/v5/
+  seeds/population={star,galaxy,agn}/seed.parquet   # + global_object_id
+  manifest/work_units.parquet                       # the single authoritative manifest
+  claims//.claim                                    # only if dynamic claiming is enabled
+  source=/population=/shard=000123.parquet(.done.json)
+  final/population=/shard=000123.parquet(.done.json)
+  release/manifest.parquet + payloads/*.tar (WebDataset) + README.md
+```
+
+## 2. Data flow (seven phases, each resumable)
+```
+probe_sources   (once, ahead of time)
+build_seed_catalogs -> plan_work_units -> run_source_shard (array)
+        -> finalize_shard (array) -> finalize_release + validate_release + verify_markers
+        -> upload_hf (single designated uploader)
+```
+
+## 3. Script contracts
+
+### `scripts/probe_sources.py` (NEW — "test pings, done right")
+- **Does:** for each source in the Data Access Matrix (doc 01 App. A), measure **median
+  latency, sustained throughput, observed rate-limit/429 behavior, total volume estimate,
+  and HEALPix/HATS coverage** against a small seed sample. For MMU/lsdb sources, confirm
+  `hf://UniverseTBD/mmu_*` partitions open and report per-pixel download time.
+- **Out:** `probe_report.json` + a Markdown table; feeds per-source concurrency caps and
+  the global token-bucket rates. **Run before committing cluster hours.**
+- **CLI:** `--sources all --sample 2000 --out probe_report.json`.
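
The probe's latency and rate-limit measurements plug directly into the sizing formula from doc 01 §3.8 (`wall ≈ N / effective_req_per_sec`, `effective = min(service_limit, concurrency / latency)`). The sketch below is a worked instance under made-up inputs; the function names and every number in the example are assumptions, not measured values.

```python
def effective_rate(service_limit_rps: float, concurrency: int, latency_s: float) -> float:
    """Requests/second actually achievable against one service.

    `concurrency / latency_s` is what the workers can issue; the service's own
    limit caps it. The limit applies to the SUM over all submitters.
    """
    if latency_s <= 0:
        raise ValueError("latency_s must be positive")
    return min(service_limit_rps, concurrency / latency_s)


def wall_hours(n_requests: int, service_limit_rps: float,
               concurrency: int, latency_s: float) -> float:
    """Estimated wall-clock hours to issue n_requests against one service."""
    rate = effective_rate(service_limit_rps, concurrency, latency_s)
    return n_requests / rate / 3600.0


# Hypothetical cutout service: 1.5M objects, 10 req/s limit, 8 workers, 0.5 s latency.
# Workers could issue 8 / 0.5 = 16 req/s, so the service limit (10 req/s) binds.
hours = wall_hours(1_500_000, service_limit_rps=10.0, concurrency=8, latency_s=0.5)
assert abs(hours - 1_500_000 / 10.0 / 3600.0) < 1e-9   # about 41.7 hours
```

When the service limit binds, as in this example, adding workers or array tasks does not shorten the run; only a higher limit or fewer requests (for example, reading MMU images instead of cutouts) does.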
+ +### `scripts/build_seed_catalogs.py` +- **In:** APOGEE allStar (SNR>50, Gaia-XMatch-verified, PM), DESI PROVABGS, SDSS DR16Q. +- **Does:** build per-population seeds; propagate stars to **J2016.0** via + `apply_space_motion`; galaxies/AGN PM=0; compute `global_object_id` + (`ids.assign_global_id`); **assert uniqueness** (fail on collision); keep native IDs. +- **Out:** `seeds/population=*/seed.parquet` (versioned). Cached/idempotent. + +### `scripts/plan_work_units.py` +- **Does:** split each seed into deterministic row-shards (`--shard-size 50000`); cross + with the active source list → emit the **single authoritative** `manifest/work_units.parquet` + (`population, source, shard_id, row_start, row_end, n_objects, seed_path, output_path`) + + a recorded `manifest_hash`. +- **CLI:** `--seeds ... --sources --shard-size 50000 --out manifest/`. + +### `scripts/run_source_shard.py` (the SLURM array workhorse) +- **In:** one manifest row, selected by **partition-aware** indexing: + `--task-id $SLURM_ARRAY_TASK_ID --partition-id k --num-partitions K` → processes rows + where `row_index % K == k` (disjoint by construction; doc 01 §3.2). Optional + `--claim` enables the dynamic ledger via `coordination.claim(row)`. +- **Does:** load that seed shard; for MMU sources use `sources.lsdb_mmu` (PixelSearch + + column pushdown + margin cache); for others the custom reader. Propagate seed → + source epoch (`apply_space_motion`), match via `matching.match(...)` + (`search_around_sky` or lsdb `n_neighbors>1` + adjudication + per-source radius), emit + typed rows `{global_object_id, , *_match_sep_arcsec, match_ambiguous, + n_candidates_within_radius}`. +- **Out:** one shard via `io.atomic_write` + `io.write_done(...)` (checksum + provenance). + **Skips if a `complete` DONE marker matches** current manifest/schema/code + checksum + (doc 01 §3.2 state machine). 
+- **Concurrency:** acquire a token from `rate_limit`/`coordination` before each remote + call (caps respected **globally** across submissions). Local/S3/HATS: high; remote + services: 2–8 + backoff. **Never** fan CDS XMatch/Vizier across the array. + +### `scripts/finalize_shard.py` (array, per seed shard) +- **Does:** for one `(population, shard_id)`, join all `source=*/.../shard=` on + `global_object_id`; dedup (closest *after* adjudication); compute modality & + **instrument** counts; **enforce ≥2 instruments**; assign HEALPix(nside=8) split; write + `final/...` against the versioned schema (+ DONE marker w/ checksum). +- **Note:** per-shard join ⇒ no global shuffle, bounded memory. + +### `scripts/finalize_release.py` +- Aggregate final shards into `manifest.parquet`; pack heavy payloads into WebDataset + `payloads/*.tar`; emit counts + dataset-card inputs. + +### `scripts/validate_release.py` + `scripts/verify_markers.py` + `scripts/false_match_report.py` +- `validate_release`: streaming QA (schema conformance, `global_object_id` uniqueness, + coord sanity, sampled array shapes, modality/instrument coverage, split integrity, + typed-missingness, PM sanity). +- `verify_markers` **(NEW, gates upload):** independently re-derive that **every** + manifest row is `complete` — final+marker present, manifest/schema/code hashes match, + **file checksum matches the marker**, no orphan `.tmp` files, no suspicious/stale/ + corrupt states. This is the "is *done* actually done?" audit (doc 01 §3.2). +- `false_match_report` **(rewritten to doc 01 §3.4.1):** **mirrors the production matcher** + (incl. `apply_space_motion`); **Monte-Carlo random-direction offsets** scaled to the + confusion scale (`radius ⊕ μ·Δt ⊕ ϖ ⊕ σ_astro`); a **PM-direction-scramble null**; + stratified by **PM-bin × density × |b|**; plus the analytic background cross-check. + Writes CSV + card table; **gates upload** on per source × pop × PM-bin × |b|-bin thresholds. 
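The two geometric ingredients of the false-match protocol can be sketched with the standard library alone. This is a minimal illustration under stated assumptions: `⊕` is read as a quadrature sum, the helper names are hypothetical, and the `5.0` multiplier on the confusion scale is a placeholder rather than a value from doc 01 §3.4.1. A production version would go through the same matcher as the pipeline (astropy's `SkyCoord.directional_offset_by` performs the equivalent offset).

```python
import math
import random


def confusion_scale_arcsec(radius_arcsec, pm_mas_yr, dt_yr, parallax_mas, sigma_astro_arcsec):
    """radius (+) mu*dt (+) parallax (+) sigma_astro, read as a quadrature sum (assumption)."""
    return math.hypot(
        radius_arcsec,
        pm_mas_yr * dt_yr / 1000.0,  # proper-motion displacement, mas -> arcsec
        parallax_mas / 1000.0,       # parallax amplitude, mas -> arcsec
        sigma_astro_arcsec,
    )


def random_direction_offset(ra_deg, dec_deg, offset_arcsec, rng):
    """Move a position by a fixed angular distance along a uniformly random position angle."""
    theta = rng.uniform(0.0, 2.0 * math.pi)   # position angle, measured east of north
    d = math.radians(offset_arcsec / 3600.0)  # angular distance in radians
    dec = math.radians(dec_deg)
    sin_dec2 = math.sin(dec) * math.cos(d) + math.cos(dec) * math.sin(d) * math.cos(theta)
    sin_dec2 = max(-1.0, min(1.0, sin_dec2))  # guard against rounding just outside [-1, 1]
    dra = math.atan2(
        math.sin(theta) * math.sin(d) * math.cos(dec),
        math.cos(d) - math.sin(dec) * sin_dec2,
    )
    return (ra_deg + math.degrees(dra)) % 360.0, math.degrees(math.asin(sin_dec2))


def separation_arcsec(ra1_deg, dec1_deg, ra2_deg, dec2_deg):
    """Great-circle separation via the haversine formula (stable at small angles)."""
    p1, p2 = math.radians(dec1_deg), math.radians(dec2_deg)
    dl = math.radians(ra2_deg - ra1_deg)
    a = math.sin((p2 - p1) / 2.0) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2.0) ** 2
    return math.degrees(2.0 * math.asin(min(1.0, math.sqrt(a)))) * 3600.0


# Example: a star with 200 mas/yr proper motion, 15 yr epoch gap, 3" match radius.
rng = random.Random(42)
scale = confusion_scale_arcsec(3.0, 200.0, 15.0, 10.0, 0.2)
shifted_ra, shifted_dec = random_direction_offset(150.0, 30.0, 5.0 * scale, rng)
```

Drawing the direction at random for each object, rather than shifting the whole catalog along RA, keeps the null test from lining up with any preferred axis such as the local proper-motion direction.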
+
+### `scripts/upload_hf.py` (single designated uploader)
+- `HfApi.create_repo(exist_ok=True)` → **`upload_folder(release/v5/release)`** → write
+  README/card → **verify** via `load_dataset(..., streaming=True)` + `hf_hub_download`
+  spot-check. Idempotent. **Run once, by one user, after `verify_markers` passes**; never
+  two uploaders against one repo concurrently (doc 01 §3.2).
+
+## 4. SLURM pattern (CPU, arrays, multi-submission-safe, throttled)
+```bash
+# probe first (cheap), then build seeds
+sbatch slurm/probe_sources.sbatch
+sbatch slurm/build_seeds.sbatch
+# fan out: one array task per manifest row. %50 caps *this job's* concurrency;
+# the global per-service token bucket caps the SUM across all concurrent submissions.
+# Two users co-run by passing disjoint partitions:
+sbatch --array=0-$((N-1))%50 slurm/run_source_array.sbatch   # user A: --partition-id 0 --num-partitions 2
+sbatch --array=0-$((N-1))%50 slurm/run_source_array.sbatch   # user B: --partition-id 1 --num-partitions 2
+# finalize per shard, then audit + validate, then a single upload.
+# `sbatch A && sbatch B` only tests that A was *submitted*, so chain on job success instead:
+fin=$(sbatch --parsable --array=0-$((S-1))%64 slurm/finalize_array.sbatch)
+val=$(sbatch --parsable --dependency=afterok:"$fin" slurm/validate_release.sbatch)
+sbatch --dependency=afterok:"$val" slurm/upload_hf.sbatch
+```
+- **Partition (CPU, not GPU):** request a CPU partition; do **not** request `--gpus`
+  (doc 01 §3.8: the data path never uses one). If only GH200 nodes are available, omit
+  `--gpus`, request many CPUs, and pack array tasks per node. Note the **aarch64** env.
+- `run_source_array.sbatch` body:
+  `python -m scripts.run_source_shard --manifest ... --task-id $SLURM_ARRAY_TASK_ID --partition-id $PID --num-partitions $K`.
+
+## 5. Type/memory safety (pragmatic)
+- `@dataclass(frozen=True, slots=True)` config; parse/validate once; **no pydantic**.
+- `mypy` strict on `config/ids/matching/coordination/schemas/io`; lighter on messy I/O
+  adapters; `numpy.typing.NDArray` where shapes matter.
+- Runtime validation **only at boundaries** (after raw read, before shard write, before + marker write, before upload). +- Memory: row-shard ownership + streaming + narrow dtypes + Arrow nested types (no pandas + object payloads); pandas only for small table ops, `pyarrow` for write paths. + +## 6. Keep / rewrite from v4 +- **Keep:** survey fetch/query logic in `sources/*` (ZTF S3, cutout/VO readers, local + APOGEE/SDSS); seed selection; HEALPix pre-filter & column projection; the + shifted-catalog *concept*; `validate_dataset.py` checks. +- **Rewrite:** identity (`ids.py`); matching (`matching.py`, + lsdb for MMU); + **orchestration** (single manifest + partition-aware SLURM arrays + `coordination.py`); + **resume** (integrity-checked DONE markers + `verify_markers.py`); MMU access + (`lsdb_mmu.py` over `hf://`, replacing raw Flatiron HTTP); finalize (per-shard); upload + (`upload_hf.py`, single uploader); storage (typed schema + WebDataset); **false-match + test** (doc 01 §3.4.1 — production-mirroring, motion-aware). + +## 7. Build order (matches methodology P0→P2) +**P0:** `probe_sources`; seed row-shard ownership + partition-aware indexing; +`global_object_id`; `apply_space_motion`; radius search/lsdb + adjudication; atomic +writes + checksummed DONE + `verify_markers`; real upload. +**P1:** per-shard finalize; explicit pyarrow schema; per-source radii; lsdb `lsdb_mmu` +reader + margin cache; motion-aware false-match report; global token bucket. +**P2:** HATS/LSDB layouts for heavy local catalogs; WebDataset payloads; HSC/JWST image +adds; richer ambiguity/footprint QC; dynamic claim ledger (if needed). + +## 8. Acceptance criteria +- `probe_sources` report exists and per-source caps are set from it. +- TEST_MODE end-to-end (≈5 objects/pop) passes in CI. +- Re-running any array with all DONE markers present is a **no-op**; corrupting a final + file (checksum mismatch) forces a re-run. 
+- Two concurrent submissions with disjoint `--partition-id` produce no duplicate/clobbered + outputs and respect global service rate limits. +- Every released object has `n_instruments_present ≥ 2` and a unique `global_object_id`. +- `verify_markers` green; `validate_release` green; motion-aware false-match report under + threshold; HF load-back verified by the single uploader. diff --git a/pipeline-spec/03-v4-walkthrough.md b/pipeline-spec/03-v4-walkthrough.md new file mode 100644 index 0000000..83b2ff0 --- /dev/null +++ b/pipeline-spec/03-v4-walkthrough.md @@ -0,0 +1,236 @@ +# Worked Walkthrough: How the `v4/OmniSky` Pipeline Works + +> **Audience:** a junior researcher who wants to learn how to build and cross-match +> large multimodal astronomical datasets *performantly*. This explains the existing +> `layerwise-analysis/v4` code — what each piece does, **why** it is shaped that way, +> and which patterns are worth stealing. Critiques and fixes live in +> [`01-methodology.md`](01-methodology.md); this doc is "how it works today." + +## 0. The one-paragraph mental model + +No observatory stores *the same object's* IR spectrum, UV image, and optical light +curve together — each survey only knows its own observations, indexed by its own IDs +at its own epoch. So you **build** the joined dataset in four moves: + +1. **Seed.** Pick a reference catalog per population (stars, galaxies, AGN) giving a + clean list of real objects with good coordinates. Defines *which objects exist* + and is the source of truth for `object_id`, `ra`, `dec`, and (for stars) proper + motion. +2. **Fan out & match.** For every other survey ("source"), find which of its + observations land on your seed objects on the sky, and attach them. Each source + writes its matches to its own shards. +3. **Merge.** Join all per-source shards back onto the seed catalog by `object_id`, + one population at a time, in memory-bounded chunks. +4. 
**Validate & upload.** Check correctness, then push to HuggingFace. + +``` + SEED CATALOGS PER-SOURCE FAN-OUT MERGE SHIP + stars(APOGEE+Gaia) flatiron(Gaia/TESS/DESI) finalize validate + + galaxies(PROVABGS) → apogee/sdss/galah (spectra) → (chunked → false-match + agn(SDSS DR16Q) ztf (light curves) join by test + push + legacy/galex/2mass/unwise(img) object_id) to HuggingFace +``` + +**Why "seed-and-match"?** Pairwise all-to-all matching of *N* surveys is *N²* joins +with no natural "object" to anchor on. Anchoring on a seed catalog makes it *N* +one-to-many matches, gives every row a stable identity, and lets you reason about +completeness. This is the single most important design decision. + +## 0.5 What v5 changes (read alongside doc 01) + +This doc explains v4 *as it works today*. Five things below are **superseded** by the +hardened v5 design — flagged here so you don't copy a known-fragile pattern: + +- **False-match test has no proper motion.** `false_match_test.py` shifts the seed + **+30″ in RA only** and matches **un-propagated** positions with nearest-only — i.e. + it benchmarks a *different, simpler matcher than production*, and 30″ is comparable to + a high-PM star's multi-decade displacement. v5 replaces it with a motion-aware protocol + (doc 01 §3.4.1): production-mirroring, Monte-Carlo random-direction offsets scaled to + the real+apparent-motion confusion scale, a PM-direction-scramble null, stratified by + PM × density × |b|. +- **Resume is `is_cached()` = "shard dir has ≥1 parquet".** A source that crashes after + writing 40 of 200 shards is treated as *complete and silently skipped* — an incomplete + dataset with no error. v5 uses integrity-checked DONE markers (checksum + provenance) + + a `verify_markers` audit (doc 01 §3.2). +- **MMU data is fetched as raw HTTP** off `users.flatironinstitute.org`. v5 reads it via + **lsdb/HATS over `hf://datasets/UniverseTBD/mmu_*`** (partition + column pushdown, margin + cache; doc 01 §3.3). 
The `TobiasPitters/mmu-crossmatch` Space is the reference for that + access pattern — but note it is a *single-pixel, two-catalog, positional-only (no PM) + lsdb demo*, a template, **not** a pipeline. +- **HF upload is `create_repo`-only** — the data was **never actually uploaded**. v5 adds a + real `upload_folder` + load-back verification, run by a single designated uploader. +- **Single-node `ThreadPoolExecutor`** orchestration. v5 is SLURM **arrays on CPU**, + safe for concurrent multi-user submissions on a shared filesystem (doc 01 §3.2, §3.8). + +## 1. Match on *coordinates*, never on *names* + +Different catalogs name the same object differently, and objects move. The only +reliable shared key is *position on the sky at a given time*. So the join key is +`(ra, dec)` with a tolerance ("within 3″ → same object"). The seed `object_id` +(APOGEE_ID, PROVABGS id, SDSS_NAME) is the **row key for the merge**, not the match +key. The match is purely positional. (Methodology adds a coordinate-derived global +ID so identity doesn't depend on which survey seeded the object.) + +## 2. The cross-match core — `pipeline/utils/crossmatch.py` + +Three ideas: angular matching, epoch propagation, HEALPix pre-filtering. + +### 2.1 `match_to_catalog_sky` +```python +idx, sep, _ = catalog_coords.match_to_catalog_sky(tgt_coords) +mask = np.array(sep < radius * u.arcsec) +``` +`A.match_to_catalog_sky(B)` builds a **k-d tree** over B and, for each element of A, +returns the nearest B plus separation. It is **nearest-only**: it always returns +*a* match (even 50″ away), so you must apply the `sep < radius` cut yourself; and it +does not guarantee the match is mutual. The k-d tree makes each lookup *O(log M)* — +the difference between seconds and hours at millions of rows. + +### 2.2 The two matching *directions* (read twice) +- **`crossmatch_to_catalog()` (flatiron): catalog → target.** For each seed object, + nearest target. Output arrays are catalog-length. 
+- **`ztf.py`: target → catalog.** For each ZTF object in a tile, nearest seed; keep + only ZTF rows within radius. **Why flip?** Streaming a 5k-object tile against a + 1.5M-row catalog this way keeps working arrays tile-sized, not catalog-sized, and + discards the 99% of tile objects that aren't yours. Direction choice is a + memory/throughput optimization. Cost: many-to-one (dedup-by-closest fixes it). + +### 2.3 Epoch propagation — `propagate_coords()` (the "stars move" fix) +Nearby stars drift arcsec/decade. Match a 2016 Gaia position against a 1999 image +and you grab a neighbor. So propagate seed positions to each survey's epoch first: +```python +dt = target_epoch - ref_epoch +ra += (pmra / 3.6e6) / cos_dec * dt # mas/yr → deg/yr, de-projected +dec += (pmdec / 3.6e6) * dt +``` +- `pmra = μ_α·cos(δ)` (Gaia convention); dividing by `cos_dec` converts on-sky motion + to a ΔRA *coordinate* increment. Forgetting this is a classic bug. +- `3.6e6` = mas per degree. NaN PM → 0 (correct for extragalactic; safe for stars). +- `SURVEY_EPOCHS` (config.py) hard-codes each survey's mean epoch. That table is what + makes matching honest. +- **Good but not best:** this is flat/linear. Methodology recommends astropy + `SkyCoord.apply_space_motion()` (rigorous spherical motion) instead. + +### 2.4 HEALPix pre-filtering — the big throughput win +```python +cat_healpix = compute_catalog_healpix(ra, dec, nside=16) +filtered = filter_healpix_cells(available_cells, cat_healpix) +``` +Surveys stored partitioned by HEALPix cell (dirs named `healpix=1234/`) let you +**skip downloading cells your catalog never touches** — routinely 5–50× less I/O for +a sparse seed against an all-sky survey. The highest-leverage perf trick here, and it +works *because* your catalog and the remote data share the same HEALPix scheme. Risk: +the helper silently falls back to *no filtering* if `healpy`/`astropy_healpix` are +missing (methodology makes this a hard dependency). + +## 3. 
Performant data-access patterns (steal these) + +- **A — Spatial pre-filter before download** (`flatiron`, `ztf`): only fetch + partitions intersecting your catalog's HEALPix cells. *Don't fetch sky you don't need.* +- **B — Column projection / two-pass read** (`ztf`): read cheap position columns + (~7 MB), match, then read the expensive nested light-curve column only for matched + rows. Columnar formats let you read one column without the rest. Move megabytes, + not gigabytes. +- **C — Streaming shard flush** (everywhere): buffer rows; at `shard_size` (5000) + write a Parquet shard and clear. Memory stays bounded at ~one shard. +- **D — Parallel I/O with thread pools**: downloads are I/O-bound, so threads help + even under the GIL. Note `ps1.py`'s comment *"requests.Session is NOT thread-safe"* + — each thread makes its own request. The detail that separates demo code from code + that survives a 16-hour run. +- **E — Caching/resume by shard dir** (`base.py`): a source with existing shards is + skipped. Coarse (whole-source granularity); methodology replaces it with per-unit + DONE markers. +- **F — Reverse-match to bound memory** (§2.2). + +**Meta-lesson:** performance here is rarely clever math. It's (1) not fetching data +you'll discard, (2) keeping memory flat, (3) overlapping network waits. Master those +three and you can build million-object datasets on one node. + +## 4. The source abstraction — `pipeline/sources/base.py` +Every survey is a `DataSource` subclass with one required method `fetch(catalog_df)`. +The base provides `run()` (caching wrapper), `save_shard()`, `load_shards()`, and a +cheap `preflight()` reachability check. Clean plugin pattern: adding a survey = one +file that (a) finds its data, (b) propagates the catalog to its epoch, (c) matches, +(d) emits `{object_id, }`. The prefix (`ztf_`, `apogee_`, `legacy_`) +namespaces columns so they never collide in the wide table. 
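The plugin pattern described above can be sketched as follows. This is an illustrative stand-in, not the v4 code: it represents the catalog as a list of dicts instead of a DataFrame, omits the caching and shard I/O that the real `run()` wraps, and `ToySource` and `_namespace` are hypothetical names.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class DataSource(ABC):
    """Illustrative base class: one subclass per survey."""

    prefix = ""  # column namespace, e.g. "ztf" or "apogee"

    def preflight(self) -> bool:
        """Cheap reachability check; subclasses may override."""
        return True

    @abstractmethod
    def fetch(self, catalog: list[dict]) -> list[dict]:
        """Return one dict per matched object, keyed by un-prefixed column names."""

    def run(self, catalog: list[dict]) -> list[dict]:
        """Check reachability, fetch, then namespace every column except the key."""
        if not self.preflight():
            raise RuntimeError(f"{self.prefix}: source unreachable")
        return [self._namespace(row) for row in self.fetch(catalog)]

    def _namespace(self, row: dict) -> dict:
        out = {"object_id": row["object_id"]}
        for key, value in row.items():
            if key != "object_id":
                out[f"{self.prefix}_{key}"] = value
        return out


class ToySource(DataSource):
    """A stand-in survey that 'matches' every seed object with fixed values."""

    prefix = "toy"

    def fetch(self, catalog: list[dict]) -> list[dict]:
        return [
            {"object_id": obj["object_id"], "match_sep_arcsec": 0.4, "mag": 17.2}
            for obj in catalog
        ]
```

Applying the prefix in the base class rather than in each subclass is one way to guarantee that columns from different sources cannot collide once they are merged into the wide table.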
+ +Representative sources: `flatiron.py` (HEALPix dirs, parallel cells, catalog→target); +`ztf.py` (S3 HATS Parquet, column projection, reverse match); `ps1.py` (cutout web +service, thread-per-cutout, raw `(3,64,64)` float32); `apogee.py` (local FITS, +SNR-deduped lookup, fixed good-pixel crop). Note `apogee.py` crops to a fixed 7514-px +"good pixel" set so every spectrum is identical length — fix array lengths early and +the downstream ML is far simpler. + +## 5. Seed catalogs — `pipeline/catalogs/` +Three quality strategies: **stars** (`stars.py`) start from APOGEE allStar, keep +SNR>50, dedup to highest-SNR per star, then **verify each exists in Gaia DR3 via CDS +XMatch** and inherit Gaia proper motions; fails loud if XMatch returns nothing. +**galaxies** (`galaxies.py`) reuse the DESI PROVABGS HDF5 cells, `pm=0`. **agn** +(`agn.py`) use SDSS DR16Q with `z>0.01`, with truncation checks. Each caches its +`_catalog.parquet`. Different methods because "what counts as a real, +well-localized object" differs by population — the seed step encodes that judgment. + +## 6. Merge & finalize — `pipeline/finalize.py` +Joins per-source shards onto each population catalog. The hard part is memory +(700k stars × 7514-float spectra × several surveys = hundreds of GB as lists). So: +- **One population at a time, chunks of 50k** (`MERGE_CHUNK`); `_load_shards_filtered` + loads only the chunk's `object_id`s from each source. Peak memory = one chunk. +- **Dedup = keep closest:** sort by `{source}_match_sep_arcsec`, drop_duplicates keep + first. +- **Modality bookkeeping:** `n_spectra/n_lightcurves/n_images/n_modality_types` per + object — what lets a user filter to "≥2 instrument types." +- **Spatial split:** `_add_split` assigns train/val/test by **HEALPix cell (nside=8)**, + not per-object-random — nearby objects share extinction/depth, so random splits leak. + +## 7. 
Parquet I/O — `pipeline/utils/parquet.py` +`save_shard` writes `NNNNN.parquet` after `make_parquet_safe` turns numpy arrays into +lists (`list`). Simple and portable (hence `np.array(row["twomass_j"].tolist())`). +Cost (flagged): object-dtype list columns are the least efficient Arrow encoding for +fixed-shape arrays — methodology moves 64×64 images and 7514-px spectra to Arrow +`fixed_size_list`. + +## 8. Validation — `validate_dataset.py` + `false_match_test.py` +Both **streaming** (one shard in RAM). +- `validate_dataset.py`: counters-only single pass — required columns, population + counts, coordinate sanity, modality coverage, sampled array shapes (images 64×64, + APOGEE 7514), no string-encoded arrays, no cross-population contamination, headline + multimodal coverage. Run before every upload. +- `false_match_test.py` (**learn this trick**): a cross-match always *produces* + matches — are they real? **Shift the whole catalog by 30″ and re-run.** No real + counterpart can line up at 30″, so any surviving matches are spurious: the shifted + match rate **is your false-match rate**. Broken down by galactic latitude (crowded + `|b|<15°` fields coincide far more). The 0.02% mean / 0.06% worst-case numbers are + what justify trusting the dataset. Ship a shifted-catalog number for every + cross-matched catalog you build — it's the cross-matching unit test. + +## 9. Orchestration — `run_pipeline.py` + `pipeline/runner.py` +Sequences phases: preflight → build catalogs → Flatiron/APOGEE/SDSS → the rest under +a single-node `ThreadPoolExecutor(max_workers=6)` (one thread per *source*) → +finalize. `from_env()` reads everything from env vars; `TEST_MODE=1` shrinks every +population so you can run the whole pipeline locally in a minute before burning +cluster hours. **This is what changes most for cluster scale** — see methodology +§Concurrency and the spec. + +## 10. Quick map: solid vs. 
scratch +| Area | State | Note | +|---|---|---| +| Seed-and-match architecture | **Solid** | Keep. Right backbone. | +| `match_to_catalog_sky` + radius cut | **Solid** | Keep; add `search_around_sky` for dense fields. | +| HEALPix pre-filter + column projection | **Solid, excellent** | The core perf wins. | +| Proper-motion propagation | **Works, hand-rolled** | Swap for `apply_space_motion`. | +| Streaming shards + chunked merge | **Solid** | Good memory discipline. | +| Shifted-catalog false-match test | **Solid, excellent** | Run per-source; gate uploads. | +| Resume by source-dir caching | **Scratch / coarse** | Replace with per-unit DONE markers. | +| Single-node ThreadPool orchestration | **Scratch for cluster** | Re-architect to SLURM shard tasks. | +| `object_id` = survey-native | **Scratch** | Add coordinate-derived global ID. | +| HF upload | **Missing** | Only `create_repo`; real `push`/`upload_folder` not wired. | +| Array storage as list columns | **Works, inefficient** | Use Arrow `fixed_size_list`. | +| Normalization (HDR images) | **Absent (raw stored)** | Intentional — document recipes. | +| Type/memory safety | **Partial** | Add typed pyarrow schema + boundary validation. | + +### TL;DR +Seed on a clean catalog and match everything else by **sky position at the right +epoch**, never by name. Performance = don't fetch what you'll discard (HEALPix + +column pre-filter) + stream so memory stays flat + thread the network waits. Correct +the epoch or you match the wrong star. Prove matches with a shifted-catalog test; +split by sky region. Keep raw values; normalize at train time, per modality. 
diff --git a/pipeline-spec/04-downstream-analysis-interface.md b/pipeline-spec/04-downstream-analysis-interface.md new file mode 100644 index 0000000..745dbda --- /dev/null +++ b/pipeline-spec/04-downstream-analysis-interface.md @@ -0,0 +1,78 @@ +# Side-car: The Downstream Analysis Interface (why the dataset is shaped this way) + +> Not part of the data-generation pipeline — this is the **contract** between the +> dataset and the Platonic-Representation analysis (which lives in +> `UniverseTBD/platonic-universe`). It tells you, a junior researcher, *what the +> analysis needs from the data*, so the dataset is built to serve it. + +## 1. What the experiment actually does +The PRH analysis asks: do two models — trained on **different instruments/modalities** +— organize the **same objects** the same way in their representation spaces? +Pipeline: (1) **extract** embeddings by forward-passing each object through each model, +layer by layer; (2) **compute** an alignment score between every pair of (model, layer) +embeddings; (3) optionally **regress** physical labels (redshift, M*, sSFR) as a sanity +anchor. Tiers map to the platonic-universe repo's `extract/ compute/ regress/`. **Resource note:** this side-car is where the GPUs belong — embedding extraction (model forward passes), CKA's O(N²) Gram matrices, and mutual-kNN (FAISS-GPU) are GPU-bound, *unlike* the CPU/network-bound data-generation pipeline, which should not consume GPU allocation (doc 01 §3.8). + +## 2. The two metrics and what they demand of the data +- **Mutual k-NN alignment (mNN):** for the **same N objects**, embed with model A and + model B; for each object compare its k nearest neighbors in A's space vs B's space; + score = mean overlap / k. Local, batchable, the PRH paper's preferred metric (k=10). +- **CKA (Centered Kernel Alignment):** the global limit (k→N) of the same idea; uses + all pairwise similarities; O(N²) memory. 
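A minimal pure-Python sketch of the mutual k-NN score defined above may make the data requirement concrete. It assumes Euclidean distance for the neighbor search (a simplification; the analysis code may use a different similarity), and the function names are hypothetical. It is quadratic in N and meant for reading, not for production-scale embeddings.

```python
import math


def knn_sets(embeddings, k):
    """For each row, the set of indices of its k nearest neighbors (self excluded)."""
    neighbors = []
    for i, x in enumerate(embeddings):
        ranked = sorted(
            (math.dist(x, y), j) for j, y in enumerate(embeddings) if j != i
        )
        neighbors.append({j for _, j in ranked[:k]})
    return neighbors


def mutual_knn_alignment(emb_a, emb_b, k):
    """Mean fraction of shared k-nearest neighbors between two embedding spaces.

    Row i of emb_a and row i of emb_b must describe the same physical object.
    """
    if len(emb_a) != len(emb_b):
        raise ValueError("both models must embed the same N objects, in the same order")
    if not 0 < k < len(emb_a):
        raise ValueError("k must satisfy 0 < k < N")
    nn_a = knn_sets(emb_a, k)
    nn_b = knn_sets(emb_b, k)
    shared = sum(len(a & b) for a, b in zip(nn_a, nn_b))
    return shared / (k * len(emb_a))


# Five objects embedded by two toy "models"; row order is the shared object index.
model_a = [(0.0,), (1.0,), (3.0,), (7.0,), (15.0,)]
model_b_same_geometry = [(2.0 * v[0],) for v in model_a]      # rescaled copy of A
model_b_scrambled = [(0.0,), (15.0,), (1.0,), (7.0,), (3.0,)]  # different neighborhoods

score_same = mutual_knn_alignment(model_a, model_b_same_geometry, k=2)
score_scrambled = mutual_knn_alignment(model_a, model_b_scrambled, k=2)
```

The length check is the code-level form of the requirement that both models encode the same objects: if the two lists are not aligned row by row, the score is still computable but no longer measures anything.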
+ +**The hard requirement both share:** the **same set of N physical objects must be +encoded by both models** — index *i* must mean the *same object* in both modalities. +This is exactly why the dataset is **cross-matched and keyed by `global_object_id`**: +that ID is the anchor that makes "the same object in modality A and modality B" +well-defined. Without a stable shared key, mNN/CKA are meaningless. + +## 3. Sample size (reassuring news) +- AstroCLIP (arXiv:2310.03024) aligned image↔spectrum on **197,632** galaxies (matched + by shared TARGETID; 144×144 image crops; per-dataset z-score norm). +- The PRH paper (arXiv:2405.07987) got clean, interpretable trends with **N≈1,000–1,024 + paired objects** at k=10. No formal minimum is given, but ~1k well-matched pairs is a + working baseline; thousands is comfortable. +- **Implication:** the bottleneck is **clean ≥2-instrument pairs**, not millions of + objects. A few thousand *trustworthy* image+spectrum pairs already powers the + headline measurement; more pairs mainly tighten error bars and let you slice by + object type / capacity. This is why doc 01 optimizes for match *quality* over volume. + +## 4. What the dataset must therefore guarantee +1. **A stable shared key** (`global_object_id`) so the *same* object is identifiable + across every modality. (P0 in the spec.) +2. **Paired coverage:** enough objects with **both** of any two modalities of interest + (esp. **image + spectrum** for galaxies, the AstroCLIP-style pairing the PRH analysis + leans on). Track this with `n_modality_types` / `instrument_presence_mask`. +3. **Fixed, model-ready array shapes** (e.g. 64×64 images, fixed-length spectra) so the + extract tier can batch without bespoke padding per object. +4. **Raw values + documented normalization** so each model applies the preprocessing it + expects (asinh/zscale for images; continuum norm for spectra) — the dataset must not + pre-bake a single normalization that biases cross-model comparison. +5. 
**A clean spatial split** so any probing/regression doesn't leak across train/test. +6. **Provenance** (which instrument, which release, match separation/ambiguity) so the + analysis can filter to high-confidence pairs and attribute any (non)convergence to + real physics rather than match contamination. + +## 5. Minimal interface sketch (what the extract tier will call) +```python +# Conceptual — the analysis iterates objects with >=2 modalities and pulls raw arrays. +ds = load_release("kshitij/omnisky-v5", streaming=True) +for obj in ds: + if obj["n_instruments_present"] < 2: # the >=2-instrument guarantee + continue + oid = obj["global_object_id"] # stable anchor across modalities + image = decode_image(obj, band="legacy_grz") # raw -> model's own asinh/zscale + spec = decode_spectrum(obj, source="desi") # raw flux + ivar -> model's norm + emit(oid, modality="image", embedding=model_img(image)) + emit(oid, modality="spectrum", embedding=model_spec(spec)) +# Alignment then joins emitted embeddings on `oid` and computes mNN/CKA over shared oids. +``` +The single load-bearing detail: **everything joins on `global_object_id`.** Build the +dataset so that key is unique, stable, and present on every modality, and the +downstream analysis is straightforward. + +## 6. References +- Huh, Cheung, Wang, Isola 2024 — *The Platonic Representation Hypothesis*, arXiv:2405.07987 (mNN/CKA, k=10, N≈1k). +- Duraphe, Smith, Sourav, Wu 2025 — *The Platonic Universe: Do Foundation Models See the Same Sky?*, arXiv:2509.19453. +- Parker, Lanusse et al. 2024 — *AstroCLIP*, arXiv:2310.03024 (197,632 image–spectrum pairs by TARGETID). +- Smith, Roberts, Angeloudi, Huertas-Company 2024 — *AstroPT*, arXiv:2405.14930 (8.6M images; multimodal extension arXiv:2503.15312). 
diff --git a/pipeline-spec/README.md b/pipeline-spec/README.md new file mode 100644 index 0000000..a7f5e18 --- /dev/null +++ b/pipeline-spec/README.md @@ -0,0 +1,37 @@ +# Pipeline Spec — Cross-Matched Multimodal Dataset for the Platonic Universe + +Deliverables for building a cluster-scale, cross-matched, multimodal astronomical +dataset (stars + galaxies + AGN; images + spectra + light curves + tabular), uploaded +to HuggingFace, to extend the *Platonic Universe* program (arXiv:2509.19453). + +| Doc | What it is | Read it for | +|-----|------------|-------------| +| [`01-methodology.md`](01-methodology.md) | End-to-end methodology (PI review draft, v5) | The *why*: identity, the multi-submission consistency model, lsdb/HATS access, motion-aware matching & false-match protocol, resource/throughput, storage, correctness/bias checklist, Data Access Matrix | +| [`02-script-spec.md`](02-script-spec.md) | The scripts (v5) | Module/CLI layout, data flow, partition-aware SLURM arrays, acceptance criteria | +| [`03-v4-walkthrough.md`](03-v4-walkthrough.md) | Worked walkthrough of the existing `v4/OmniSky` code (with v5-deltas callout) | How performant cross-matching works today, and what v5 supersedes | +| [`04-downstream-analysis-interface.md`](04-downstream-analysis-interface.md) | Side-car: the analysis contract | What the PRH (mutual-kNN/CKA) analysis needs, and why the GPUs live here | + +## The decisions baked in (from PI sign-off) +1. **Extend & harden** the existing `v4/OmniSky` pipeline (don't rebuild). +2. **All three populations** (stars + galaxies + AGN); **maximize object count** (match/beat v4's 1.58M). +3. **One cluster (DeltaAI), concurrent multi-user SLURM submissions on shared Lustre** — disjoint partitions by construction; global per-service rate limits. +4. **lsdb/HATS over `hf://UniverseTBD/mmu_*`** for MMU source access & matching; ZTF over S3. +5. 
**Global deterministic ID** (HEALPix-order-29 int64 at J2016.0, continuous with MMU's native `_healpix_29`) + **store raw** + **documented normalization** at train time. +6. **Data generation + verified HF upload** now; downstream analysis interface as a side-car (doc 04). + +## The things that keep the experiment honest +- Correct the **epoch** (proper motion) before every match — or you match the wrong star. +- Match on **coordinates + a stable global ID**, never on names. +- Prove matches with a **motion-aware false-match protocol** (production-mirroring; random-direction Monte-Carlo offsets scaled to real+apparent motion; a PM-direction-scramble null; stratified by PM × density × |b|) — *not* a single fixed 30″ shift. +- **Split by sky region** (HEALPix), never per-object random. +- Enforce **≥2 instruments** per retained object; keep **raw** values; normalize per modality downstream. +- Make "done" mean done: **integrity-checked DONE markers** (checksum + provenance) + a `verify_markers` audit that **gates upload**. +- Cross-matching is **CPU/network-bound** — run data-gen on **CPU** nodes, not GPU; reserve GPUs for the downstream analysis. + +## Status / next step +Drafts for review (v5, addressing PI feedback on false-match severity, the multi-cluster +memory/consistency model, CPU/GPU throughput, DONE-marker robustness, and dataset +availability). On approval, implement in **P0 → P1 → P2** order (spec §7): start with +`probe_sources.py`, `ids.py`, `matching.py`, the manifest + partition-aware +`run_source_shard.py` SLURM workhorse, `coordination.py`, atomic writes/checksummed DONE +markers + `verify_markers.py`, and `upload_hf.py`.