Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions mdio/dataset_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,71 @@ inline absl::Status apply_v3_codecs(nlohmann::json& input /*NOLINT*/,
return absl::OkStatus();
}

/**
* @brief Wraps a V3 codec pipeline in a sharding_indexed codec when the spec
* requests sharding via metadata.chunkGrid.configuration.shardShape.
*
* Decouples the storage object size (the shard = the array-level chunk) from
* the read/decompress granularity (the inner chunk). The inner chunk is
* whatever transform_chunks already wrote (from chunkShape); the array chunk
* grid is rewritten to the shard shape, and the existing inner codecs are
* nested inside the sharding codec. A no-op when shardShape is absent.
* @param input A MDIO Variable spec
* @param variable A Variable stub (will be modified)
* @return OkStatus, or InvalidArgumentError if shardShape is malformed or is
* not a positive integer multiple of chunkShape on every axis.
*/
inline absl::Status apply_v3_sharding(const nlohmann::json& input,
nlohmann::json& variable /*NOLINT*/) {
if (!input.contains("metadata") ||
!input["metadata"].contains("chunkGrid")) {
return absl::OkStatus();
}
const auto& grid = input["metadata"]["chunkGrid"];
if (!grid.contains("configuration") ||
!grid["configuration"].contains("shardShape")) {
return absl::OkStatus();
}

const nlohmann::json shard_shape = grid["configuration"]["shardShape"];
const nlohmann::json inner_chunk =
variable["metadata"]["chunk_grid"]["configuration"]["chunk_shape"];

if (!shard_shape.is_array() || !inner_chunk.is_array() ||
shard_shape.size() != inner_chunk.size()) {
return absl::InvalidArgumentError(
"shardShape must be an array matching the rank of chunkShape");
}
for (size_t i = 0; i < shard_shape.size(); ++i) {
const auto s = shard_shape[i].get<int64_t>();
const auto c = inner_chunk[i].get<int64_t>();
if (c <= 0 || s <= 0 || s % c != 0) {
return absl::InvalidArgumentError(
"shardShape must be a positive integer multiple of chunkShape on "
"every axis");
}
}

// Nest the existing inner codec pipeline inside a sharding_indexed codec.
const nlohmann::json inner_codecs = variable["metadata"]["codecs"];
nlohmann::json sharding = {
{"name", "sharding_indexed"},
{"configuration",
{{"chunk_shape", inner_chunk},
{"codecs", inner_codecs},
{"index_codecs",
nlohmann::json::array(
{{{"name", "bytes"}, {"configuration", {{"endian", "little"}}}},
{{"name", "crc32c"}}})},
{"index_location", "end"}}}};

variable["metadata"]["codecs"] = nlohmann::json::array({sharding});
// The array-level chunk (the storage object) is now the shard.
variable["metadata"]["chunk_grid"]["configuration"]["chunk_shape"] =
shard_shape;
return absl::OkStatus();
}

/**
* @brief Applies the Blosc compressor to a V2 compressor object.
*
Expand Down Expand Up @@ -650,6 +715,9 @@ inline tensorstore::Result<nlohmann::json> from_json_to_spec(
MDIO_RETURN_IF_ERROR(transform_compressor(json, variableStub, version));
transform_shape(json, variableStub, dimensionMap);
transform_chunks(json, variableStub, layout);
if (layout.uses_codec_pipeline) {
MDIO_RETURN_IF_ERROR(apply_v3_sharding(json, variableStub));
}

// Fill values depend only on the data type, so they must be set for every
// Variable (including dimension coordinates that omit a "metadata" block) to
Expand Down
8 changes: 8 additions & 0 deletions mdio/dataset_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,14 @@ would prematurely terminate a bare R"( )" literal. Keep the JSON delimiter.
},
"title": "Chunkshape",
"type": "array"
},
"shardShape": {
"description": "Optional Zarr v3 shard (storage object) shape. When present, chunkShape becomes the inner read chunk and shardShape the storage object, packed via the sharding_indexed codec. Must be an integer multiple of chunkShape on every axis.",
"items": {
"type": "integer"
},
"title": "Shardshape",
"type": "array"
}
},
"required": [
Expand Down
Loading