diff --git a/mdio/dataset_factory.h b/mdio/dataset_factory.h index cfa99eb..745cffb 100644 --- a/mdio/dataset_factory.h +++ b/mdio/dataset_factory.h @@ -323,6 +323,71 @@ inline absl::Status apply_v3_codecs(nlohmann::json& input /*NOLINT*/, return absl::OkStatus(); } +/** + * @brief Wraps a V3 codec pipeline in a sharding_indexed codec when the spec + * requests sharding via metadata.chunkGrid.configuration.shardShape. + * + * Decouples the storage object size (the shard = the array-level chunk) from + * the read/decompress granularity (the inner chunk). The inner chunk is + * whatever transform_chunks already wrote (from chunkShape); the array chunk + * grid is rewritten to the shard shape, and the existing inner codecs are + * nested inside the sharding codec. A no-op when shardShape is absent. + * @param input A MDIO Variable spec + * @param variable A Variable stub (will be modified) + * @return OkStatus, or InvalidArgumentError if shardShape is malformed or is + * not a positive integer multiple of chunkShape on every axis. + */ +inline absl::Status apply_v3_sharding(const nlohmann::json& input, + nlohmann::json& variable /*NOLINT*/) { + if (!input.contains("metadata") || + !input["metadata"].contains("chunkGrid")) { + return absl::OkStatus(); + } + const auto& grid = input["metadata"]["chunkGrid"]; + if (!grid.contains("configuration") || + !grid["configuration"].contains("shardShape")) { + return absl::OkStatus(); + } + + const nlohmann::json shard_shape = grid["configuration"]["shardShape"]; + const nlohmann::json inner_chunk = + variable["metadata"]["chunk_grid"]["configuration"]["chunk_shape"]; + + if (!shard_shape.is_array() || !inner_chunk.is_array() || + shard_shape.size() != inner_chunk.size()) { + return absl::InvalidArgumentError( + "shardShape must be an array matching the rank of chunkShape"); + } + for (size_t i = 0; i < shard_shape.size(); ++i) { + const auto s = shard_shape[i].get(); + const auto c = inner_chunk[i].get(); + if (c <= 0 || s <= 0 || s % c != 0) { + return absl::InvalidArgumentError( + "shardShape must be a positive integer multiple of chunkShape on " + "every axis"); + } + } + + // Nest the existing inner codec pipeline inside a sharding_indexed codec. + const nlohmann::json inner_codecs = variable["metadata"]["codecs"]; + nlohmann::json sharding = { + {"name", "sharding_indexed"}, + {"configuration", + {{"chunk_shape", inner_chunk}, + {"codecs", inner_codecs}, + {"index_codecs", + nlohmann::json::array( + {{{"name", "bytes"}, {"configuration", {{"endian", "little"}}}}, + {{"name", "crc32c"}}})}, + {"index_location", "end"}}}}; + + variable["metadata"]["codecs"] = nlohmann::json::array({sharding}); + // The array-level chunk (the storage object) is now the shard. + variable["metadata"]["chunk_grid"]["configuration"]["chunk_shape"] = + shard_shape; + return absl::OkStatus(); +} + /** * @brief Applies the Blosc compressor to a V2 compressor object. * @@ -650,6 +715,9 @@ inline tensorstore::Result from_json_to_spec( MDIO_RETURN_IF_ERROR(transform_compressor(json, variableStub, version)); transform_shape(json, variableStub, dimensionMap); transform_chunks(json, variableStub, layout); + if (layout.uses_codec_pipeline) { + MDIO_RETURN_IF_ERROR(apply_v3_sharding(json, variableStub)); + } // Fill values depend only on the data type, so they must be set for every // Variable (including dimension coordinates that omit a "metadata" block) to diff --git a/mdio/dataset_schema.h b/mdio/dataset_schema.h index e57f456..418836a 100644 --- a/mdio/dataset_schema.h +++ b/mdio/dataset_schema.h @@ -610,6 +610,14 @@ would prematurely terminate a bare R"( )" literal. Keep the JSON delimiter. }, "title": "Chunkshape", "type": "array" + }, + "shardShape": { + "description": "Optional Zarr v3 shard (storage object) shape. When present, chunkShape becomes the inner read chunk and shardShape the storage object, packed via the sharding_indexed codec. Must be an integer multiple of chunkShape on every axis.", + "items": { + "type": "integer" + }, + "title": "Shardshape", + "type": "array" } }, "required": [