Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
pyo3 = { version = "0.27.1", features = ["abi3-py311"] }
zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] }
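# `AsyncStoragePartialDecoder` is `pub` in `zarrs_codec` but is not re-exported
# by `zarrs`, so depend on `zarrs_codec` directly. As long as this requirement
# stays semver-compatible with the one `zarrs` declares, Cargo resolves both to
# a single version, so the codec types are identical to those re-exported via
# `zarrs`. If the two requirements ever diverge, two copies are built and the
# types no longer match.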
zarrs_codec = { version = "0.2", features = ["async"] }
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
# fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path
Expand All @@ -23,6 +27,7 @@ opendal = { version = "0.55.0", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.10.0"
itertools = "0.14.0"
futures = "0.3.31"
bytemuck = { version = "1.24.0", features = ["must_cast"] }
pyo3-object_store = "0.7.0" # object_store 0.12
zarrs_object_store = "0.5.0" # object_store 0.12
Expand Down
25 changes: 25 additions & 0 deletions python/zarrs/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,31 @@ class ChunkItem:
shape: typing.Sequence[builtins.int],
) -> ChunkItem: ...

# Type stub for the async-store variant of the codec pipeline implementation.
# ``get_codec_pipeline_impl`` in ``pipeline.py`` picks this class instead of
# ``CodecPipelineImpl`` when the store is an instance of one of the
# ``_ASYNC_STORES`` types. Marked final: it is not meant to be subclassed.
@typing.final
class AsyncCodecPipelineImpl:
# Constructor. Every option after ``store_config`` is keyword-only.
def __new__(
cls,
# Array metadata as a JSON string; the caller in ``pipeline.py`` passes
# ``json.dumps(metadata.to_dict())``.
array_metadata: builtins.str,
# The zarr ``Store`` object the pipeline reads from / writes to.
store_config: zarr.abc.store.Store,
*,
# NOTE: the stub default is ``False``, but the caller in ``pipeline.py``
# always passes this explicitly and falls back to ``True`` when the
# ``codec_pipeline.validate_checksums`` config entry is unset.
validate_checksums: builtins.bool = False,
# Optional tuning knobs; ``None`` leaves the choice to the implementation.
# NOTE(review): exact semantics live in the Rust extension -- confirm there.
chunk_concurrent_minimum: builtins.int | None = None,
chunk_concurrent_maximum: builtins.int | None = None,
num_threads: builtins.int | None = None,
direct_io: builtins.bool = False,
) -> AsyncCodecPipelineImpl: ...
# Read path. ``ZarrsCodecPipeline.read`` calls this with the chunk items of
# the batch and the output array obtained from ``out.as_ndarray_like()``.
# Returns nothing; presumably the result is written into ``value`` in place
# -- TODO confirm against the Rust implementation.
def retrieve_chunks_and_apply_index(
self,
chunk_descriptions: typing.Sequence[ChunkItem],
value: numpy.typing.NDArray[typing.Any],
) -> None: ...
# Write path. ``ZarrsCodecPipeline.write`` calls this with the chunk items of
# the batch and the input array, which it first makes C-contiguous via
# ``np.ascontiguousarray`` when needed. Returns nothing.
def store_chunks_with_indices(
self,
chunk_descriptions: typing.Sequence[ChunkItem],
value: numpy.typing.NDArray[typing.Any],
write_empty_chunks: builtins.bool,
) -> None: ...

@typing.final
class CodecPipelineImpl:
def __new__(
Expand Down
41 changes: 35 additions & 6 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
from zarr.storage import FsspecStore

try:
from zarr.storage import ObjectStore

_ASYNC_STORES: tuple[type, ...] = (FsspecStore, ObjectStore)
except ImportError:
_ASYNC_STORES = (FsspecStore,)

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
Expand All @@ -24,7 +32,7 @@
from zarr.core.indexing import SelectorTuple
from zarr.dtype import ZDType

from ._internal import CodecPipelineImpl
from ._internal import AsyncCodecPipelineImpl, CodecPipelineImpl
from .utils import (
DiscontiguousArrayError,
FillValueNoneError,
Expand All @@ -41,16 +49,27 @@ class UnsupportedMetadataError(Exception):
pass


def _supports_async_pipeline(store: Store) -> bool:
    """Return whether ``store`` should be served by the async codec pipeline.

    A store qualifies when it is an instance of one of the ``_ASYNC_STORES``
    types. Async-backed stores issue many requests concurrently instead of
    blocking a worker thread per request; every other store (e.g.
    ``LocalStore``) goes through the synchronous pipeline. The accepted set
    mirrors the stores handled by the Rust
    ``AsyncReadableWritableListableStorage`` conversion (see ``src/store.rs``).
    """
    is_async_backed = isinstance(store, _ASYNC_STORES)
    return is_async_backed


def get_codec_pipeline_impl(
metadata: ArrayMetadata, store: Store, *, strict: bool
) -> CodecPipelineImpl | None:
) -> CodecPipelineImpl | AsyncCodecPipelineImpl | None:
pipeline_class = (
AsyncCodecPipelineImpl if _supports_async_pipeline(store) else CodecPipelineImpl
)
try:
array_metadata_json = json.dumps(metadata.to_dict())
# Maintain old behavior: https://github.com/zarrs/zarrs-python/tree/b36ba797cafec77f5f41a25316be02c718a2b4f8?tab=readme-ov-file#configuration
validate_checksums = config.get("codec_pipeline.validate_checksums", True)
if validate_checksums is None:
validate_checksums = True
return CodecPipelineImpl(
return pipeline_class(
array_metadata_json,
store_config=store,
validate_checksums=validate_checksums,
Expand Down Expand Up @@ -101,7 +120,7 @@ def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
class ZarrsCodecPipeline(CodecPipeline):
metadata: ArrayMetadata
store: Store
impl: CodecPipelineImpl | None
impl: CodecPipelineImpl | AsyncCodecPipelineImpl | None
python_impl: BatchedCodecPipeline | None

def __getstate__(self) -> ZarrsCodecPipelineState:
Expand Down Expand Up @@ -164,6 +183,16 @@ async def encode(
) -> Iterable[Buffer | None]:
raise NotImplementedError("encode")

async def _run_impl(self, method, /, *args) -> None:
    """Run ``method(*args)`` in the way that suits the active implementation.

    ``method`` is expected to be a bound method of ``self.impl``; ``args`` are
    forwarded to it unchanged and any return value is discarded.

    * Synchronous pipeline: the call blocks its calling thread, so it is
      handed to a worker thread via ``asyncio.to_thread`` to keep the event
      loop responsive.
    * Async pipeline (``AsyncCodecPipelineImpl``): the implementation drives
      all chunk I/O concurrently inside its own tokio runtime, so it is
      invoked directly.
    """
    uses_async_impl = isinstance(self.impl, AsyncCodecPipelineImpl)
    if not uses_async_impl:
        await asyncio.to_thread(method, *args)
        return
    # NOTE(review): this is a plain synchronous call inside a coroutine, so
    # the asyncio event loop cannot schedule other tasks until it returns --
    # confirm that is acceptable for long-running batches.
    method(*args)

async def read(
self,
batch_info: Iterable[
Expand Down Expand Up @@ -195,7 +224,7 @@ async def read(
return None
else:
out: NDArrayLike = out.as_ndarray_like()
await asyncio.to_thread(
await self._run_impl(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc.chunk_info_with_indices,
out,
Expand Down Expand Up @@ -237,7 +266,7 @@ async def write(
)
elif not value_np.flags.c_contiguous:
value_np = np.ascontiguousarray(value_np)
await asyncio.to_thread(
await self._run_impl(
self.impl.store_chunks_with_indices,
chunks_desc.chunk_info_with_indices,
value_np,
Expand Down
Loading
Loading