diff --git a/pyproject.toml b/pyproject.toml index a17c1e90..a1e560c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "s3fs>=2024.6.0", "boto3>=1.34.0", "pyproj>=3.7.0", + "pystac>=1.8.0", "structlog>=25.5.0", ] diff --git a/src/eopf_geozarr/cli.py b/src/eopf_geozarr/cli.py index b861df8e..d8104e65 100755 --- a/src/eopf_geozarr/cli.py +++ b/src/eopf_geozarr/cli.py @@ -1108,6 +1108,20 @@ def consolidate_s1_command(args: argparse.Namespace) -> None: sys.exit(1) +def generate_stac_s1_command(args: argparse.Namespace) -> None: + """Build and print a STAC item for an S1 GRD RTC Zarr store.""" + import json + + from .stac.s1_rtc import build_s1_rtc_stac_item + + try: + item = build_s1_rtc_stac_item(args.store, args.collection) + print(json.dumps(item.to_dict(), indent=2)) + except Exception as e: + log.exception("❌ Error generating STAC item", error=str(e)) + sys.exit(1) + + def create_parser() -> argparse.ArgumentParser: """ Create the argument parser for the CLI. @@ -1211,6 +1225,7 @@ def create_parser() -> argparse.ArgumentParser: # Add S1 ingestion commands add_s1_ingestion_commands(subparsers) + add_s1_stac_commands(subparsers) return parser @@ -1277,6 +1292,24 @@ def add_s1_ingestion_commands(subparsers: argparse._SubParsersAction) -> None: cons_parser.set_defaults(func=consolidate_s1_command) +def add_s1_stac_commands(subparsers: argparse._SubParsersAction) -> None: + """Add S1 GRD RTC STAC builder command to CLI parser.""" + stac_parser = subparsers.add_parser( + "generate-stac-s1", + help="Build and print a STAC item for an S1 GRD RTC Zarr store", + ) + stac_parser.add_argument( + "--store", type=str, required=True, help="Path or s3:// URI to the Zarr store" + ) + stac_parser.add_argument( + "--collection", + type=str, + default="sentinel-1-grd-rtc-staging", + help="STAC collection ID (default: sentinel-1-grd-rtc-staging)", + ) + stac_parser.set_defaults(func=generate_stac_s1_command) + + def add_s2_optimization_commands(subparsers: argparse._SubParsersAction) -> None: """Add S2 optimization commands to CLI parser.""" diff --git a/src/eopf_geozarr/conversion/s1_ingest.py b/src/eopf_geozarr/conversion/s1_ingest.py index 8109e8a7..2b541cae 100644 --- a/src/eopf_geozarr/conversion/s1_ingest.py +++ b/src/eopf_geozarr/conversion/s1_ingest.py @@ -19,18 +19,28 @@ from dataclasses import dataclass from math import ceil from pathlib import Path +from typing import TYPE_CHECKING, cast import numpy as np import rasterio import structlog import zarr import zarr.codecs +from pyproj import CRS as PyprojCRS + +# `FillValueCoder` is xarray's internal CF fill-value encoder: the canonical way to produce +# the base64 `_FillValue` xarray reads back under `use_zarr_fill_value_as_mask` (xarray +# #11345); the S2 path relies on the same mechanism. Internal API — revisit if xarray moves it. +from xarray.backends.zarr import FillValueCoder from zarr_cm import geo_proj from zarr_cm import multiscales as multiscales_cm from zarr_cm import spatial as spatial_cm from eopf_geozarr.conversion.utils import calculate_aligned_chunk_size +if TYPE_CHECKING: + from eopf_geozarr.data_api.geozarr.types import S1BackscatterAttrsJSON + log = structlog.get_logger() # ============================================================================= @@ -130,7 +140,7 @@ def extract_geotiff_metadata(path: str | Path) -> S1TilingMetadata: If critical tags (ACQUISITION_DATETIME, ORBIT_NUMBER, RELATIVE_ORBIT_NUMBER, FLYING_UNIT_CODE) are missing. """ - with rasterio.open(str(path)) as src: + with _rasterio_env(path), rasterio.open(str(path)) as src: tags = src.tags() t = src.transform spatial_transform = [t.a, t.b, t.c, t.d, t.e, t.f] @@ -296,20 +306,131 @@ def _create_spatial_coordinate_arrays( ) -def create_s1_store( - store_path: str | Path, - orbit_direction: str, - metadata: S1TilingMetadata, +# CF datetime encoding for the `time` coordinate. Without it the array is a bare int64 and readers +# (xarray / TiTiler's `open_datatree(decode_times=True)`) cannot expose `time` as a datetime index, so +# per-acquisition previews can only select positionally (`sel=time={index}`) — fragile once a cube's +# time axis goes non-monotonic. With these attrs `time` decodes to datetime64 and previews can render by +# `sel=time={datetime}` (order-immune). The stored dtype stays int64 nanoseconds. See data-model #192. +TIME_CF_ATTRS = { + "units": "nanoseconds since 1970-01-01", + "calendar": "proleptic_gregorian", + "standard_name": "time", +} + +# CF `_FillValue` for the float32 arrays, mirroring the S1 GRD converter (geozarr.py) +# and S2 (data-model #172). This is what lets xarray mask NaN nodata via +# `use_zarr_fill_value_as_mask=True` despite xarray #11345 — the zarr-level `fill_value` +# field alone is not surfaced through xarray's encoding. We encode it with +# `FillValueCoder` so the stored attribute matches the base64 form S2 writes +# (`AAAAAAAA+H8=`). The S2 path gets this for free via `to_zarr`; this store is written +# zarr-direct, so the attribute must be set explicitly. +FLOAT32_NAN_FILL_VALUE = FillValueCoder.encode(np.nan, np.dtype("float32")) +# CF metadata for the backscatter bands (vv/vh). +BACKSCATTER_CF_ATTRS: S1BackscatterAttrsJSON = { + "standard_name": "surface_backwards_scattering_coefficient_of_radar_wave", + "units": "1", + "_FillValue": FLOAT32_NAN_FILL_VALUE, +} + + +def _create_time_coordinate_array(level_group: zarr.Group) -> None: + """Create the CF-encoded ``time`` coordinate (length 0, grown on append) on one level group. + + Replicated at EVERY multiscale level (with identical values) so datetime ``.sel`` resolves at + whatever level a reader renders — TiTiler picks a coarse level for previews, and a level lacking a + ``time`` coordinate cannot be selected by datetime. Keeping the dtype/values consistent across + levels is also required for the datatree to open (mixed int64/datetime64 fails alignment). + """ + t_arr = level_group.create_array( + "time", + shape=(0,), + dtype="int64", + chunks=(512,), + fill_value=0, + dimension_names=["time"], + ) + t_arr.attrs.update({**TIME_CF_ATTRS, "_ARRAY_DIMENSIONS": ["time"]}) + + +def _add_grid_mapping(group: zarr.Group, crs_string: str) -> None: + """Add a CF ``spatial_ref`` grid-mapping coordinate to a group holding (y, x) arrays. + + rioxarray -- and TiTiler's GeoZarr reader -- resolve the CRS from a CF + ``spatial_ref``/``crs_wkt`` coordinate; the GeoZarr ``proj:code`` attribute alone + is not read. This mirrors the S2 converter (which writes a ``spatial_ref`` + grid-mapping variable via ``rio.write_crs``). The CF attributes come from + ``pyproj.CRS.to_cf()`` -- the same source rioxarray uses -- so the projection is + described correctly for any CRS rather than hard-coded. + + The scalar ``spatial_ref`` array is created (if absent) and every (y, x) data + array in the group is given ``grid_mapping = "spatial_ref"``. + """ + cf_attrs = PyprojCRS.from_user_input(crs_string).to_cf() + # rioxarray writes both ``crs_wkt`` and a ``spatial_ref`` attr holding the WKT. + cf_attrs["spatial_ref"] = cf_attrs["crs_wkt"] + cf_attrs["_ARRAY_DIMENSIONS"] = [] + + if "spatial_ref" in group: + sref = group["spatial_ref"] + else: + sref = group.create_array("spatial_ref", shape=(), dtype="int64", fill_value=0) + sref[...] = 0 + sref.attrs.update(cf_attrs) + + for name, arr in group.arrays(): + if name != "spatial_ref" and {"y", "x"}.issubset(arr.metadata.dimension_names or ()): + arr.attrs["grid_mapping"] = "spatial_ref" + + +def _create_band_arrays(level_group: zarr.Group, level_h: int, level_w: int) -> None: + """Create the (time, y, x) data bands (vv, vh, border_mask) for one multiscale level. + + Float backscatter bands (vv/vh) get the CF ``_FillValue``/``standard_name``/``units`` + attributes so readers can mask NaN nodata (xarray #11345) — parity with the S1 GRD + converter and S2 (data-model #172). Shared by ``create_s1_store`` (new store) and + ``ingest_s1tiling_acquisition`` (new orbit added to an existing store) so the two + creation paths cannot drift. + """ + inner_chunks = ( + 1, + calculate_aligned_chunk_size(level_h, 512), + calculate_aligned_chunk_size(level_w, 512), + ) + shard_shape = (1, level_h, level_w) + for name, dtype, fill in [ + ("vv", "float32", float("nan")), + ("vh", "float32", float("nan")), + ("border_mask", "uint8", 0), + ]: + band = level_group.create_array( + name, + shape=(0, level_h, level_w), + dtype=dtype, + chunks=inner_chunks, + shards=shard_shape, + compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), + fill_value=fill, + dimension_names=["time", "y", "x"], + ) + if name in ("vv", "vh"): + # cast: zarr's `attrs.update` is typed for its JSON union, which a TypedDict + # (invariant) doesn't satisfy structurally; the values are JSON-safe. + band.attrs.update(cast("dict", BACKSCATTER_CF_ATTRS)) + + +def _build_orbit_group( + root: zarr.Group, orbit_direction: str, metadata: S1TilingMetadata ) -> zarr.Group: - """Create a new S1 GRD RTC Zarr V3 store with full conventions metadata. + """Create one orbit group (asc/desc) with full GeoZarr metadata. - Returns the root group. + Builds the multiscale level groups (bands + spatial coordinates + grid-mapping + + per-level ``time``) and the native-resolution per-acquisition metadata coordinates. + Shared by ``create_s1_store`` (new store) and ``ingest_s1tiling_acquisition`` (new + orbit added to an existing store) so the two creation paths cannot drift — the inline + path previously omitted ``proj:code`` on level groups. """ layout = compute_multiscales_layout(metadata.shape, metadata.spatial_transform) - - root = zarr.open_group(str(store_path), mode="w-", zarr_format=3) orbit_group = root.create_group(orbit_direction) - orbit_group.attrs.update( { "zarr_conventions": ZARR_CONVENTIONS, @@ -332,40 +453,22 @@ def create_s1_store( { "spatial:shape": [level_h, level_w], "spatial:transform": level_entry["spatial:transform"], + "proj:code": metadata.crs, } ) - inner_chunks = ( - 1, - calculate_aligned_chunk_size(level_h, 512), - calculate_aligned_chunk_size(level_w, 512), - ) - shard_shape = (1, level_h, level_w) - - for name, dtype, fill in [ - ("vv", "float32", float("nan")), - ("vh", "float32", float("nan")), - ("border_mask", "uint8", 0), - ]: - level_group.create_array( - name, - shape=(0, level_h, level_w), - dtype=dtype, - chunks=inner_chunks, - shards=shard_shape, - compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), - fill_value=fill, - dimension_names=["time", "y", "x"], - ) + _create_band_arrays(level_group, level_h, level_w) _create_spatial_coordinate_arrays( level_group, level_h, level_w, level_entry["spatial:transform"] ) + _add_grid_mapping(level_group, metadata.crs) + # `time` coordinate on every level so datetime `.sel` resolves at any rendered scale (#192). + _create_time_coordinate_array(level_group) - # Coordinate variables at native resolution only + # Per-time metadata coordinates at native resolution only (not selected on by readers). r10m = orbit_group["r10m"] for name, dtype, fill in [ - ("time", "int64", 0), ("absolute_orbit", "int32", 0), ("relative_orbit", "int32", 0), ]: @@ -385,6 +488,20 @@ def create_s1_store( fill_value="", dimension_names=["time"], ) + return orbit_group + + +def create_s1_store( + store_path: str | Path, + orbit_direction: str, + metadata: S1TilingMetadata, +) -> zarr.Group: + """Create a new S1 GRD RTC Zarr V3 store with full conventions metadata. + + Returns the root group. + """ + root = zarr.open_group(str(store_path), mode="w-", zarr_format=3) + _build_orbit_group(root, orbit_direction, metadata) log.info( "Created S1 store", @@ -470,13 +587,13 @@ def ingest_s1tiling_acquisition( ValueError If the GeoTIFF CRS or shape does not match the existing store. """ - vv_path = Path(vv_path) - vh_path = Path(vh_path) - border_mask_path = Path(border_mask_path) + vv_path = _coerce_input_path(vv_path) + vh_path = _coerce_input_path(vh_path) + border_mask_path = _coerce_input_path(border_mask_path) store_path = Path(store_path) for p in [vv_path, vh_path, border_mask_path]: - if not p.exists(): + if not _input_path_exists(p): raise FileNotFoundError(f"GeoTIFF not found: {p}") # Extract metadata from VV file @@ -494,77 +611,9 @@ def ingest_s1tiling_acquisition( else: root = zarr.open_group(str(store_path), mode="r+", zarr_format=3) if orbit_direction not in root: - # Create orbit direction group in existing store - orbit_group = root.create_group(orbit_direction) - layout = compute_multiscales_layout(meta.shape, meta.spatial_transform) - orbit_group.attrs.update( - { - "zarr_conventions": ZARR_CONVENTIONS, - "multiscales": { - "layout": layout, - "resampling_method": "average", - }, - "proj:code": meta.crs, - "spatial:dimensions": ["y", "x"], - "spatial:bbox": meta.bounds, - } - ) - for level_entry in layout: - level_name = level_entry["asset"] - level_h, level_w = level_entry["spatial:shape"] - level_group = orbit_group.create_group(level_name) - level_group.attrs.update( - { - "spatial:shape": [level_h, level_w], - "spatial:transform": level_entry["spatial:transform"], - } - ) - inner_chunks = ( - 1, - calculate_aligned_chunk_size(level_h, 512), - calculate_aligned_chunk_size(level_w, 512), - ) - shard_shape = (1, level_h, level_w) - for name, dtype, fill in [ - ("vv", "float32", float("nan")), - ("vh", "float32", float("nan")), - ("border_mask", "uint8", 0), - ]: - level_group.create_array( - name, - shape=(0, level_h, level_w), - dtype=dtype, - chunks=inner_chunks, - shards=shard_shape, - compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), - fill_value=fill, - dimension_names=["time", "y", "x"], - ) - _create_spatial_coordinate_arrays( - level_group, level_h, level_w, level_entry["spatial:transform"] - ) - r10m = orbit_group["r10m"] - for name, dtype, fill in [ - ("time", "int64", 0), - ("absolute_orbit", "int32", 0), - ("relative_orbit", "int32", 0), - ]: - r10m.create_array( - name, - shape=(0,), - dtype=dtype, - chunks=(512,), - fill_value=fill, - dimension_names=["time"], - ) - r10m.create_array( - "platform", - shape=(0,), - dtype=" 0: + raise ValueError( + f"Cannot append to {orbit_direction}: r10m has {current_size} slice(s) but no `time` " + "coordinate (no backfill source -- wipe + reingest)" + ) + + # Write data + the `time` coordinate at all levels (time is replicated per level so datetime + # `.sel` resolves at any rendered scale, #192). for level_name, (vv_lev, vh_lev, mask_lev) in data_by_level.items(): level = orbit[level_name] h, w = vv_lev.shape @@ -627,12 +719,12 @@ def ingest_s1tiling_acquisition( level["vh"][current_size, :, :] = vh_lev level["border_mask"][current_size, :, :] = mask_lev - # Append coordinate variables - for coord_name in ["time", "absolute_orbit", "relative_orbit", "platform"]: - r10m[coord_name].resize((new_size,)) + level["time"].resize((new_size,)) + level["time"][current_size] = dt_ns - dt_ns = np.datetime64(meta.datetime).astype("datetime64[ns]").astype(np.int64) - r10m["time"][current_size] = dt_ns + # Per-time metadata coordinates at native resolution only. + for coord_name in ["absolute_orbit", "relative_orbit", "platform"]: + r10m[coord_name].resize((new_size,)) r10m["absolute_orbit"][current_size] = meta.absolute_orbit r10m["relative_orbit"][current_size] = meta.relative_orbit r10m["platform"][current_size] = meta.platform @@ -651,12 +743,21 @@ def ingest_s1tiling_acquisition( def consolidate_s1_store(store_path: str | Path, orbit_direction: str) -> None: - """Consolidate metadata at orbit direction and root levels. + """Consolidate metadata for every orbit-direction group and the root. Must be called AFTER all ingestions complete — consolidated metadata caches array shapes and will become stale if called mid-ingestion. + + Consolidates *every* orbit group present, not just ``orbit_direction``: the + pipeline ingests acquisitions one orbit at a time after stripping all + consolidated metadata (so ``time`` can resize), so consolidating only the + passed orbit would leave the other orbit's group unconsolidated on disk + (readers opening that orbit standalone then fall back to a listing). + ``orbit_direction`` is retained for logging / caller compatibility. """ - zarr.consolidate_metadata(str(store_path), path=orbit_direction, zarr_format=3) + root = zarr.open_group(str(store_path), mode="r", zarr_format=3) + for orbit_name, _ in root.groups(): + zarr.consolidate_metadata(str(store_path), path=orbit_name, zarr_format=3) zarr.consolidate_metadata(str(store_path), zarr_format=3) log.info( "Metadata consolidated", @@ -665,6 +766,62 @@ def consolidate_s1_store(store_path: str | Path, orbit_direction: str) -> None: ) +# ============================================================================= +# S3 / local filesystem helpers +# ============================================================================= + + +def _list_tifs(input_dir: str | Path) -> list[str | Path]: + """List *.tif files; supports local paths and s3:// URIs.""" + s = str(input_dir).rstrip("/") + if s.startswith("s3://"): + import s3fs as _s3 + + fs = _s3.S3FileSystem() + bucket_key = s[len("s3://") :] + return [f"s3://{p}" for p in sorted(fs.glob(f"{bucket_key}/*.tif"))] + return sorted(Path(input_dir).glob("*.tif")) + + +def _coerce_input_path(p: str | Path) -> str | Path: + """Return str for s3:// URIs (preserves double-slash); Path otherwise.""" + s = str(p) + return s if s.startswith("s3://") else Path(s) + + +def _input_path_exists(p: str | Path) -> bool: + """Existence check for both local Path and s3:// URI.""" + s = str(p) + if s.startswith("s3://"): + import s3fs as _s3 + + return _s3.S3FileSystem().exists(s.removeprefix("s3://")) + return Path(p).exists() + + +def _rasterio_env(path: str | Path): # type: ignore[return] + """rasterio.Env context for S3 paths; no-op context manager for local paths. + + rasterio 1.5 passes endpoint_url verbatim as GDAL's AWS_S3_ENDPOINT. + GDAL expects hostname only (no scheme), so strip https:// from + AWS_ENDPOINT_URL if that is what the environment provides. + """ + import contextlib + import os + + if not str(path).startswith("s3://"): + return contextlib.nullcontext() + + import boto3 + import rasterio + from rasterio.session import AWSSession + + raw = os.environ.get("AWS_S3_ENDPOINT") or os.environ.get("AWS_ENDPOINT_URL", "") + endpoint = raw.split("://", 1)[-1] if "://" in raw else raw + session = boto3.Session() + return rasterio.Env(AWSSession(session, endpoint_url=endpoint or None)) + + # ============================================================================= # File Discovery # ============================================================================= @@ -689,12 +846,11 @@ def discover_s1tiling_acquisitions(input_dir: str | Path) -> list[dict]: Logs warnings for incomplete acquisitions (missing polarisation or mask files). """ - input_dir = Path(input_dir) - files = sorted(input_dir.glob("*.tif")) + files = _list_tifs(input_dir) groups: dict[tuple, dict] = {} for f in files: - parsed = parse_s1tiling_filename(f.name) + parsed = parse_s1tiling_filename(Path(str(f)).name) if parsed is None: continue @@ -792,8 +948,8 @@ def ingest_s1tiling_conditions( ("incidence_angle", incidence_angle_path), ]: if path is not None: - p = Path(path) - if not p.exists(): + p = _coerce_input_path(path) + if not _input_path_exists(p): raise FileNotFoundError(f"Condition GeoTIFF not found: {p}") condition_inputs.append((label, p)) @@ -817,7 +973,7 @@ def ingest_s1tiling_conditions( # Read reference metadata from the first condition file ref_label, ref_path = condition_inputs[0] - with rasterio.open(str(ref_path)) as src: + with _rasterio_env(ref_path), rasterio.open(str(ref_path)) as src: ref_crs = str(src.crs) t = src.transform ref_transform = [t.a, t.b, t.c, t.d, t.e, t.f] @@ -849,8 +1005,11 @@ def ingest_s1tiling_conditions( for label, cond_path in condition_inputs: array_name = f"{label}_{orbit_suffix}" - with rasterio.open(str(cond_path)) as src: - data = src.read(1).astype(np.float32) + with _rasterio_env(cond_path), rasterio.open(str(cond_path)) as src: + # nodata → NaN via the GeoTIFF's declared nodata (border_mask is N/A for static + # conditions), so out-of-coverage pixels mask transparent like vv/vh. A no-op when + # the GeoTIFF declares no nodata. + data = src.read(1, masked=True).filled(np.nan).astype(np.float32) h, w = data.shape @@ -859,18 +1018,26 @@ def ingest_s1tiling_conditions( conditions[array_name][:, :] = data log.info("Overwrote condition array", array_name=array_name) else: + # Shard like the vv/vh pyramid: one shard over the full (y, x) extent so a 10980² + # condition array is a single object, not ~900 tiny 366²-chunk objects. + # calculate_aligned_chunk_size returns a divisor of the dimension, so (h, w) is a clean + # multiple of the inner chunk — the Zarr v3 shard-divisibility requirement. + inner_chunks = ( + calculate_aligned_chunk_size(h, 512), + calculate_aligned_chunk_size(w, 512), + ) arr = conditions.create_array( array_name, shape=(h, w), dtype="float32", - chunks=( - calculate_aligned_chunk_size(h, 512), - calculate_aligned_chunk_size(w, 512), - ), + chunks=inner_chunks, + shards=(h, w), compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), fill_value=float("nan"), dimension_names=["y", "x"], ) + # CF `_FillValue` so readers mask NaN nodata (xarray #11345), like vv/vh (#172). + arr.attrs["_FillValue"] = FLOAT32_NAN_FILL_VALUE arr[:, :] = data log.info( "Wrote condition array", @@ -880,6 +1047,8 @@ def ingest_s1tiling_conditions( max=float(np.nanmax(data)), ) + _add_grid_mapping(conditions, ref_crs) + log.info( "Conditions ingestion complete", orbit_direction=orbit_direction, @@ -901,12 +1070,11 @@ def discover_s1tiling_conditions(input_dir: str | Path) -> list[dict]: Groups by (tile, orbit). """ - input_dir = Path(input_dir) - files = sorted(input_dir.glob("*.tif")) + files = _list_tifs(input_dir) groups: dict[tuple[str, str], dict] = {} for f in files: - m = S1TILING_GAMMA_AREA_PATTERN.match(f.name) + m = S1TILING_GAMMA_AREA_PATTERN.match(Path(str(f)).name) if m: tile = m.group("tile") orbit = m.group("orbit") @@ -916,7 +1084,7 @@ def discover_s1tiling_conditions(input_dir: str | Path) -> list[dict]: groups[key]["gamma_area"] = f continue - m = S1TILING_LIA_PATTERN.match(f.name) + m = S1TILING_LIA_PATTERN.match(Path(str(f)).name) if m: tile = m.group("tile") orbit = m.group("orbit") diff --git a/src/eopf_geozarr/data_api/geozarr/types.py b/src/eopf_geozarr/data_api/geozarr/types.py index 0dc9c5dc..6f10e213 100644 --- a/src/eopf_geozarr/data_api/geozarr/types.py +++ b/src/eopf_geozarr/data_api/geozarr/types.py @@ -60,6 +60,18 @@ class StandardYCoordAttrsJSON(TypedDict): _ARRAY_DIMENSIONS: list[Literal["y"]] +class S1BackscatterAttrsJSON(TypedDict): + """CF attributes for the Sentinel-1 RTC backscatter bands (vv/vh). + + ``_FillValue`` is the ``FillValueCoder``-encoded base64 NaN that lets xarray mask + nodata under ``use_zarr_fill_value_as_mask`` despite xarray #11345 (data-model #172). + """ + + standard_name: Literal["surface_backwards_scattering_coefficient_of_radar_wave"] + units: Literal["1"] + _FillValue: str + + class TileMatrixJSON(TypedDict): id: str scaleDenominator: float diff --git a/src/eopf_geozarr/stac/__init__.py b/src/eopf_geozarr/stac/__init__.py new file mode 100644 index 00000000..3ce158bd --- /dev/null +++ b/src/eopf_geozarr/stac/__init__.py @@ -0,0 +1 @@ +"""STAC item builders for eopf-geozarr Zarr stores.""" diff --git a/src/eopf_geozarr/stac/s1_rtc.py b/src/eopf_geozarr/stac/s1_rtc.py new file mode 100644 index 00000000..4848eb3e --- /dev/null +++ b/src/eopf_geozarr/stac/s1_rtc.py @@ -0,0 +1,532 @@ +"""STAC item builder for S1 GRD RTC Zarr V3 stores.""" + +from __future__ import annotations + +import datetime as dt +from pathlib import Path +from typing import NamedTuple, cast + +import numpy as np +import pyproj +import pystac +import zarr + +SAR_EXT = "https://stac-extensions.github.io/sar/v1.0.0/schema.json" +SAT_EXT = "https://stac-extensions.github.io/sat/v1.0.0/schema.json" +PROJ_EXT = "https://stac-extensions.github.io/projection/v2.0.0/schema.json" +RENDER_EXT = "https://stac-extensions.github.io/render/v1.0.0/schema.json" +DATACUBE_EXT = "https://stac-extensions.github.io/datacube/v2.2.0/schema.json" +TIMESTAMPS_EXT = "https://stac-extensions.github.io/timestamps/v1.1.0/schema.json" +GRID_EXT = "https://stac-extensions.github.io/grid/v1.1.0/schema.json" + +ZARR_MEDIA_TYPE = "application/vnd.zarr; version=3" + +_ORBIT_PREFERENCE = ("ascending", "descending") +# Short suffix for orbit-keyed asset names (gamma0-rtc-backscatter-asc / -desc). +_ORBIT_SHORT = {"ascending": "asc", "descending": "desc"} + +# γ⁰ RTC backscatter is float32 with a NaN fill at every resolution level; the arrays carry no attrs +# in the store so these product invariants are hardcoded (see the store hierarchy in data_api/s1_rtc). +GAMMA0_DTYPE = "float32" +GAMMA0_NODATA = "nan" +GAMMA0_UNIT = "gamma0 (linear power)" +BORDER_MASK_DTYPE = "uint8" +GSD = 10 + + +def _rgb_render(orbit: str) -> dict[str, object]: + """Build the dual-pol RGB composite render config for the given orbit group. + + Produces a 3-band false-colour composite (R=VV, G=VH, B=VV/VH ratio) that + titiler renders into previews/tiles. ``bidx=[1]`` selects the single time + slice from each multi-band variable. Each band gets its own ``rescale`` pair: + VV and VH are linear gamma0 (low values) while the VV/VH ratio spans ~1-15, so + a single shared pair blew the ratio band out to a flat blue/purple wash (and + dropped low-cross-pol water to transparent, making swaths look mislocated). + Per-band stretches keep the composite natural and the swath readable. + """ + vv = f"/{orbit}:vv" + vh = f"/{orbit}:vh" + return { + "title": "VV, VH, VV/VH composite", + "expression": f"{vv};{vh};({vv})/({vh})", + # Per-band linear stretch: VV/VH are low-valued gamma0; the VV/VH ratio spans ~1-15. + # One shared pair saturated the ratio band (purple wash) and dropped low-cross-pol water. + "rescale": [[0.0, 0.4], [0.0, 0.1], [1.0, 15.0]], + "bidx": [1], + "tilesize": 256, + } + + +def _gamma0_bands() -> list[dict[str, object]]: + """STAC 1.1 band objects for the two polarisations carried by a γ⁰ RTC asset.""" + return [ + { + "name": pol, + "description": f"γ⁰ RTC backscatter, {pol.upper()} polarization", + "data_type": GAMMA0_DTYPE, + "nodata": GAMMA0_NODATA, + "unit": GAMMA0_UNIT, + } + for pol in ("vv", "vh") + ] + + +def _utm_to_wgs84(proj_code: str, utm_bbox: list[float]) -> tuple[float, float, float, float]: + """Convert UTM [xmin, ymin, xmax, ymax] to WGS84 (west, south, east, north).""" + xmin, ymin, xmax, ymax = utm_bbox + transformer = pyproj.Transformer.from_crs(proj_code, "EPSG:4326", always_xy=True) + xs = [xmin, xmax, xmin, xmax] + ys = [ymin, ymin, ymax, ymax] + lons, lats = transformer.transform(xs, ys) + return min(lons), min(lats), max(lons), max(lats) + + +def _bbox_to_geometry(bbox: list[float]) -> dict[str, object]: + """A closed rectangular Polygon for a WGS84 [west, south, east, north] bbox.""" + west, south, east, north = bbox + return { + "type": "Polygon", + "coordinates": [ + [[west, south], [east, south], [east, north], [west, north], [west, south]] + ], + } + + +def _open_root(zarr_store: str) -> zarr.Group: + """Open the cube root, preferring consolidated metadata. + + A cube grown by appending a time-slice to an *existing same-orbit* group can end up without root + consolidated metadata (re-consolidating an append on the S3 store is unreliable). The builder must + not require it — fall back to reading the hierarchy directly, exactly as titiler does. See the + data-model issue on the S1 RTC consolidated-metadata regression. + """ + try: + return zarr.open_consolidated(zarr_store, zarr_format=3) + except ValueError as exc: + if "consolidated metadata" not in str(exc).lower(): + raise + return zarr.open_group(zarr_store, mode="r", zarr_format=3) + + +def build_s1_rtc_stac_item(zarr_store: str, collection_id: str) -> pystac.Item: + """Build a STAC item from a consolidated S1 GRD RTC Zarr V3 store. + + Parameters + ---------- + zarr_store: + Local path or ``s3://`` URI to the Zarr store. + collection_id: + STAC collection ID to attach to the item. + + Returns + ------- + pystac.Item + + Raises + ------ + ValueError + If the store contains no acquisitions. + """ + # TEMPORARY (#246): the store is written as s1-rtc-{tile}.zarr so its filename equals + # the item id, which titiler-eopf reconstructs as the render path (it ignores the asset + # href). Revert this prefix to "s1-grd-rtc-" when titiler-eopf#108 lands. + tile_id = Path(zarr_store).name.removeprefix("s1-rtc-").removesuffix(".zarr") + + root = _open_root(zarr_store) + + all_times_ns: list[int] = [] + wgs84_bboxes: list[tuple[float, float, float, float]] = [] + # Per present orbit, in preference order: the metadata needed for assets, projection and datacube. + present: list[dict[str, object]] = [] + + for orbit_dir in _ORBIT_PREFERENCE: + if orbit_dir not in root: + continue + og = cast("zarr.Group", root[orbit_dir]) + attrs = dict(og.attrs) + proj_code = cast("str", attrs["proj:code"]) + utm_bbox = cast("list[float]", attrs["spatial:bbox"]) + + r10m = cast("zarr.Group", og["r10m"]) + times = np.array(cast("zarr.Array", r10m["time"])).tolist() + if not times: + continue + + # proj:shape / proj:transform live on the r10m group attrs in real stores; read best-effort so + # minimal/legacy stores without them still build (just without those projection refinements). + r10m_attrs = dict(r10m.attrs) + all_times_ns.extend(times) + wgs84_bboxes.append(_utm_to_wgs84(proj_code, utm_bbox)) + present.append( + { + "orbit": orbit_dir, + "proj_code": proj_code, + "utm_bbox": utm_bbox, + "shape": r10m_attrs.get("spatial:shape"), + "transform": r10m_attrs.get("spatial:transform"), + } + ) + + if not all_times_ns: + raise ValueError(f"No acquisitions found in Zarr store: {zarr_store}") + + # Temporal range + start_dt = dt.datetime.fromtimestamp(min(all_times_ns) / 1e9, tz=dt.UTC) + end_dt = dt.datetime.fromtimestamp(max(all_times_ns) / 1e9, tz=dt.UTC) + + # WGS84 bbox union across all present orbit directions + west = min(b[0] for b in wgs84_bboxes) + south = min(b[1] for b in wgs84_bboxes) + east = max(b[2] for b in wgs84_bboxes) + north = max(b[3] for b in wgs84_bboxes) + wgs84_bbox = [west, south, east, north] + + geometry = _bbox_to_geometry(wgs84_bbox) + + # The preferred orbit (ascending if present) drives the single-valued projection fields and the + # default render/preview; every present orbit gets its own first-class asset below. + preferred = present[0] + preferred_orbit = cast("str", preferred["orbit"]) + preferred_proj_code = cast("str", preferred["proj_code"]) + preferred_bbox = cast("list[float]", preferred["utm_bbox"]) + + properties: dict[str, object] = { + "start_datetime": start_dt.isoformat(), + "end_datetime": end_dt.isoformat(), + "title": f"Sentinel-1 GRD RTC γ⁰ — tile {tile_id}", + "description": ( + "Radiometric-terrain-corrected (RTC) γ⁰ backscatter datacube from Sentinel-1 GRD, " + "reprojected onto the Sentinel-2 MGRS/UTM grid." + ), + # `updated` (timestamps extension) tracks this metadata build. `created` is intentionally + # omitted: it means the item's creation instant, which the store does not record — using an + # acquisition time would misuse the field, and a build-time value would churn on every append. + "updated": dt.datetime.now(tz=dt.UTC).isoformat(), + # Identity invariants (constant across the cube; platform is per-acquisition so omitted here — + # a cube can mix S1A and S1C). + "constellation": "sentinel-1", + "instruments": ["c-sar"], + "gsd": GSD, + # SAR extension + "sar:instrument_mode": "IW", + "sar:frequency_band": "C", + "sar:center_frequency": 5.405, + "sar:polarizations": ["VV", "VH"], + "sar:product_type": "GRD", + # Projection extension + "proj:code": preferred_proj_code, + "proj:bbox": preferred_bbox, + # Grid extension: the Sentinel-2 MGRS tile this cube is gridded onto — a queryable tile id + # (enables tile-filtering the acquisitions collection and cube↔acquisition cross-links). + "grid:code": f"MGRS-{tile_id}", + # Render extension: dual-pol RGB composite for previews/tiles (defaults to the preferred orbit) + "renders": {"rgb": _rgb_render(preferred_orbit)}, + } + if preferred["shape"] is not None: + properties["proj:shape"] = preferred["shape"] + if preferred["transform"] is not None: + properties["proj:transform"] = preferred["transform"] + + stac_extensions = [SAR_EXT, PROJ_EXT, RENDER_EXT, DATACUBE_EXT, TIMESTAMPS_EXT, GRID_EXT] + + # sat:orbit_state is single-valued, so it's only meaningful when the cube holds a single orbit. A + # dual-orbit cube would mislabel half its slices — omit it there (per-acquisition items, which are + # single-orbit, carry the real value). Only declare the SAT extension when the field is set. + if len(present) == 1: + properties["sat:orbit_state"] = preferred_orbit + stac_extensions.append(SAT_EXT) + + # Datacube extension. The time axis is irregularly sampled, so it lists its discrete `values` (the + # acquisition instants across all orbit groups, sorted) — their count is the number of time steps, + # and the list stays modest (bounded by the tile's acquisitions). The regular x/y axes instead carry + # extent + step (their element count is derivable, and the exact pixel count is in proj:shape); + # enumerating their ~10⁴ coordinates would not scale. + epsg = pyproj.CRS.from_user_input(preferred_proj_code).to_epsg() + xmin, ymin, xmax, ymax = preferred_bbox + time_values = [ + dt.datetime.fromtimestamp(t / 1e9, tz=dt.UTC).isoformat() for t in sorted(set(all_times_ns)) + ] + time_dim: dict[str, object] = { + "type": "temporal", + "extent": [start_dt.isoformat(), end_dt.isoformat()], + "values": time_values, + } + if len(present) > 1: + # The cube merges two per-orbit sub-cubes (disjoint time axes) onto a shared grid. Orbit is an + # attribute of each acquisition, not an independent axis, so it is conveyed via the per-orbit + # assets rather than a (sparse) orbit dimension — note that here to avoid misreading the axis. + time_dim["description"] = ( + "Acquisition instants across both orbit directions (union); each step belongs to a single " + "orbit — the ascending/descending groups are exposed as separate assets and as items in " + "the per-acquisition collection." + ) + x_dim: dict[str, object] = { + "type": "spatial", + "axis": "x", + "extent": [xmin, xmax], + "reference_system": epsg, + } + y_dim: dict[str, object] = { + "type": "spatial", + "axis": "y", + "extent": [ymin, ymax], + "reference_system": epsg, + } + if preferred["transform"] is not None: + transform = cast("list[float]", preferred["transform"]) + x_dim["step"] = transform[0] + y_dim["step"] = transform[4] + properties["cube:dimensions"] = {"time": time_dim, "x": x_dim, "y": y_dim} + # `variable_type` is the datacube field name (not `type`); the border mask is auxiliary, not data. + properties["cube:variables"] = { + "vv": {"dimensions": ["time", "y", "x"], "variable_type": "data", "unit": GAMMA0_UNIT}, + "vh": {"dimensions": ["time", "y", "x"], "variable_type": "data", "unit": GAMMA0_UNIT}, + "border_mask": {"dimensions": ["time", "y", "x"], "variable_type": "auxiliary"}, + } + + item = pystac.Item( + id=f"s1-rtc-{tile_id}", + geometry=geometry, + bbox=wgs84_bbox, + datetime=None, + properties=properties, + stac_extensions=stac_extensions, + collection=collection_id, + ) + + store_str = str(zarr_store) + item.add_asset( + "zarr-store", + pystac.Asset( + href=store_str, + media_type=ZARR_MEDIA_TYPE, + roles=["data"], + title="Sentinel-1 GRD RTC Zarr store", + ), + ) + + # One γ⁰ asset per present orbit group (fixes the duplicate-href vv/vh ambiguity and the missing + # descending asset): VV/VH are addressable as named `bands`, not indistinguishable duplicate assets. + # A separate border-mask asset exposes the valid-data mask variable in the same group. + for info in present: + orbit = cast("str", info["orbit"]) + short = _ORBIT_SHORT[orbit] + group_href = f"{store_str}/{orbit}" + item.add_asset( + f"gamma0-rtc-backscatter-{short}", + pystac.Asset( + href=group_href, + media_type=ZARR_MEDIA_TYPE, + roles=["data"], + title=f"γ⁰ RTC backscatter ({orbit})", + extra_fields={ + "bands": _gamma0_bands(), + "data_type": GAMMA0_DTYPE, + "nodata": GAMMA0_NODATA, + "unit": GAMMA0_UNIT, + "gsd": GSD, + }, + ), + ) + item.add_asset( + f"border-mask-{short}", + pystac.Asset( + href=group_href, + media_type=ZARR_MEDIA_TYPE, + roles=["data"], + title=f"Valid-data mask ({orbit})", + extra_fields={ + "bands": [ + { + "name": "border_mask", + "description": "Valid-data mask (0 = border/no-data, non-zero = valid)", + "data_type": BORDER_MASK_DTYPE, + "nodata": 0, + } + ], + "gsd": GSD, + }, + ), + ) + + return item + + +# ============================================================================ +# Per-acquisition item construction (one queryable item per cube `time` slice) +# ============================================================================ + +# Default the cube preview to the most recent acquisition covering most of the tile, so a browser shows +# fresh near-full data rather than the oldest slice. +COVERAGE_THRESHOLD = 0.80 + + +class Slice(NamedTuple): + """One cube time slice: its orbit group, acquisition instant, and tile coverage fraction (0..1).""" + + orbit: str + dt: dt.datetime + coverage: float + + +def pick_slice(slices: list[Slice]) -> Slice | None: + """Choose the slice the cube preview should default to. + + The most recent acquisition with coverage strictly above ``COVERAGE_THRESHOLD``; if none clears it, + the highest-coverage slice (ties broken by most recent). Spans both orbit groups. Returns ``None`` + for an empty cube. + """ + if not slices: + return None + good = [s for s in slices if s.coverage > COVERAGE_THRESHOLD] + if good: + return max(good, key=lambda s: s.dt) + return max(slices, key=lambda s: (s.coverage, s.dt)) + + +def slice_coverages(zarr_store: str) -> list[Slice]: + """Per-slice tile coverage from the cube, across both orbit groups. + + Reads ``border_mask`` at the cheap ``r720m`` overview only (~150x150). Coverage is the fraction of + **valid** pixels; the S1Tiling border mask is stored with ``fill_value=0`` for the border, so valid + = non-zero. ``time`` is raw int64 ns (as :func:`build_s1_rtc_stac_item` reads it) -> UTC datetime. + """ + root = _open_root(zarr_store) + out: list[Slice] = [] + for orbit in _ORBIT_PREFERENCE: + if orbit not in root: + continue + level = cast("zarr.Group", cast("zarr.Group", root[orbit])["r720m"]) + mask = np.asarray(cast("zarr.Array", level["border_mask"])) # (time, y, x), uint8 + times_ns = np.asarray(cast("zarr.Array", level["time"])).tolist() # int64 ns since epoch + for i, t_ns in enumerate(times_ns): + sl = mask[i] + coverage = float(np.count_nonzero(sl) / sl.size) + out.append(Slice(orbit, dt.datetime.fromtimestamp(t_ns / 1e9, tz=dt.UTC), coverage)) + return out + + +def acquisition_id(tile_id: str, when: dt.datetime) -> str: + """Per-acquisition item id, e.g. ``s1-rtc-31TCH-20260607t055248``.""" + return f"s1-rtc-{tile_id}-{when.strftime('%Y%m%dt%H%M%S')}" + + +def _normalize_platform(raw: object) -> str | None: + """Map the store's short platform code (e.g. ``s1a``) to the STAC convention (``sentinel-1a``). + + Mirrors the Sentinel-2 reference (``sentinel-2a``). Unknown values are returned lower-cased. + """ + s = str(raw).strip().lower() + if not s: + return None + if len(s) == 3 and s.startswith("s1"): + return f"sentinel-1{s[2]}" + return s + + +def build_s1_rtc_per_acquisition_items( + zarr_store: str, *, orbit: str, collection_id: str +) -> list[pystac.Item]: + """Build one queryable STAC item per cube ``time`` slice of a single orbit group. + + Each item is a single-``datetime`` view into the shared cube (no data duplication): it keeps the + cube's geometry/SAR/projection metadata and the orbit's γ⁰ asset, drops the temporal range and the + datacube structure (a single acquisition is not a cube), and is reoriented to ``orbit``. The item + is deployment-agnostic — it carries the render config + datetime, and the registration layer derives + the TiTiler links (which point at the cube endpoint with ``sel=time={datetime}``) from it. + + Parameters + ---------- + zarr_store: + Local path or ``s3://`` URI to the per-tile cube Zarr store. + orbit: + Orbit group to emit items for (``"ascending"`` or ``"descending"``). + collection_id: + Target (per-acquisition) STAC collection ID. + + Raises + ------ + ValueError + If the store has no acquisitions, or ``orbit`` is not present in the store. + """ + if orbit not in _ORBIT_PREFERENCE: + raise ValueError(f"orbit must be one of {_ORBIT_PREFERENCE}, got {orbit!r}") + + tile_id = Path(zarr_store).name.removeprefix("s1-rtc-").removesuffix(".zarr") + + root = _open_root(zarr_store) + if orbit not in root: + raise ValueError(f"Orbit group {orbit!r} not found in Zarr store: {zarr_store}") + r10m = cast("zarr.Group", cast("zarr.Group", root[orbit])["r10m"]) + times_ns = np.array(cast("zarr.Array", r10m["time"])).tolist() + platforms = np.array(cast("zarr.Array", r10m["platform"])).tolist() + if not times_ns: + raise ValueError(f"No acquisitions found for orbit {orbit!r} in: {zarr_store}") + + base = build_s1_rtc_stac_item(zarr_store, collection_id) + base_dict = base.to_dict(include_self_link=False) + + # Assets to drop from each per-acquisition clone: the *other* orbit's groups (a per-acq item + # represents one orbit). The datacube structure is dropped too — a single acquisition is not a cube. + other_assets = { + key + for o in _ORBIT_PREFERENCE + if o != orbit + for key in (f"gamma0-rtc-backscatter-{_ORBIT_SHORT[o]}", f"border-mask-{_ORBIT_SHORT[o]}") + } + + # A per-acquisition item covers only its run orbit's footprint — not the cube's union of both + # orbits' extents (which the base item carries). Recompute bbox/geometry/proj:bbox from this orbit. + # proj:code/shape/transform describe the shared MGRS grid (identical across orbits), so the values + # inherited from the base (preferred orbit) are correct and are intentionally not recomputed here. + og_attrs = dict(cast("zarr.Group", root[orbit]).attrs) + orbit_utm_bbox = cast("list[float]", og_attrs["spatial:bbox"]) + orbit_wgs84_bbox = list(_utm_to_wgs84(cast("str", og_attrs["proj:code"]), orbit_utm_bbox)) + orbit_geometry = _bbox_to_geometry(orbit_wgs84_bbox) + + items: list[pystac.Item] = [] + for t_ns, platform in zip(times_ns, platforms, strict=True): + when = dt.datetime.fromtimestamp(t_ns / 1e9, tz=dt.UTC) + item_dict = {**base_dict} + item_dict["id"] = acquisition_id(tile_id, when) + item_dict["bbox"] = orbit_wgs84_bbox + item_dict["geometry"] = orbit_geometry + + props = { + k: v + for k, v in base_dict["properties"].items() + if k not in ("start_datetime", "end_datetime", "cube:dimensions", "cube:variables") + } + props["datetime"] = when.isoformat() + props["sat:orbit_state"] = orbit + props["proj:bbox"] = orbit_utm_bbox + # Per-acquisition title carries the datetime + orbit so sibling scenes are distinguishable + # (the inherited cube title "… — tile {id}" is identical across all acquisitions). + props["title"] = ( + f"Sentinel-1 GRD RTC γ⁰ — tile {tile_id}, " + f"{when.strftime('%Y-%m-%dT%H:%M:%SZ')} ({orbit})" + ) + props["description"] = ( + "Radiometric-terrain-corrected (RTC) γ⁰ backscatter from a single Sentinel-1 GRD " + "acquisition, reprojected onto the Sentinel-2 MGRS/UTM grid." + ) + normalized = _normalize_platform(platform) + if normalized: + props["platform"] = normalized + props["renders"] = {"rgb": _rgb_render(orbit)} + item_dict["properties"] = props + + # Drop the datacube ext (a single acquisition is not a cube). Ensure the SAT ext is declared: + # a per-acq item always sets sat:orbit_state, but a dual-orbit cube base omits both. + exts = [e for e in base_dict.get("stac_extensions", []) if e != DATACUBE_EXT] + if SAT_EXT not in exts: + exts.append(SAT_EXT) + item_dict["stac_extensions"] = exts + item_dict["assets"] = { + k: v for k, v in base_dict["assets"].items() if k not in other_assets + } + item_dict["links"] = [] + + items.append(pystac.Item.from_dict(item_dict)) + return items diff --git a/tests/test_s1_rtc_ingest.py b/tests/test_s1_rtc_ingest.py index c5e4b538..ef660459 100644 --- a/tests/test_s1_rtc_ingest.py +++ b/tests/test_s1_rtc_ingest.py @@ -2,8 +2,10 @@ from __future__ import annotations +import os from math import ceil from pathlib import Path +from unittest.mock import patch import numpy as np import pytest @@ -25,6 +27,7 @@ ingest_s1tiling_conditions, parse_s1tiling_filename, ) +from eopf_geozarr.conversion.utils import calculate_aligned_chunk_size # ============================================================================= # Constants @@ -65,8 +68,9 @@ def _create_synthetic_geotiff( crs: str = CRS, transform: rasterio.transform.Affine | None = None, tags: dict[str, str] | None = None, + nodata: float | None = None, ) -> None: - """Write a single-band GeoTIFF with optional metadata tags.""" + """Write a single-band GeoTIFF with optional metadata tags and declared nodata.""" if transform is None: transform = TRANSFORM with rasterio.open( @@ -79,6 +83,7 @@ def _create_synthetic_geotiff( dtype=data.dtype, crs=crs, transform=transform, + nodata=nodata, ) as dst: if tags: dst.update_tags(**tags) @@ -243,6 +248,66 @@ def test_array_metadata(self, s1_store_path: Path, sample_metadata: S1TilingMeta assert r10m["vv"].dtype == np.float32 assert r10m["border_mask"].dtype == np.uint8 + def test_float_bands_declare_cf_fill_value( + self, s1_store_path: Path, sample_metadata: S1TilingMetadata + ) -> None: + """Float backscatter bands must declare a CF ``_FillValue`` attribute at + *every* multiscale level, matching S2 (data-model #172 / xarray #11345). + + The zarr-level ``fill_value`` alone is not surfaced by xarray's encoding, + so ``to_masked_array()`` / ``use_zarr_fill_value_as_mask=True`` cannot mask + NaN nodata without the attribute. ``test_array_attrs`` only guards the S2 + ``/measurements/`` layout, so the S1 RTC layout was previously unchecked. + """ + from xarray.backends.zarr import FillValueCoder + + root = create_s1_store(s1_store_path, "ascending", sample_metadata) + orbit = root["ascending"] + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + for level_name, _, _ in OVERVIEW_CHAIN: + for band in ("vv", "vh"): + attrs = dict(orbit[level_name][band].attrs) + assert attrs.get("_FillValue") == expected, ( + f"{level_name}/{band} missing/!= CF _FillValue" + ) + assert ( + attrs.get("standard_name") + == "surface_backwards_scattering_coefficient_of_radar_wave" + ) + assert attrs.get("units") == "1" + + def test_no_tile_matrix_set( + self, s1_store_path: Path, sample_metadata: S1TilingMetadata + ) -> None: + # tile_matrix_set is not part of the S1 GRD RTC data model (confirmed with the + # data-model owner): the multiscales attribute must not carry one. + root = create_s1_store(s1_store_path, "ascending", sample_metadata) + ms = dict(root["ascending"].attrs)["multiscales"] + assert "tile_matrix_set" not in ms + assert "layout" in ms + + def test_cf_grid_mapping_resolves_crs( + self, s1_store_path: Path, sample_metadata: S1TilingMetadata + ) -> None: + # Each resolution level carries a CF spatial_ref grid-mapping so rioxarray (and + # TiTiler's GeoZarr reader) can resolve the CRS -- the geozarr proj:code attr + # alone is not read by rioxarray. + import rioxarray # noqa: F401 -- registers the .rio accessor + + create_s1_store(s1_store_path, "ascending", sample_metadata) + r10m = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3)["ascending"]["r10m"] + assert "spatial_ref" in list(r10m.array_keys()) + assert dict(r10m["vv"].attrs).get("grid_mapping") == "spatial_ref" + assert dict(r10m["vh"].attrs).get("grid_mapping") == "spatial_ref" + + ds = xr.open_zarr( + str(s1_store_path / "ascending" / "r10m"), + consolidated=False, + decode_coords="all", + ) + assert ds.rio.crs is not None + assert ds.rio.crs.to_epsg() == 32633 + def test_coordinate_variables( self, s1_store_path: Path, sample_metadata: S1TilingMetadata ) -> None: @@ -319,6 +384,125 @@ def test_second_acquisition_appends(self, s1_geotiff_dir: Path, s1_store_path: P root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) assert root["ascending"]["r10m"]["vv"].shape[0] == 2 + def test_ingested_bands_declare_cf_fill_value( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """End-to-end (production path): vv/vh carry CF ``_FillValue``/``standard_name``/ + ``units`` at every level for BOTH the store-creating orbit (``create_s1_store``) + and a second orbit added via the inline new-orbit path — the two paths that were + previously inconsistent. Parity with S2 / S1 GRD (#172; xarray #11345). + """ + from xarray.backends.zarr import FillValueCoder + + vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") # create_s1_store + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "descending") # inline new orbit + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for orbit in ("ascending", "descending"): + for level_name, _, _ in OVERVIEW_CHAIN: + for band in ("vv", "vh"): + attrs = dict(root[orbit][level_name][band].attrs) + assert attrs.get("_FillValue") == expected, f"{orbit}/{level_name}/{band}" + assert ( + attrs.get("standard_name") + == "surface_backwards_scattering_coefficient_of_radar_wave" + ) + assert attrs.get("units") == "1" + + def test_new_orbit_level_groups_carry_proj_code( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """A second orbit added to an existing store must get the same per-level metadata + as the store-creating orbit — incl. ``proj:code`` on every level group. The inline + new-orbit path previously omitted it (drift vs ``create_s1_store``).""" + vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "descending") + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for orbit in ("ascending", "descending"): + for level_name, _, _ in OVERVIEW_CHAIN: + attrs = dict(root[orbit][level_name].attrs) + assert attrs.get("proj:code") == CRS, f"{orbit}/{level_name} missing proj:code" + + def test_fill_value_masking_roundtrip(self, tmp_path: Path, s1_store_path: Path) -> None: + """End-to-end: out-of-swath nodata (``border_mask == 0``) comes back masked when the cube + is reopened with ``use_zarr_fill_value_as_mask=True`` — the behaviour the CF ``_FillValue`` + attribute exists to enable despite xarray #11345. Mirrors the S2 guarantee in + ``tests/test_array_attrs.py::test_fill_value_masking_roundtrip``. + + The nodata region comes from ``border_mask``, not a pre-seeded NaN: s1tiling stores ``0.0`` + out of swath, and the writer is what must convert that to NaN. + """ + stamp = "20230115t061234" + rng = np.random.default_rng(0) + vv_data = rng.uniform(0.1, 1.0, (SIZE, SIZE)).astype(np.float32) + vh_data = rng.uniform(0.1, 0.5, (SIZE, SIZE)).astype(np.float32) + mask_data = np.ones((SIZE, SIZE), dtype=np.uint8) + mask_data[0:16, 0:16] = 0 # out-of-swath border + vv_data[0:16, 0:16] = 0.0 # s1tiling stores 0 where there is no swath + vh_data[0:16, 0:16] = 0.0 + vv = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif" + vh = tmp_path / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif" + mask = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC_BorderMask.tif" + _create_synthetic_geotiff(vv, vv_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(vh, vh_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(mask, mask_data, tags=ACQ1_TAGS) + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + + ds = xr.open_dataset( + str(s1_store_path / "ascending" / "r10m"), + engine="zarr", + consolidated=False, + decode_times=False, + decode_coords=False, + use_zarr_fill_value_as_mask=True, + ) + try: + masked = ds["vv"].to_masked_array() + assert np.ma.is_masked(masked), "out-of-swath nodata must be masked via `_FillValue`" + assert masked.mask[0, 0, 0], "nodata cell (border_mask==0) must be masked" + assert not masked.mask[0, -1, -1], "valid cell must not be masked" + finally: + ds.close() + + def test_nodata_masked_to_nan(self, tmp_path: Path, s1_store_path: Path) -> None: + """The writer stores NaN — not 0 — wherever ``border_mask == 0``, at the native level and + every overview, so titiler masks out-of-swath nodata transparent like the S2 reference. + + NaN must coincide exactly with ``border_mask == 0``: valid pixels stay finite. Root cause + of the "black area" render bug: s1tiling writes 0 out of swath, and 0 is valid data to + titiler. ``np.nanmean`` downsampling must carry the NaN to every overview level. + """ + stamp = "20230115t061234" + rng = np.random.default_rng(7) + vv_data = rng.uniform(0.1, 1.0, (SIZE, SIZE)).astype(np.float32) + vh_data = rng.uniform(0.1, 0.5, (SIZE, SIZE)).astype(np.float32) + mask_data = np.ones((SIZE, SIZE), dtype=np.uint8) + mask_data[0:32, :] = 0 # out-of-swath border band (whole rows) + vv_data[0:32, :] = 0.0 # s1tiling stores 0 there — valid data to titiler today + vh_data[0:32, :] = 0.0 + vv = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif" + vh = tmp_path / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif" + mask = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC_BorderMask.tif" + _create_synthetic_geotiff(vv, vv_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(vh, vh_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(mask, mask_data, tags=ACQ1_TAGS) + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + nodata = mask_data == 0 + for band in ("vv", "vh"): + native = root["ascending"]["r10m"][band][0, :, :] + assert np.all(np.isnan(native[nodata])), f"{band}: nodata region must be NaN, not 0" + assert not np.any(np.isnan(native[~nodata])), f"{band}: valid region must stay finite" + assert not np.any(native[nodata] == 0.0), f"{band}: nodata must not read back as 0" + + # NaN propagates through np.nanmean downsampling: the all-nodata top band stays NaN. + coarse = root["ascending"]["r20m"]["vv"][0, :, :] + assert np.isnan(coarse[0, 0]), "all-nodata block must stay NaN at the overview level" + assert not np.any(np.isnan(coarse[-1, :])), "fully-valid bottom row must stay finite" + def test_preserves_data_integrity(self, s1_geotiff_dir: Path, s1_store_path: Path) -> None: vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") @@ -326,13 +510,18 @@ def test_preserves_data_integrity(self, s1_geotiff_dir: Path, s1_store_path: Pat # Read back and compare with rasterio.open(str(vv)) as src: expected_vv = src.read(1) + with rasterio.open(str(mask)) as src: + expected_mask = src.read(1).astype(np.uint8) root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) actual_vv = root["ascending"]["r10m"]["vv"][0, :, :] - np.testing.assert_allclose(actual_vv, expected_vv, rtol=1e-6) - # Mask should be exact - with rasterio.open(str(mask)) as src: - expected_mask = src.read(1).astype(np.uint8) + # Valid pixels (border_mask == 1) are preserved exactly; out-of-swath pixels + # (border_mask == 0) are written as NaN, not the raw 0 — the render-bug fix. + valid = expected_mask == 1 + np.testing.assert_allclose(actual_vv[valid], expected_vv[valid], rtol=1e-6) + assert np.all(np.isnan(actual_vv[~valid])), "out-of-swath nodata must be NaN" + + # border_mask itself is stored verbatim (uint8, never masked). actual_mask = root["ascending"]["r10m"]["border_mask"][0, :, :] np.testing.assert_array_equal(actual_mask, expected_mask) @@ -463,6 +652,26 @@ def test_consolidate_after_all_ingestions( r10m = root["ascending"]["r10m"] assert r10m["vv"].shape[0] == 2 + def test_consolidate_all_orbits_present( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """``consolidate_s1_store`` must leave EVERY orbit group consolidated on disk, not just the + one passed. The pipeline ingests acquisitions one orbit at a time after stripping all + consolidated metadata (so ``time`` can resize), so consolidating only the passed orbit left + staging cubes asc✓/desc✗. Each orbit group is checked **standalone**: a consolidated root + synthesises the child's view, so ``root[orbit].metadata.consolidated_metadata`` is a + false-green (non-None even when ``/zarr.json`` lacks it). + """ + vv1, vh1, mask1 = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") + vv2, vh2, mask2 = self._get_acq_paths(s1_geotiff_dir, "20230127t061235") + ingest_s1tiling_acquisition(vv1, vh1, mask1, s1_store_path, "ascending") + ingest_s1tiling_acquisition(vv2, vh2, mask2, s1_store_path, "descending") + consolidate_s1_store(s1_store_path, "descending") # only one orbit passed + + for orbit in ("ascending", "descending"): + grp = zarr.open_group(str(s1_store_path / orbit), mode="r", zarr_format=3) + assert grp.metadata.consolidated_metadata is not None, f"{orbit} orbit not consolidated" + def _get_acq_paths(self, geotiff_dir: Path, stamp: str) -> tuple[Path, Path, Path]: vv = geotiff_dir / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif" vh = geotiff_dir / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif" @@ -504,6 +713,20 @@ def test_skips_non_matching(self, tmp_path: Path) -> None: acqs = discover_s1tiling_acquisitions(tmp_path) assert len(acqs) == 0 + def test_s3_uri_discovers_acquisitions(self) -> None: + """s3:// prefix is listed via s3fs; pathlib.glob is NOT used.""" + s3_files = [ + "bucket/prefix/s1a_32TQM_vv_ASC_037_20230115t061234_GammaNaughtRTC.tif", + "bucket/prefix/s1a_32TQM_vh_ASC_037_20230115t061234_GammaNaughtRTC.tif", + "bucket/prefix/s1a_32TQM_vv_ASC_037_20230115t061234_GammaNaughtRTC_BorderMask.tif", + "bucket/prefix/s1a_32TQM_vh_ASC_037_20230115t061234_GammaNaughtRTC_BorderMask.tif", + ] + with patch("s3fs.S3FileSystem.glob", return_value=s3_files): + acqs = discover_s1tiling_acquisitions("s3://bucket/prefix/") + assert len(acqs) == 1 + expected_vv = "s3://bucket/prefix/s1a_32TQM_vv_ASC_037_20230115t061234_GammaNaughtRTC.tif" + assert acqs[0]["vv"] == expected_vv + def test_resolves_masked_multiframe_stamp_from_tag(self, tmp_path: Path) -> None: """Multi-frame products whose filename time is masked (…txxxxxx) must still be discovered as a complete acquisition, with acq_stamp resolved from the GeoTIFF ACQUISITION_DATETIME @@ -594,6 +817,9 @@ def test_conditions_group_attributes( assert attrs["spatial:dimensions"] == ["y", "x"] assert len(attrs["spatial:transform"]) == 6 assert attrs["spatial:shape"] == [SIZE, SIZE] + # CF grid-mapping so rioxarray can resolve the CRS of the condition arrays + assert "spatial_ref" in list(conditions.array_keys()) + assert dict(conditions["gamma_area_037"].attrs).get("grid_mapping") == "spatial_ref" def test_gamma_area_array_shape_and_dtype( self, s1_store_with_acquisition: Path, gamma_area_geotiff: Path @@ -627,6 +853,103 @@ def test_data_integrity_roundtrip( actual = root["ascending"]["conditions"]["gamma_area_037"][:] np.testing.assert_allclose(actual, expected, rtol=1e-6) + def test_conditions_nodata_masked_to_nan( + self, s1_store_with_acquisition: Path, tmp_path: Path + ) -> None: + """A condition GeoTIFF's declared-nodata pixels read back as NaN (not the raw sentinel), + so the auxiliary arrays mask transparent like vv/vh. border_mask is N/A for static + conditions, so the writer relies on the GeoTIFF's declared nodata via a masked read. + """ + data = np.full((SIZE, SIZE), 1.5, dtype=np.float32) + data[0:20, 0:20] = 0.0 # declared-nodata region + cond_path = tmp_path / "GAMMA_AREA_32TQM_037.tif" + _create_synthetic_geotiff(cond_path, data, nodata=0.0) + ingest_s1tiling_conditions( + store_path=s1_store_with_acquisition, + orbit_direction="ascending", + relative_orbit=37, + gamma_area_path=cond_path, + ) + conditions = zarr.open_group(str(s1_store_with_acquisition), mode="r", zarr_format=3)[ + "ascending" + ]["conditions"] + arr = conditions["gamma_area_037"][:] + assert np.all(np.isnan(arr[0:20, 0:20])), "declared-nodata region must be NaN" + assert not np.any(np.isnan(arr[20:, 20:])), "valid region must stay finite" + + def test_float_conditions_declare_cf_fill_value( + self, + s1_store_with_acquisition: Path, + gamma_area_geotiff: Path, + lia_geotiff: Path, + ) -> None: + """Float condition arrays (gamma_area, lia) must declare a CF ``_FillValue`` so + readers mask NaN nodata (xarray #11345), like vv/vh (#172).""" + from xarray.backends.zarr import FillValueCoder + + ingest_s1tiling_conditions( + store_path=s1_store_with_acquisition, + orbit_direction="ascending", + relative_orbit=37, + gamma_area_path=gamma_area_geotiff, + lia_path=lia_geotiff, + ) + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + conditions = zarr.open_group(str(s1_store_with_acquisition), mode="r", zarr_format=3)[ + "ascending" + ]["conditions"] + for arr_name in ("gamma_area_037", "lia_037"): + assert dict(conditions[arr_name].attrs).get("_FillValue") == expected, arr_name + + def test_gamma_area_is_sharded( + self, s1_store_with_acquisition: Path, gamma_area_geotiff: Path + ) -> None: + """The condition array carries a sharding codec: one shard over the full (y, x) extent, + 512-aligned inner chunks (the same layout vv/vh already use).""" + ingest_s1tiling_conditions( + store_path=s1_store_with_acquisition, + orbit_direction="ascending", + relative_orbit=37, + gamma_area_path=gamma_area_geotiff, + ) + arr = zarr.open_group(str(s1_store_with_acquisition), mode="r", zarr_format=3)[ + "ascending" + ]["conditions"]["gamma_area_037"] + # shards == full extent (None would mean unsharded — the pre-fix layout) + assert arr.shards == (SIZE, SIZE) + assert arr.chunks == (calculate_aligned_chunk_size(SIZE, 512),) * 2 + + def test_sharding_collapses_chunk_objects_to_one(self, s1_store_with_acquisition: Path) -> None: + """A multi-chunk condition array lands as a SINGLE on-disk shard object, not one object per + inner chunk — the object-count collapse (real gamma_area: ~900 chunk objects → 1 shard).""" + # 1098 sq with a 366 sq inner chunk = 3x3 = 9 inner chunks that, sharded, share one shard. + big = 1098 + rng = np.random.default_rng(7) + data = rng.uniform(0.5, 2.0, (big, big)).astype(np.float32) + gpath = s1_store_with_acquisition.parent / "GAMMA_AREA_BIG_037.tif" + _create_synthetic_geotiff( + gpath, data, transform=from_bounds(XMIN, YMIN, XMAX, YMAX, big, big) + ) + ingest_s1tiling_conditions( + store_path=s1_store_with_acquisition, + orbit_direction="ascending", + relative_orbit=37, + gamma_area_path=gpath, + ) + arr = zarr.open_group(str(s1_store_with_acquisition), mode="r", zarr_format=3)[ + "ascending" + ]["conditions"]["gamma_area_037"] + assert arr.chunks == (366, 366) + assert arr.shards == (big, big) + # exactly one chunk-data object on disk (the shard), regardless of the 9 inner chunks + array_dir = s1_store_with_acquisition / "ascending" / "conditions" / "gamma_area_037" + data_objects = [ + f for _r, _d, files in os.walk(array_dir) for f in files if f != "zarr.json" + ] + assert len(data_objects) == 1, data_objects + # values still byte-identical through the shard + np.testing.assert_allclose(arr[:], data, rtol=1e-6) + def test_multiple_conditions( self, s1_store_with_acquisition: Path, gamma_area_geotiff: Path, lia_geotiff: Path ) -> None: @@ -788,3 +1111,202 @@ def test_skips_non_matching(self, tmp_path: Path) -> None: def test_empty_directory(self, tmp_path: Path) -> None: conditions = discover_s1tiling_conditions(tmp_path) assert len(conditions) == 0 + + def test_s3_uri_discovers_conditions(self) -> None: + """s3:// prefix is listed via s3fs; pathlib.glob is NOT used.""" + s3_files = ["bucket/prefix/GAMMA_AREA_32TQM_037.tif"] + with patch("s3fs.S3FileSystem.glob", return_value=s3_files): + conditions = discover_s1tiling_conditions("s3://bucket/prefix/") + assert len(conditions) == 1 + assert conditions[0]["tile"] == "32TQM" + assert conditions[0]["orbit"] == "037" + + +# ============================================================================= +# CF datetime `time` coordinate — render-by-datetime support (data-model #192) +# ============================================================================= + +_LEVELS = ["r10m", "r20m", "r60m", "r120m", "r360m", "r720m"] + + +class TestTimeCFDatetime: + """`time` is CF-encoded at every multiscale level so readers decode it to datetime64 and can + select a slice by datetime (`sel=time={datetime}`) at any rendered scale, even on a non-monotonic + axis. This replaces the fragile positional `sel=time={index}` rendering (#192).""" + + def _paths(self, geotiff_dir: Path, stamp: str) -> tuple[Path, Path, Path]: + vv = geotiff_dir / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif" + vh = geotiff_dir / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif" + mask = geotiff_dir / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC_BorderMask.tif" + return vv, vh, mask + + def test_time_has_cf_attrs_at_every_level( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + vv, vh, mask = self._paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for level in _LEVELS: + attrs = dict(root["ascending"][level]["time"].attrs) + assert attrs.get("units") == "nanoseconds since 1970-01-01", level + assert attrs.get("calendar") == "proleptic_gregorian", level + assert attrs.get("standard_name") == "time", level + + def test_open_datatree_decodes_time_to_datetime64( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + vv, vh, mask = self._paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + dt = xr.open_datatree( + str(s1_store_path), engine="zarr", decode_times=True, consolidated=False + ) + for level in ("r10m", "r720m"): + da = dt["ascending"][level]["vv"] + assert "time" in da.coords, level + assert np.issubdtype(da["time"].dtype, np.datetime64), level + + def test_exact_datetime_sel_on_nonmonotonic_axis( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """Ingest LATER acq first then EARLIER → non-monotonic axis; exact datetime `.sel` still + returns the right physical slice at both the native and a coarse level (the 31TEH case).""" + vv2, vh2, mask2 = self._paths(s1_geotiff_dir, "20230127t061235") # 2023-01-27 (later) + vv1, vh1, mask1 = self._paths(s1_geotiff_dir, "20230115t061234") # 2023-01-15 (earlier) + ingest_s1tiling_acquisition(vv2, vh2, mask2, s1_store_path, "ascending") # -> index 0 + ingest_s1tiling_acquisition(vv1, vh1, mask1, s1_store_path, "ascending") # -> index 1 + + dt = xr.open_datatree( + str(s1_store_path), engine="zarr", decode_times=True, consolidated=False + ) + times = dt["ascending"]["r10m"]["time"].values + assert times[0] > times[1], "axis should be non-monotonic (later acq appended first)" + + early = np.datetime64("2023-01-15T06:12:34") # physical index 1 + later = np.datetime64("2023-01-27T06:12:35") # physical index 0 + for level in ("r10m", "r720m"): + vvda = dt["ascending"][level]["vv"] + np.testing.assert_array_equal( + vvda.sel(time=early).values, vvda.isel(time=1).values, err_msg=f"{level} early" + ) + np.testing.assert_array_equal( + vvda.sel(time=later).values, vvda.isel(time=0).values, err_msg=f"{level} later" + ) + + def test_time_values_identical_across_levels( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + vv1, vh1, mask1 = self._paths(s1_geotiff_dir, "20230115t061234") + vv2, vh2, mask2 = self._paths(s1_geotiff_dir, "20230127t061235") + ingest_s1tiling_acquisition(vv1, vh1, mask1, s1_store_path, "ascending") + ingest_s1tiling_acquisition(vv2, vh2, mask2, s1_store_path, "ascending") + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + ref = np.asarray(root["ascending"]["r10m"]["time"][:]) + assert ref.shape == (2,) + for level in _LEVELS[1:]: + np.testing.assert_array_equal(np.asarray(root["ascending"][level]["time"][:]), ref) + + def test_r10m_time_still_int64_for_register( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """register_per_acquisition reads r10m/time as raw int64 ns — CF attrs must not change that.""" + vv, vh, mask = self._paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + arr = root["ascending"]["r10m"]["time"] + assert arr.dtype == np.dtype("int64") + assert str(np.datetime64(int(arr[0]), "ns")).startswith("2023-01-15") + + +# ============================================================================= +# Per-level `time` self-heal on append (robust to a pre-#192 / half-built cube) +# ============================================================================= + + +class TestPerLevelTimeHeal: + """The append recreates a multiscale level's missing `time` from r10m/time instead of raising + `KeyError: 'time'` (a cube built before #192, or left half-built by an interrupted append), and + refuses to mis-heal a genuinely inconsistent cube.""" + + def _paths(self, d: Path, stamp: str) -> tuple[Path, Path, Path]: + return ( + d / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif", + d / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif", + d / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC_BorderMask.tif", + ) + + def _coarse_levels(self, store_path: Path) -> list[str]: + root = zarr.open_group(str(store_path), mode="r", zarr_format=3) + return [n for n, _ in root["ascending"].groups() if n not in ("r10m", "conditions")] + + def _drop_time(self, store_path: Path, level: str) -> None: + """Simulate a pre-#192 cube by removing a level's `time` array. `ingest_s1tiling_acquisition` + does not consolidate, so a filesystem removal is enough for the group to no longer see it.""" + import shutil + + shutil.rmtree(Path(store_path) / "ascending" / level / "time") + + def test_append_heals_levels_missing_time( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """A coarser level lacking `time` is recreated from r10m/time (prior slices preserved); the + append that previously crashed now succeeds.""" + a1 = self._paths(s1_geotiff_dir, "20230115t061234") + a2 = self._paths(s1_geotiff_dir, "20230127t061235") + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") + coarse = self._coarse_levels(s1_store_path) + assert coarse # sanity: there ARE coarser levels + for lvl in coarse: + self._drop_time(s1_store_path, lvl) + + idx = ingest_s1tiling_acquisition(*a2, s1_store_path, "ascending") # was KeyError: 'time' + + assert idx == 1 + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + ref = list(root["ascending"]["r10m"]["time"][:]) + assert len(ref) == 2 + for lvl in coarse: + t = root["ascending"][lvl]["time"] + assert t.dtype == np.dtype("int64") + assert list(t[:]) == ref # backfilled prior slice + appended new slice, matching r10m + + def test_append_noop_when_all_levels_have_time( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """A healthy cube: the heal is a no-op and the append works normally.""" + a1 = self._paths(s1_geotiff_dir, "20230115t061234") + a2 = self._paths(s1_geotiff_dir, "20230127t061235") + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") + idx = ingest_s1tiling_acquisition(*a2, s1_store_path, "ascending") + assert idx == 1 + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for lvl in self._coarse_levels(s1_store_path): + assert root["ascending"][lvl]["time"].shape[0] == 2 + + def test_append_raises_on_half_built_cube( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """A level whose data length disagrees with r10m/time (and lacks `time`) is unhealable -> raise + rather than write a wrong-length coordinate.""" + a1 = self._paths(s1_geotiff_dir, "20230115t061234") + a2 = self._paths(s1_geotiff_dir, "20230127t061235") + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") + ingest_s1tiling_acquisition(*a2, s1_store_path, "ascending") # 2 slices + + root = zarr.open_group(str(s1_store_path), mode="r+", zarr_format=3) + r20m = root["ascending"]["r20m"] + _, h, w = r20m["vv"].shape + r20m["vv"].resize((1, h, w)) # half-built: r20m has 1 slice, r10m has 2 + self._drop_time(s1_store_path, "r20m") + + with pytest.raises(ValueError, match="half-built"): + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") + + def test_append_raises_when_r10m_time_missing( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """r10m holds slices but no `time` -> no backfill source -> raise (not a silent invention).""" + a1 = self._paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") + self._drop_time(s1_store_path, "r10m") + with pytest.raises(ValueError, match="no backfill source"): + ingest_s1tiling_acquisition(*a1, s1_store_path, "ascending") diff --git a/tests/test_s1_stac.py b/tests/test_s1_stac.py new file mode 100644 index 00000000..99abf6dd --- /dev/null +++ b/tests/test_s1_stac.py @@ -0,0 +1,355 @@ +"""Tests for build_s1_rtc_stac_item — STAC item builder for S1 GRD RTC Zarr stores.""" + +from __future__ import annotations + +import datetime as dt +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from pathlib import Path +import pytest +import zarr + +from eopf_geozarr.stac.s1_rtc import build_s1_rtc_stac_item + +# ============================================================================= +# Constants +# ============================================================================= + +CRS = "EPSG:32631" +UTM_BBOX = [300000.0, 4900000.0, 400000.0, 5000000.0] # [xmin, ymin, xmax, ymax] + +# Nanoseconds since epoch for two acquisitions +T1_NS = int(np.datetime64("2023-01-15T06:12:34", "ns").astype(np.int64)) +T2_NS = int(np.datetime64("2023-01-27T06:12:35", "ns").astype(np.int64)) + + +# ============================================================================= +# Fixture helper +# ============================================================================= + + +def _make_s1_store( + tmp_path: Path, + orbits: dict[str, list[tuple[int, str]]], + tile_id: str = "31TCH", + crs: str = CRS, + utm_bbox: list[float] | None = None, + consolidate: bool = True, +) -> Path: + """Create a minimal S1 Zarr store. + + ``orbits`` maps orbit_direction -> list of (time_ns, platform) tuples. + Creates only the attrs and coordinate arrays needed by build_s1_rtc_stac_item. + ``consolidate=False`` skips writing root consolidated metadata, mirroring a cube that grew by + appending a time-slice to an existing same-orbit group (the builder must still read it). + """ + if utm_bbox is None: + utm_bbox = UTM_BBOX + # TEMPORARY (#246): store basename == item id (s1-rtc-{tile}) so titiler's reconstructed + # render path resolves; revert to "s1-grd-rtc-" when titiler-eopf#108 lands. + store_path = tmp_path / f"s1-rtc-{tile_id}.zarr" + root = zarr.open_group(str(store_path), mode="w", zarr_format=3) + ny = nx = 4 # tiny spatial grid: enough for the builder's metadata reads + for orbit_dir, acquisitions in orbits.items(): + og = root.create_group(orbit_dir) + og.attrs.update({"proj:code": crs, "spatial:bbox": utm_bbox}) + r10m = og.create_group("r10m") + # proj:shape / proj:transform live on the r10m group attrs in real stores. + r10m.attrs.update( + { + "spatial:shape": [ny, nx], + "spatial:transform": [10.0, 0.0, utm_bbox[0], 0.0, -10.0, utm_bbox[3]], + } + ) + times = np.array([t for t, _ in acquisitions], dtype="int64") + platforms = np.array([p for _, p in acquisitions], dtype=" None: + """Item id must be s1-rtc-{tile_id} derived from the store basename.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + assert item.id == "s1-rtc-31TCH" + + +def test_grid_code_and_extension(tmp_path: Path) -> None: + """The cube declares the grid extension + grid:code = MGRS-{tile} (a queryable tile id).""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + assert item.properties["grid:code"] == "MGRS-31TCH" + assert "https://stac-extensions.github.io/grid/v1.1.0/schema.json" in item.stac_extensions + + +def test_builds_from_non_consolidated_store(tmp_path: Path) -> None: + """Regression: the builder must read a store that lacks root consolidated metadata. + + A per-tile cube grown by appending a time-slice to an existing same-orbit group can end up + without root consolidated metadata (re-consolidating an S3 append is unreliable), which made + ``zarr.open_consolidated`` raise ``ValueError: Consolidated metadata ... not found`` and broke + STAC registration in the live S1 RTC pipeline. The builder must fall back to a direct read. + """ + store = _make_s1_store( + tmp_path, {"ascending": [(T1_NS, "S1A"), (T2_NS, "S1A")]}, consolidate=False + ) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + assert item.id == "s1-rtc-31TCH" + assert item.properties["start_datetime"] + assert item.properties["end_datetime"] + + +def test_temporal_range_min_max(tmp_path: Path) -> None: + """start_datetime/end_datetime must span the full time range across all acquisitions.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A"), (T2_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + start = dt.datetime.fromisoformat(item.properties["start_datetime"]) + end = dt.datetime.fromisoformat(item.properties["end_datetime"]) + + expected_start = dt.datetime(2023, 1, 15, 6, 12, 34, tzinfo=dt.UTC) + expected_end = dt.datetime(2023, 1, 27, 6, 12, 35, tzinfo=dt.UTC) + + assert abs((start - expected_start).total_seconds()) < 1 + assert abs((end - expected_end).total_seconds()) < 1 + assert item.datetime is None + + +def test_bbox_wgs84_from_utm(tmp_path: Path) -> None: + """UTM bbox must be converted to WGS84 and stored as item bbox.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + west, south, east, north = item.bbox # type: ignore[misc] + # EPSG:32631 [300000,4900000,400000,5000000] -> approx 0.46E-1.75E, 44.2N-45.1N + assert 0.0 < west < 1.0 + assert 44.0 < south < 45.0 + assert 1.0 < east < 2.0 + assert 45.0 < north < 46.0 + + +def test_both_orbits_bbox_union(tmp_path: Path) -> None: + """When ascending and descending are both present, the WGS84 bbox is the union.""" + # Give ascending a different UTM bbox (shifted east) + store_path = tmp_path / "s1-rtc-31TCH.zarr" + root = zarr.open_group(str(store_path), mode="w", zarr_format=3) + + for orbit_dir, bbox in [ + ("descending", [300000.0, 4900000.0, 400000.0, 5000000.0]), + ("ascending", [400000.0, 4900000.0, 500000.0, 5000000.0]), + ]: + og = root.create_group(orbit_dir) + og.attrs.update({"proj:code": CRS, "spatial:bbox": bbox}) + r10m = og.create_group("r10m") + t_arr = r10m.create_array("time", shape=(1,), dtype="int64", chunks=(512,)) + t_arr[:] = [T1_NS] + p_arr = r10m.create_array("platform", shape=(1,), dtype=" 2.5 # right edge from ascending (shifted ~1° further east) + + +def test_both_orbits_get_first_class_assets(tmp_path: Path) -> None: + """A dual-orbit cube must expose a γ⁰ asset per orbit group (bug #2), each pointing at its group.""" + store = _make_s1_store( + tmp_path, {"descending": [(T1_NS, "S1A")], "ascending": [(T1_NS, "S1A")]} + ) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + asc = item.assets["gamma0-rtc-backscatter-asc"] + desc = item.assets["gamma0-rtc-backscatter-desc"] + assert asc.href.endswith("/ascending") + assert desc.href.endswith("/descending") + # The dual-pol href ambiguity (bug #1) is gone: VV/VH are named bands, not duplicate assets. + assert [b["name"] for b in asc.extra_fields["bands"]] == ["vv", "vh"] + assert "border-mask-asc" in item.assets + assert "border-mask-desc" in item.assets + + +def test_empty_store_raises(tmp_path: Path) -> None: + """A store with an orbit group but no acquisitions must raise ValueError.""" + store_path = tmp_path / "s1-rtc-31TCH.zarr" + root = zarr.open_group(str(store_path), mode="w", zarr_format=3) + og = root.create_group("descending") + og.attrs.update({"proj:code": CRS, "spatial:bbox": UTM_BBOX}) + r10m = og.create_group("r10m") + t_arr = r10m.create_array("time", shape=(0,), dtype="int64", chunks=(512,)) + p_arr = r10m.create_array("platform", shape=(0,), dtype=" None: + """zarr-store href = store URI; the γ⁰ asset href = {store}/{orbit} (orbit group, per geozarr spec).""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + store_str = str(store) + assert item.assets["zarr-store"].href == store_str + # Single-orbit cube → only the descending γ⁰/mask assets exist (no ascending). + assert item.assets["gamma0-rtc-backscatter-desc"].href == f"{store_str}/descending" + assert item.assets["border-mask-desc"].href == f"{store_str}/descending" + assert "gamma0-rtc-backscatter-asc" not in item.assets + + +def test_gamma0_asset_band_and_invariant_metadata(tmp_path: Path) -> None: + """The γ⁰ asset carries VV/VH bands + data_type/nodata/unit/gsd invariants.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + asset = item.assets["gamma0-rtc-backscatter-desc"] + assert asset.extra_fields["data_type"] == "float32" + assert asset.extra_fields["nodata"] == "nan" + assert asset.extra_fields["gsd"] == 10 + bands = asset.extra_fields["bands"] + assert {b["name"] for b in bands} == {"vv", "vh"} + assert all(b["data_type"] == "float32" for b in bands) + + +def test_identity_and_projection_fields(tmp_path: Path) -> None: + """Item-level identity invariants + projection detail are present (S2-parity gaps).""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + props = item.properties + assert props["constellation"] == "sentinel-1" + assert props["instruments"] == ["c-sar"] + assert props["gsd"] == 10 + assert "platform" not in props # per-acquisition; a cube can mix S1A/S1C + assert props["proj:bbox"] == UTM_BBOX + assert props["proj:shape"] == [4, 4] + assert props["proj:transform"][0] == 10.0 + + +def test_datacube_extension(tmp_path: Path) -> None: + """Cube items carry the datacube extension. The irregular time axis lists its discrete `values` + (count = number of acquisitions); the regular x/y axes carry extent + step only (no per-pixel + enumeration — the exact pixel count is in proj:shape).""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A"), (T2_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + assert "https://stac-extensions.github.io/datacube/v2.2.0/schema.json" in item.stac_extensions + dims = item.properties["cube:dimensions"] + assert dims["time"]["type"] == "temporal" + assert len(dims["time"]["extent"]) == 2 + assert dims["time"]["values"] == [ # two distinct acquisitions -> two time steps, sorted + dt.datetime.fromtimestamp(T1_NS / 1e9, tz=dt.UTC).isoformat(), + dt.datetime.fromtimestamp(T2_NS / 1e9, tz=dt.UTC).isoformat(), + ] + assert dims["x"]["reference_system"] == 32631 + assert dims["x"]["step"] == 10.0 + assert dims["y"]["step"] == -10.0 + assert "values" not in dims["x"] # regular axis: extent + step, not ~10^4 coordinates + assert item.properties["proj:shape"] == [4, 4] # exact x/y element count + variables = item.properties["cube:variables"] + assert set(variables) == {"vv", "vh", "border_mask"} + assert variables["vv"]["dimensions"] == ["time", "y", "x"] + # datacube field is `variable_type` (not `type`); the border mask is auxiliary, not data. + assert variables["vv"]["variable_type"] == "data" + assert variables["border_mask"]["variable_type"] == "auxiliary" + assert "type" not in variables["vv"] + + +def test_orbit_state_single_vs_dual(tmp_path: Path) -> None: + """sat:orbit_state (single-valued) is set only for a single-orbit cube; a dual-orbit cube omits it + (and the SAT extension) rather than mislabel half its slices.""" + single = _make_s1_store(tmp_path / "a", {"descending": [(T1_NS, "S1A")]}) + item1 = build_s1_rtc_stac_item(str(single), "sentinel-1-grd-rtc-staging") + assert item1.properties["sat:orbit_state"] == "descending" + assert "https://stac-extensions.github.io/sat/v1.0.0/schema.json" in item1.stac_extensions + assert "description" not in item1.properties["cube:dimensions"]["time"] # single orbit: no note + + dual = _make_s1_store( + tmp_path / "b", {"ascending": [(T1_NS, "S1A")], "descending": [(T1_NS, "S1A")]} + ) + item2 = build_s1_rtc_stac_item(str(dual), "sentinel-1-grd-rtc-staging") + assert "sat:orbit_state" not in item2.properties + assert "https://stac-extensions.github.io/sat/v1.0.0/schema.json" not in item2.stac_extensions + # dual-orbit cube notes that the merged time axis spans both orbits (orbit is an asset-level split) + assert "orbit" in item2.properties["cube:dimensions"]["time"]["description"].lower() + + +def test_timestamps_updated_only(tmp_path: Path) -> None: + """`updated` (build time) is set; `created` is omitted — the store records no item-creation time, + so an acquisition time would misuse the field and a build-time value would churn on every append.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A"), (T2_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + assert "created" not in item.properties + assert dt.datetime.fromisoformat(item.properties["updated"]) + + +def test_sar_extension_fields(tmp_path: Path) -> None: + """SAR extension fields must be set with correct values for S1 IW GRD.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + props = item.properties + assert props["sar:instrument_mode"] == "IW" + assert props["sar:frequency_band"] == "C" + assert props["sar:center_frequency"] == pytest.approx(5.405) + assert props["sar:polarizations"] == ["VV", "VH"] + assert props["sar:product_type"] == "GRD" + + sar_ext_uri = "https://stac-extensions.github.io/sar/v1.0.0/schema.json" + assert sar_ext_uri in item.stac_extensions + + +def test_render_extension_rgb_composite(tmp_path: Path) -> None: + """Item must declare a render-extension RGB composite using the preferred orbit.""" + store = _make_s1_store(tmp_path, {"descending": [(T1_NS, "S1A")]}) + item = build_s1_rtc_stac_item(str(store), "sentinel-1-grd-rtc-staging") + + render_ext_uri = "https://stac-extensions.github.io/render/v1.0.0/schema.json" + assert render_ext_uri in item.stac_extensions + + rgb = item.properties["renders"]["rgb"] + assert rgb["expression"] == "/descending:vv;/descending:vh;(/descending:vv)/(/descending:vh)" + assert rgb["rescale"] == [[0.0, 0.4], [0.0, 0.1], [1.0, 15.0]] + assert rgb["bidx"] == [1] + assert rgb["tilesize"] == 256 + + +def test_render_uses_ascending_when_preferred(tmp_path: Path) -> None: + """When ascending is the preferred orbit, the render expression must reference it.""" + store_path = tmp_path / "s1-rtc-31TCH.zarr" + root = zarr.open_group(str(store_path), mode="w", zarr_format=3) + for orbit_dir in ("descending", "ascending"): + og = root.create_group(orbit_dir) + og.attrs.update({"proj:code": CRS, "spatial:bbox": UTM_BBOX}) + r10m = og.create_group("r10m") + t_arr = r10m.create_array("time", shape=(1,), dtype="int64", chunks=(512,)) + t_arr[:] = [T1_NS] + p_arr = r10m.create_array("platform", shape=(1,), dtype=" Slice: + return Slice(orbit=orbit, dt=dt.datetime(2026, 6, day, 6, 0, tzinfo=dt.UTC), coverage=coverage) + + +def test_most_recent_above_threshold_wins_even_if_older_has_more_coverage() -> None: + chosen = pick_slice( + [_s("ascending", 4, 0.99), _s("descending", 7, 0.85), _s("ascending", 6, 0.90)] + ) + assert chosen is not None + assert (chosen.orbit, chosen.dt.day, chosen.coverage) == ("descending", 7, 0.85) + + +def test_falls_back_to_max_coverage_when_none_above_threshold() -> None: + chosen = pick_slice( + [_s("ascending", 7, 0.40), _s("descending", 5, 0.75), _s("ascending", 4, 0.50)] + ) + assert chosen is not None + assert (chosen.orbit, chosen.dt.day, chosen.coverage) == ("descending", 5, 0.75) + + +def test_exact_80_percent_is_not_above_threshold() -> None: + # 0.80 is NOT > 0.80 -> excluded from the "good" set; the 0.81 older slice wins instead. + chosen = pick_slice([_s("ascending", 7, 0.80), _s("descending", 4, 0.81)]) + assert chosen is not None + assert (chosen.dt.day, chosen.coverage) == (4, 0.81) + + +def test_max_coverage_tie_broken_by_most_recent() -> None: + chosen = pick_slice( + [_s("ascending", 4, 0.60), _s("descending", 8, 0.60), _s("ascending", 6, 0.60)] + ) + assert chosen is not None + assert chosen.dt.day == 8 + + +def test_single_slice_returned() -> None: + only = _s("ascending", 5, 0.10) + assert pick_slice([only]) == only + + +def test_empty_returns_none() -> None: + assert pick_slice([]) is None + + +# ============================================================================= +# slice_coverages — per-slice tile coverage from the cube's r720m border_mask +# ============================================================================= + + +def _ns(day: int) -> int: + return int(dt.datetime(2026, 6, day, 6, 0, tzinfo=dt.UTC).timestamp() * 1e9) + + +def _write_r720m(root: zarr.Group, orbit: str, masks: list[np.ndarray], days: list[int]) -> None: + """Write {orbit}/r720m with a border_mask (time,y,x) + time (int64 ns), like the real cube.""" + lvl = root.create_group(orbit).create_group("r720m") + n = len(masks) + y, x = masks[0].shape + bm = lvl.create_array("border_mask", shape=(n, y, x), dtype="uint8", fill_value=0) + bm[:] = np.stack(masks).astype("uint8") + t = lvl.create_array("time", shape=(n,), dtype="int64") + t[:] = np.array([_ns(d) for d in days], dtype="int64") + + +def _make_coverage_cube(tmp_path: Path) -> str: + store = str(tmp_path / "s1-rtc-31TCH.zarr") + root = zarr.open_group(store, mode="w", zarr_format=3) + full = np.ones((4, 4)) # coverage 1.0 + half = np.zeros((4, 4)) + half[:2, :] = 1 # coverage 0.5 + quarter = np.zeros((4, 4)) + quarter[0, :] = 1 # coverage 0.25 + _write_r720m(root, "ascending", [full, half], [4, 6]) + _write_r720m(root, "descending", [quarter], [5]) + return store + + +def test_slice_coverages_reads_both_orbits_with_correct_fractions(tmp_path: Path) -> None: + by_key = { + (s.orbit, s.dt.day): s.coverage for s in slice_coverages(_make_coverage_cube(tmp_path)) + } + assert by_key == { + ("ascending", 4): 1.0, # full mask -> polarity check (non-zero = valid) + ("ascending", 6): 0.5, + ("descending", 5): 0.25, + } + + +def test_slice_coverages_times_are_utc_datetimes(tmp_path: Path) -> None: + s = next(s for s in slice_coverages(_make_coverage_cube(tmp_path)) if s.orbit == "descending") + assert s.dt == dt.datetime(2026, 6, 5, 6, 0, tzinfo=dt.UTC) + + +def test_slice_coverages_skips_missing_orbit(tmp_path: Path) -> None: + store = str(tmp_path / "s1-rtc-asc-only.zarr") + root = zarr.open_group(store, mode="w", zarr_format=3) + _write_r720m(root, "ascending", [np.ones((4, 4))], [7]) + assert {s.orbit for s in slice_coverages(store)} == {"ascending"} + + +# ============================================================================= +# build_s1_rtc_per_acquisition_items — one item per cube time slice +# ============================================================================= + + +def _make_acq_cube( + tmp_path: Path, orbits: dict[str, list[tuple[int, str]]], tile_id: str = "31TCH" +) -> str: + """A cube with r10m time/platform (+ tiny data arrays) per orbit — enough to build per-acq items.""" + store = str(tmp_path / f"s1-rtc-{tile_id}.zarr") + root = zarr.open_group(store, mode="w", zarr_format=3) + ny = nx = 4 + for orbit, acqs in orbits.items(): + og = root.create_group(orbit) + og.attrs.update({"proj:code": CRS, "spatial:bbox": UTM_BBOX}) + r10m = og.create_group("r10m") + r10m.attrs.update( + { + "spatial:shape": [ny, nx], + "spatial:transform": [10.0, 0.0, UTM_BBOX[0], 0.0, -10.0, UTM_BBOX[3]], + } + ) + times = np.array([t for t, _ in acqs], dtype="int64") + platforms = np.array([p for _, p in acqs], dtype=" None: + when = dt.datetime(2026, 6, 7, 5, 52, 48, tzinfo=dt.UTC) + assert acquisition_id("31TCH", when) == "s1-rtc-31TCH-20260607t055248" + + +def test_per_acq_title_carries_datetime_and_orbit(tmp_path: Path) -> None: + """Per-acq titles must distinguish sibling scenes (not inherit the cube's '— tile {id}' title).""" + store = _make_acq_cube(tmp_path, {"descending": [(T_EARLY, "s1a")]}) + item = build_s1_rtc_per_acquisition_items(store, orbit="descending", collection_id="acq")[0] + title = item.properties["title"] + assert "2026-06-05T06:09:07Z" in title + assert "descending" in title + assert title != "Sentinel-1 GRD RTC γ⁰ — tile 31TCH" + + +def test_single_datetime_no_range_and_targets_collection(tmp_path: Path) -> None: + store = _make_acq_cube(tmp_path, {"descending": [(T_EARLY, "S1A")]}) + item = build_s1_rtc_per_acquisition_items( + store, orbit="descending", collection_id="sentinel-1-grd-rtc-acquisitions" + )[0] + props = item.properties + assert item.collection_id == "sentinel-1-grd-rtc-acquisitions" + assert props["datetime"] == "2026-06-05T06:09:07+00:00" + assert "start_datetime" not in props + assert "end_datetime" not in props + assert props["sar:product_type"] == "GRD" + + +def test_reorients_orbit_metadata_and_keeps_only_run_orbit_asset(tmp_path: Path) -> None: + """A descending run carries /descending metadata + only the descending γ⁰/mask assets — never the + cube's preferred /ascending.""" + store = _make_acq_cube( + tmp_path, {"ascending": [(T_EARLY, "S1A")], "descending": [(T_EARLY, "S1A")]} + ) + item = build_s1_rtc_per_acquisition_items(store, orbit="descending", collection_id="acq")[0] + assert item.properties["sat:orbit_state"] == "descending" + # SAT ext must be declared even though the dual-orbit cube base omitted it. + assert "https://stac-extensions.github.io/sat/v1.0.0/schema.json" in item.stac_extensions + assert item.properties["renders"]["rgb"]["expression"].startswith("/descending:vv") + assert "gamma0-rtc-backscatter-desc" in item.assets + assert "gamma0-rtc-backscatter-asc" not in item.assets + assert "border-mask-asc" not in item.assets + assert "ascending" not in json.dumps(item.to_dict(include_self_link=False)) + + +def test_per_slice_platform_and_no_datacube(tmp_path: Path) -> None: + """Each item gets its own slice's platform (normalized to the STAC convention); a single + acquisition is not a datacube, and `created` is not set.""" + store = _make_acq_cube(tmp_path, {"descending": [(T_LATER, "s1a"), (T_EARLY, "s1c")]}) + items = build_s1_rtc_per_acquisition_items(store, orbit="descending", collection_id="acq") + assert [i.properties["platform"] for i in items] == ["sentinel-1a", "sentinel-1c"] + for item in items: + assert "cube:dimensions" not in item.properties + assert DATACUBE_EXT not in item.stac_extensions + assert "created" not in item.properties + + +def test_grid_code_inherited_from_cube(tmp_path: Path) -> None: + """Per-acquisition items inherit the cube's grid extension + grid:code (the MGRS tile).""" + store = _make_acq_cube(tmp_path, {"descending": [(T_EARLY, "S1A")]}) + item = build_s1_rtc_per_acquisition_items(store, orbit="descending", collection_id="acq")[0] + assert item.properties["grid:code"] == "MGRS-31TCH" + assert "https://stac-extensions.github.io/grid/v1.1.0/schema.json" in item.stac_extensions + + +def test_footprint_is_run_orbit_not_cube_union(tmp_path: Path) -> None: + """A per-acq item must carry ITS orbit's footprint, not the cube's union of both orbits' extents.""" + # ascending shifted east so the cube union bbox is wider than the descending orbit alone. + store = str(tmp_path / "s1-rtc-31TCH.zarr") + root = zarr.open_group(store, mode="w", zarr_format=3) + ny = nx = 4 + desc_bbox = [300000.0, 4900000.0, 400000.0, 5000000.0] + asc_bbox = [400000.0, 4900000.0, 500000.0, 5000000.0] + for orbit, bbox in (("descending", desc_bbox), ("ascending", asc_bbox)): + og = root.create_group(orbit) + og.attrs.update({"proj:code": CRS, "spatial:bbox": bbox}) + r10m = og.create_group("r10m") + r10m.attrs.update( + { + "spatial:shape": [ny, nx], + "spatial:transform": [10.0, 0.0, bbox[0], 0.0, -10.0, bbox[3]], + } + ) + r10m.create_array("time", shape=(1,), dtype="int64", chunks=(512,))[:] = [T_EARLY] + r10m.create_array("platform", shape=(1,), dtype=" None: + store = _make_acq_cube(tmp_path, {"descending": [(T_EARLY, "S1A")]}) + with pytest.raises(ValueError, match="orbit must be one of"): + build_s1_rtc_per_acquisition_items(store, orbit="sideways", collection_id="acq") + + +def test_orbit_absent_from_store_raises(tmp_path: Path) -> None: + store = _make_acq_cube(tmp_path, {"descending": [(T_EARLY, "S1A")]}) + with pytest.raises(ValueError, match="not found"): + build_s1_rtc_per_acquisition_items(store, orbit="ascending", collection_id="acq") + + +def test_datetime_follows_slice_not_physical_position(tmp_path: Path) -> None: + """Items are emitted in physical (append) order, each carrying its OWN datetime — so a + non-monotonic cube still yields a correct item per slice.""" + store = _make_acq_cube(tmp_path, {"descending": [(T_LATER, "S1A"), (T_EARLY, "S1A")]}) + items = build_s1_rtc_per_acquisition_items(store, orbit="descending", collection_id="acq") + assert items[0].properties["datetime"] == "2026-06-07T05:52:48+00:00" + assert items[1].properties["datetime"] == "2026-06-05T06:09:07+00:00" + assert items[0].id == "s1-rtc-31TCH-20260607t055248" diff --git a/uv.lock b/uv.lock index 6f1926ed..9c174678 100644 --- a/uv.lock +++ b/uv.lock @@ -831,7 +831,7 @@ wheels = [ [[package]] name = "eopf-geozarr" -version = "0.10.0" +version = "0.10.1" source = { editable = "." } dependencies = [ { name = "aiohttp" }, @@ -842,6 +842,7 @@ dependencies = [ { name = "pydantic" }, { name = "pydantic-zarr" }, { name = "pyproj" }, + { name = "pystac" }, { name = "rioxarray" }, { name = "s3fs" }, { name = "structlog" }, @@ -888,6 +889,7 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.12" }, { name = "pydantic-zarr", specifier = ">=0.8.0" }, { name = "pyproj", specifier = ">=3.7.0" }, + { name = "pystac", specifier = ">=1.8.0" }, { name = "rioxarray", specifier = ">=0.13.0" }, { name = "s3fs", specifier = ">=2024.6.0" }, { name = "structlog", specifier = ">=25.5.0" },