Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changes/3826.feature.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Added a `subchunk_write_order` option to `ShardingCodec` to allow for `morton`, `unordered`, `lexicographic`, and `colexicographic` subchunk orderings.
Added a `subchunk_write_order` option to `ShardingCodec` to control the physical order of subchunks within a shard. Supported values are `morton`, `unordered`, `lexicographic`, and `colexicographic`. `unordered` makes no guarantee about subchunk layout. This setting affects only on-disk layout, not the data read back, and is not persisted in array metadata: it applies per codec instance and is not recovered when reopening a sharded array.
4 changes: 2 additions & 2 deletions docs/user-guide/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ filters (e.g., byte-shuffle) have been applied.

### Subchunk memory layout

The order of chunks **within each shard** can be changed via the `subchunk_write_order` parameter of the `ShardingCodec`. That parameter is a string which must be one of `["morton", "lexicographic", "colexicographic", "unordered"]`.
The order of chunks **within each shard** can be changed via the `subchunk_write_order` parameter of the `ShardingCodec`. That parameter is a string which must be one of `["morton", "unordered", "lexicographic", "colexicographic"]`.

By default [`morton`](https://en.wikipedia.org/wiki/Z-order_curve) order provides good spatial locality however [`lexicographic` (i.e., row-major)](https://en.wikipedia.org/wiki/Row-_and_column-major_order), for example, may be better suited to "batched" workflows where some form of sequential reading through a fixed number of outer dimensions is desired. The options are `lexicographic`, `morton`, `unordered` (i.e., random), and `colexicographic`.
By default [`morton`](https://en.wikipedia.org/wiki/Z-order_curve) order provides good spatial locality. [`lexicographic` (i.e., row-major)](https://en.wikipedia.org/wiki/Row-_and_column-major_order), for example, may be better suited to "batched" workflows where some form of sequential reading through a fixed number of outer dimensions is desired, and `colexicographic` is its reverse. `unordered` makes no guarantee about the order in which subchunks are laid out within a shard.


### Empty chunks
Expand Down
35 changes: 21 additions & 14 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,16 @@ def to_dict_vectorized(
class ShardingCodec(
ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin
):
"""Sharding codec"""
"""Sharding codec.

`subchunk_write_order` controls the physical order of subchunks within a shard. It is a
write-time setting only: it is not stored in array metadata, so reopening a sharded array
does not recover it (the setting reverts to the `morton` default per codec instance).
"""

chunk_shape: tuple[int, ...]
codecs: tuple[Codec, ...]
index_codecs: tuple[Codec, ...]
rng: np.random.Generator | None
index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end
subchunk_write_order: SubchunkWriteOrder = "morton"

Expand All @@ -311,7 +315,6 @@ def __init__(
index_codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(), Crc32cCodec()),
index_location: ShardingCodecIndexLocation | str = ShardingCodecIndexLocation.end,
subchunk_write_order: SubchunkWriteOrder = "morton",
rng: np.random.Generator | None = None,
) -> None:
chunk_shape_parsed = parse_shapelike(chunk_shape)
codecs_parsed = parse_codecs(codecs)
Expand All @@ -327,7 +330,6 @@ def __init__(
object.__setattr__(self, "index_codecs", index_codecs_parsed)
object.__setattr__(self, "index_location", index_location_parsed)
object.__setattr__(self, "subchunk_write_order", subchunk_write_order)
object.__setattr__(self, "rng", rng)

# Use instance-local lru_cache to avoid memory leaks

Expand All @@ -340,15 +342,17 @@ def __init__(

# todo: typedict return type
def __getstate__(self) -> dict[str, Any]:
return {"rng": self.rng, **self.to_dict()}
# `subchunk_write_order` is not part of codec metadata (`to_dict`), so carry it
# explicitly to survive a pickle round-trip (otherwise it reverts to `morton`).
return {"subchunk_write_order": self.subchunk_write_order, **self.to_dict()}

def __setstate__(self, state: dict[str, Any]) -> None:
config = state["configuration"]
object.__setattr__(self, "chunk_shape", parse_shapelike(config["chunk_shape"]))
object.__setattr__(self, "codecs", parse_codecs(config["codecs"]))
object.__setattr__(self, "index_codecs", parse_codecs(config["index_codecs"]))
object.__setattr__(self, "index_location", parse_index_location(config["index_location"]))
object.__setattr__(self, "rng", state["rng"])
object.__setattr__(self, "subchunk_write_order", state["subchunk_write_order"])

# Use instance-local lru_cache to avoid memory leaks
# object.__setattr__(self, "_get_chunk_spec", lru_cache()(self._get_chunk_spec))
Expand Down Expand Up @@ -537,11 +541,11 @@ def _subchunk_order_iter(
case "colexicographic":
subchunk_iter = (c[::-1] for c in np.ndindex(chunks_per_shard[::-1]))
case "unordered":
subchunk_list = list(np.ndindex(chunks_per_shard))
(self.rng if self.rng is not None else np.random.default_rng()).shuffle(
subchunk_list
)
subchunk_iter = iter(subchunk_list)
# "unordered" promises no particular layout; today it happens to be
# lexicographic, but callers must not rely on that.
subchunk_iter = np.ndindex(chunks_per_shard)
case _:
raise ValueError(f"Unrecognized subchunk write order: {subchunk_write_order!r}.")
return subchunk_iter

async def _encode_single(
Expand All @@ -561,7 +565,9 @@ async def _encode_single(
chunk_grid=ChunkGrid.from_sizes(shard_shape, chunk_shape),
)
)
shard_builder = dict.fromkeys(self._subchunk_order_iter(chunks_per_shard, "lexicographic"))
# The key order of this intermediate dict is immaterial; the physical layout is
# decided later by the `subchunk_write_order` loop in `_encode_shard_dict`.
shard_builder = dict.fromkeys(np.ndindex(chunks_per_shard))

await self.codec_pipeline.write(
[
Expand Down Expand Up @@ -604,7 +610,8 @@ async def _encode_partial_single(
)

if self._is_complete_shard_write(indexer, chunks_per_shard):
shard_dict = dict.fromkeys(self._subchunk_order_iter(chunks_per_shard, "lexicographic"))
# Intermediate key order is immaterial (see `_encode_single`).
shard_dict = dict.fromkeys(np.ndindex(chunks_per_shard))
else:
shard_reader = await self._load_full_shard_maybe(
byte_getter=byte_setter,
Expand All @@ -614,7 +621,7 @@ async def _encode_partial_single(
shard_reader = shard_reader or _ShardReader.create_empty(chunks_per_shard)
# Use vectorized lookup for better performance
shard_dict = shard_reader.to_dict_vectorized(
np.array(list(self._subchunk_order_iter(chunks_per_shard, "lexicographic")))
np.array(list(np.ndindex(chunks_per_shard)))
)

await self.codec_pipeline.write(
Expand Down
15 changes: 11 additions & 4 deletions src/zarr/testing/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,30 +308,38 @@ def arrays(
else:
chunks_param = draw(chunk_shapes(shape=nparray.shape), label="chunk shape")

if all(s > c and c > 1 for s, c in zip(nparray.shape, chunks_param, strict=True)):
if nparray.ndim > 0 and all(
s > c and c > 1 for s, c in zip(nparray.shape, chunks_param, strict=True)
):
shard_shape = draw(
st.none() | shard_shapes(shape=nparray.shape, chunk_shape=chunks_param),
label="shard shape",
)
if shard_shape is not None:
subchunk_write_order = draw(subchunk_write_orders)
# Drive sharding through the serializer alone: the array's (outer) chunk
# grid is the shard, and the ShardingCodec splits each shard into inner
# subchunks of `chunks_param`. Passing `shards=` as well would make
# create_array build a second, outer ShardingCodec and nest the two,
# leaving `subchunk_write_order` governing only a 1-element inner grid.
serializer = ShardingCodec(
subchunk_write_order=subchunk_write_order,
codecs=[BytesCodec(), ZstdCodec()],
index_codecs=[BytesCodec(), Crc32cCodec()],
chunk_shape=chunks_param,
)
compressors_unsearched = None
chunks_param = shard_shape
else:
chunks_param = draw(chunk_shapes(shape=nparray.shape), label="chunk shape")
a = root.create_array(
array_path,
shape=nparray.shape,
chunks=chunks_param,
shards=shard_shape,
shards=None,
dtype=nparray.dtype,
attributes=attributes,
compressors=compressors_unsearched, # FIXME
compressors=compressors_unsearched,
fill_value=fill_value,
dimension_names=dim_names,
serializer=serializer,
Expand All @@ -353,7 +361,6 @@ def arrays(
assert a.metadata.chunk_grid.chunk_shape == (
a.shards if shard_shape is not None else a.chunks
)
assert shard_shape == a.shards
else:
assert isinstance(a.metadata.chunk_grid, RectilinearChunkGridMetadata)
assert shard_shape is None
Expand Down
50 changes: 13 additions & 37 deletions tests/test_codecs/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,17 @@ async def test_delete_empty_shards(store: Store) -> None:


def test_pickle() -> None:
"""ShardingCodec round-trips through pickle, including the non-serialized
``subchunk_write_order`` (which ``to_dict`` omits and which must not silently
revert to the ``morton`` default)."""
codec = ShardingCodec(chunk_shape=(8, 8))
assert pickle.loads(pickle.dumps(codec)) == codec

ordered = ShardingCodec(chunk_shape=(8, 8), subchunk_write_order="lexicographic")
restored = pickle.loads(pickle.dumps(ordered))
assert restored == ordered
assert restored.subchunk_write_order == "lexicographic"


@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"])
@pytest.mark.parametrize(
Expand Down Expand Up @@ -867,54 +875,22 @@ async def stored_data_and_get_order(
async def test_encoded_subchunk_write_order(subchunk_write_order: SubchunkWriteOrder) -> None:
"""Subchunks must be physically laid out in the shard in the order specified by
``subchunk_write_order``. We verify this by decoding the shard index and sorting
the chunk coordinates by their byte offset."""
# Use a non-square chunks_per_shard so all three orderings are distinguishable.
the chunk coordinates by their byte offset. ``unordered`` makes no stable-order
promise, but is deterministic in this implementation, so it is checked the same way."""
# Use a non-square chunks_per_shard so all orderings are distinguishable.
chunks_per_shard = (3, 2)
chunk_shape = (4, 4)
seed = 0
codec = ShardingCodec(
chunk_shape=chunk_shape,
codecs=[BytesCodec()],
index_codecs=[BytesCodec(), Crc32cCodec()],
index_location=ShardingCodecIndexLocation.end,
subchunk_write_order=subchunk_write_order,
rng=np.random.default_rng(seed=seed),
)

actual_order = await stored_data_and_get_order(codec, chunks_per_shard)
if subchunk_write_order != "unordered":
expected_order = list(codec._subchunk_order_iter(chunks_per_shard, subchunk_write_order))
assert actual_order == expected_order
else:
same_order_same_seed = list(
ShardingCodec(
chunk_shape=chunk_shape,
codecs=[BytesCodec()],
index_codecs=[BytesCodec(), Crc32cCodec()],
index_location=ShardingCodecIndexLocation.end,
subchunk_write_order=subchunk_write_order,
rng=np.random.default_rng(seed=seed),
)._subchunk_order_iter(chunks_per_shard, subchunk_write_order)
)
assert actual_order == same_order_same_seed


async def test_unordered_can_be_seeded() -> None:
orders = []
chunks_per_shard = (3, 2)
chunk_shape = (4, 4)
seed = 0
for _ in range(4):
codec = ShardingCodec(
chunk_shape=chunk_shape,
codecs=[BytesCodec()],
index_codecs=[BytesCodec(), Crc32cCodec()],
index_location=ShardingCodecIndexLocation.end,
subchunk_write_order="unordered",
rng=np.random.default_rng(seed=seed),
)
orders.append(await stored_data_and_get_order(codec, chunks_per_shard))
assert all(orders[0] == o for o in orders)
expected_order = list(codec._subchunk_order_iter(chunks_per_shard, subchunk_write_order))
assert actual_order == expected_order


@pytest.mark.parametrize(
Expand Down
Loading