diff --git a/src/eopf_geozarr/conversion/s1_ingest.py b/src/eopf_geozarr/conversion/s1_ingest.py index 77b6b0cd..f8175fbb 100644 --- a/src/eopf_geozarr/conversion/s1_ingest.py +++ b/src/eopf_geozarr/conversion/s1_ingest.py @@ -19,6 +19,7 @@ from dataclasses import dataclass from math import ceil from pathlib import Path +from typing import TYPE_CHECKING, cast import numpy as np import rasterio @@ -26,12 +27,20 @@ import zarr import zarr.codecs from pyproj import CRS as PyprojCRS + +# `FillValueCoder` is xarray's internal CF fill-value encoder: the canonical way to produce +# the base64 `_FillValue` xarray reads back under `use_zarr_fill_value_as_mask` (xarray +# #11345); the S2 path relies on the same mechanism. Internal API — revisit if xarray moves it. +from xarray.backends.zarr import FillValueCoder from zarr_cm import geo_proj from zarr_cm import multiscales as multiscales_cm from zarr_cm import spatial as spatial_cm from eopf_geozarr.conversion.utils import calculate_aligned_chunk_size +if TYPE_CHECKING: + from eopf_geozarr.data_api.geozarr.types import S1BackscatterAttrsJSON + log = structlog.get_logger() # ============================================================================= @@ -308,6 +317,21 @@ def _create_spatial_coordinate_arrays( "standard_name": "time", } +# CF `_FillValue` for the float32 arrays, mirroring the S1 GRD converter (geozarr.py) +# and S2 (data-model #172). This is what lets xarray mask NaN nodata via +# `use_zarr_fill_value_as_mask=True` despite xarray #11345 — the zarr-level `fill_value` +# field alone is not surfaced through xarray's encoding. We encode it with +# `FillValueCoder` so the stored attribute matches the base64 form S2 writes +# (`AAAAAAAA+H8=`). The S2 path gets this for free via `to_zarr`; this store is written +# zarr-direct, so the attribute must be set explicitly. +FLOAT32_NAN_FILL_VALUE = FillValueCoder.encode(np.nan, np.dtype("float32")) +# CF metadata for the backscatter bands (vv/vh). +BACKSCATTER_CF_ATTRS: S1BackscatterAttrsJSON = { + "standard_name": "surface_backwards_scattering_coefficient_of_radar_wave", + "units": "1", + "_FillValue": FLOAT32_NAN_FILL_VALUE, +} + def _create_time_coordinate_array(level_group: zarr.Group) -> None: """Create the CF-encoded ``time`` coordinate (length 0, grown on append) on one level group. @@ -358,20 +382,55 @@ def _add_grid_mapping(group: zarr.Group, crs_string: str) -> None: arr.attrs["grid_mapping"] = "spatial_ref" -def create_s1_store( - store_path: str | Path, - orbit_direction: str, - metadata: S1TilingMetadata, +def _create_band_arrays(level_group: zarr.Group, level_h: int, level_w: int) -> None: + """Create the (time, y, x) data bands (vv, vh, border_mask) for one multiscale level. + + Float backscatter bands (vv/vh) get the CF ``_FillValue``/``standard_name``/``units`` + attributes so readers can mask NaN nodata (xarray #11345) — parity with the S1 GRD + converter and S2 (data-model #172). Shared by ``create_s1_store`` (new store) and + ``ingest_s1tiling_acquisition`` (new orbit added to an existing store) so the two + creation paths cannot drift. + """ + inner_chunks = ( + 1, + calculate_aligned_chunk_size(level_h, 512), + calculate_aligned_chunk_size(level_w, 512), + ) + shard_shape = (1, level_h, level_w) + for name, dtype, fill in [ + ("vv", "float32", float("nan")), + ("vh", "float32", float("nan")), + ("border_mask", "uint8", 0), + ]: + band = level_group.create_array( + name, + shape=(0, level_h, level_w), + dtype=dtype, + chunks=inner_chunks, + shards=shard_shape, + compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), + fill_value=fill, + dimension_names=["time", "y", "x"], + ) + if name in ("vv", "vh"): + # cast: zarr's `attrs.update` is typed for its JSON union, which a TypedDict + # (invariant) doesn't satisfy structurally; the values are JSON-safe. + band.attrs.update(cast("dict", BACKSCATTER_CF_ATTRS)) + + +def _build_orbit_group( + root: zarr.Group, orbit_direction: str, metadata: S1TilingMetadata ) -> zarr.Group: - """Create a new S1 GRD RTC Zarr V3 store with full conventions metadata. + """Create one orbit group (asc/desc) with full GeoZarr metadata. - Returns the root group. + Builds the multiscale level groups (bands + spatial coordinates + grid-mapping + + per-level ``time``) and the native-resolution per-acquisition metadata coordinates. + Shared by ``create_s1_store`` (new store) and ``ingest_s1tiling_acquisition`` (new + orbit added to an existing store) so the two creation paths cannot drift — the inline + path previously omitted ``proj:code`` on level groups. """ layout = compute_multiscales_layout(metadata.shape, metadata.spatial_transform) - - root = zarr.open_group(str(store_path), mode="w-", zarr_format=3) orbit_group = root.create_group(orbit_direction) - orbit_group.attrs.update( { "zarr_conventions": ZARR_CONVENTIONS, @@ -398,28 +457,7 @@ def create_s1_store( } ) - inner_chunks = ( - 1, - calculate_aligned_chunk_size(level_h, 512), - calculate_aligned_chunk_size(level_w, 512), - ) - shard_shape = (1, level_h, level_w) - - for name, dtype, fill in [ - ("vv", "float32", float("nan")), - ("vh", "float32", float("nan")), - ("border_mask", "uint8", 0), - ]: - level_group.create_array( - name, - shape=(0, level_h, level_w), - dtype=dtype, - chunks=inner_chunks, - shards=shard_shape, - compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), - fill_value=fill, - dimension_names=["time", "y", "x"], - ) + _create_band_arrays(level_group, level_h, level_w) _create_spatial_coordinate_arrays( level_group, level_h, level_w, level_entry["spatial:transform"] @@ -450,6 +488,20 @@ def create_s1_store( fill_value="", dimension_names=["time"], ) + return orbit_group + + +def create_s1_store( + store_path: str | Path, + orbit_direction: str, + metadata: S1TilingMetadata, +) -> zarr.Group: + """Create a new S1 GRD RTC Zarr V3 store with full conventions metadata. + + Returns the root group. + """ + root = zarr.open_group(str(store_path), mode="w-", zarr_format=3) + _build_orbit_group(root, orbit_direction, metadata) log.info( "Created S1 store", @@ -559,79 +611,9 @@ def ingest_s1tiling_acquisition( else: root = zarr.open_group(str(store_path), mode="r+", zarr_format=3) if orbit_direction not in root: - # Create orbit direction group in existing store - orbit_group = root.create_group(orbit_direction) - layout = compute_multiscales_layout(meta.shape, meta.spatial_transform) - orbit_group.attrs.update( - { - "zarr_conventions": ZARR_CONVENTIONS, - "multiscales": { - "layout": layout, - "resampling_method": "average", - }, - "proj:code": meta.crs, - "spatial:dimensions": ["y", "x"], - "spatial:bbox": meta.bounds, - } - ) - for level_entry in layout: - level_name = level_entry["asset"] - level_h, level_w = level_entry["spatial:shape"] - level_group = orbit_group.create_group(level_name) - level_group.attrs.update( - { - "spatial:shape": [level_h, level_w], - "spatial:transform": level_entry["spatial:transform"], - } - ) - inner_chunks = ( - 1, - calculate_aligned_chunk_size(level_h, 512), - calculate_aligned_chunk_size(level_w, 512), - ) - shard_shape = (1, level_h, level_w) - for name, dtype, fill in [ - ("vv", "float32", float("nan")), - ("vh", "float32", float("nan")), - ("border_mask", "uint8", 0), - ]: - level_group.create_array( - name, - shape=(0, level_h, level_w), - dtype=dtype, - chunks=inner_chunks, - shards=shard_shape, - compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=5), - fill_value=fill, - dimension_names=["time", "y", "x"], - ) - _create_spatial_coordinate_arrays( - level_group, level_h, level_w, level_entry["spatial:transform"] - ) - _add_grid_mapping(level_group, meta.crs) - # `time` coordinate on every level (datetime `.sel` at any scale, #192). - _create_time_coordinate_array(level_group) - r10m = orbit_group["r10m"] - for name, dtype, fill in [ - ("absolute_orbit", "int32", 0), - ("relative_orbit", "int32", 0), - ]: - r10m.create_array( - name, - shape=(0,), - dtype=dtype, - chunks=(512,), - fill_value=fill, - dimension_names=["time"], - ) - r10m.create_array( - "platform", - shape=(0,), - dtype=" None: + """Float backscatter bands must declare a CF ``_FillValue`` attribute at + *every* multiscale level, matching S2 (data-model #172 / xarray #11345). + + The zarr-level ``fill_value`` alone is not surfaced by xarray's encoding, + so ``to_masked_array()`` / ``use_zarr_fill_value_as_mask=True`` cannot mask + NaN nodata without the attribute. ``test_array_attrs`` only guards the S2 + ``/measurements/`` layout, so the S1 RTC layout was previously unchecked. + """ + from xarray.backends.zarr import FillValueCoder + + root = create_s1_store(s1_store_path, "ascending", sample_metadata) + orbit = root["ascending"] + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + for level_name, _, _ in OVERVIEW_CHAIN: + for band in ("vv", "vh"): + attrs = dict(orbit[level_name][band].attrs) + assert attrs.get("_FillValue") == expected, ( + f"{level_name}/{band} missing/!= CF _FillValue" + ) + assert ( + attrs.get("standard_name") + == "surface_backwards_scattering_coefficient_of_radar_wave" + ) + assert attrs.get("units") == "1" + def test_no_tile_matrix_set( self, s1_store_path: Path, sample_metadata: S1TilingMetadata ) -> None: @@ -354,6 +382,83 @@ def test_second_acquisition_appends(self, s1_geotiff_dir: Path, s1_store_path: P root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) assert root["ascending"]["r10m"]["vv"].shape[0] == 2 + def test_ingested_bands_declare_cf_fill_value( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """End-to-end (production path): vv/vh carry CF ``_FillValue``/``standard_name``/ + ``units`` at every level for BOTH the store-creating orbit (``create_s1_store``) + and a second orbit added via the inline new-orbit path — the two paths that were + previously inconsistent. Parity with S2 / S1 GRD (#172; xarray #11345). + """ + from xarray.backends.zarr import FillValueCoder + + vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") # create_s1_store + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "descending") # inline new orbit + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for orbit in ("ascending", "descending"): + for level_name, _, _ in OVERVIEW_CHAIN: + for band in ("vv", "vh"): + attrs = dict(root[orbit][level_name][band].attrs) + assert attrs.get("_FillValue") == expected, f"{orbit}/{level_name}/{band}" + assert ( + attrs.get("standard_name") + == "surface_backwards_scattering_coefficient_of_radar_wave" + ) + assert attrs.get("units") == "1" + + def test_new_orbit_level_groups_carry_proj_code( + self, s1_geotiff_dir: Path, s1_store_path: Path + ) -> None: + """A second orbit added to an existing store must get the same per-level metadata + as the store-creating orbit — incl. ``proj:code`` on every level group. The inline + new-orbit path previously omitted it (drift vs ``create_s1_store``).""" + vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "descending") + root = zarr.open_group(str(s1_store_path), mode="r", zarr_format=3) + for orbit in ("ascending", "descending"): + for level_name, _, _ in OVERVIEW_CHAIN: + attrs = dict(root[orbit][level_name].attrs) + assert attrs.get("proj:code") == CRS, f"{orbit}/{level_name} missing proj:code" + + def test_fill_value_masking_roundtrip(self, tmp_path: Path, s1_store_path: Path) -> None: + """End-to-end: NaN nodata in vv comes back masked when the cube is reopened with + ``use_zarr_fill_value_as_mask=True`` — the behaviour the CF ``_FillValue`` attribute + exists to enable despite xarray #11345. Mirrors the S2 guarantee in + ``tests/test_array_attrs.py::test_fill_value_masking_roundtrip``. + """ + stamp = "20230115t061234" + rng = np.random.default_rng(0) + vv_data = rng.uniform(0.0, 1.0, (SIZE, SIZE)).astype(np.float32) + vv_data[0:16, 0:16] = np.nan # nodata patch + vh_data = rng.uniform(0.0, 0.5, (SIZE, SIZE)).astype(np.float32) + mask_data = np.ones((SIZE, SIZE), dtype=np.uint8) + vv = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC.tif" + vh = tmp_path / f"s1a_32TQM_vh_ASC_037_{stamp}_GammaNaughtRTC.tif" + mask = tmp_path / f"s1a_32TQM_vv_ASC_037_{stamp}_GammaNaughtRTC_BorderMask.tif" + _create_synthetic_geotiff(vv, vv_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(vh, vh_data, tags=ACQ1_TAGS) + _create_synthetic_geotiff(mask, mask_data, tags=ACQ1_TAGS) + ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") + + ds = xr.open_dataset( + str(s1_store_path / "ascending" / "r10m"), + engine="zarr", + consolidated=False, + decode_times=False, + decode_coords=False, + use_zarr_fill_value_as_mask=True, + ) + try: + masked = ds["vv"].to_masked_array() + assert np.ma.is_masked(masked), "NaN nodata must be masked via `_FillValue`" + assert masked.mask[0, 0, 0], "nodata cell must be masked" + assert not masked.mask[0, -1, -1], "valid cell must not be masked" + finally: + ds.close() + def test_preserves_data_integrity(self, s1_geotiff_dir: Path, s1_store_path: Path) -> None: vv, vh, mask = self._get_acq_paths(s1_geotiff_dir, "20230115t061234") ingest_s1tiling_acquisition(vv, vh, mask, s1_store_path, "ascending") @@ -679,6 +784,30 @@ def test_data_integrity_roundtrip( actual = root["ascending"]["conditions"]["gamma_area_037"][:] np.testing.assert_allclose(actual, expected, rtol=1e-6) + def test_float_conditions_declare_cf_fill_value( + self, + s1_store_with_acquisition: Path, + gamma_area_geotiff: Path, + lia_geotiff: Path, + ) -> None: + """Float condition arrays (gamma_area, lia) must declare a CF ``_FillValue`` so + readers mask NaN nodata (xarray #11345), like vv/vh (#172).""" + from xarray.backends.zarr import FillValueCoder + + ingest_s1tiling_conditions( + store_path=s1_store_with_acquisition, + orbit_direction="ascending", + relative_orbit=37, + gamma_area_path=gamma_area_geotiff, + lia_path=lia_geotiff, + ) + expected = FillValueCoder.encode(np.nan, np.dtype("float32")) + conditions = zarr.open_group(str(s1_store_with_acquisition), mode="r", zarr_format=3)[ + "ascending" + ]["conditions"] + for arr_name in ("gamma_area_037", "lia_037"): + assert dict(conditions[arr_name].attrs).get("_FillValue") == expected, arr_name + def test_gamma_area_is_sharded( self, s1_store_with_acquisition: Path, gamma_area_geotiff: Path ) -> None: