From 2ece304be897272a5003c00abb6b09acc36a3148 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Tue, 30 Jun 2026 13:02:40 +0200 Subject: [PATCH] feat: async pipeline --- Cargo.toml | 5 + python/zarrs/_internal.pyi | 25 ++ python/zarrs/pipeline.py | 41 ++- src/async_pipeline.rs | 507 +++++++++++++++++++++++++++++++++++++ src/concurrency.rs | 44 ++-- src/lib.rs | 488 +++++------------------------------ src/runtime.rs | 23 +- src/store.rs | 35 ++- src/store/http.rs | 13 +- src/store/obstore.rs | 14 +- src/sync_pipeline.rs | 461 +++++++++++++++++++++++++++++++++ src/utils.rs | 2 +- 12 files changed, 1198 insertions(+), 460 deletions(-) create mode 100644 src/async_pipeline.rs create mode 100644 src/sync_pipeline.rs diff --git a/Cargo.toml b/Cargo.toml index e3454bec..42bfcae7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,10 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] } +# `AsyncStoragePartialDecoder` is `pub` in `zarrs_codec` but not re-exported by +# `zarrs`; depend on it directly. Cargo unifies it to the same version `zarrs` +# uses, so the codec types are identical to those re-exported via `zarrs`. +zarrs_codec = { version = "0.2", features = ["async"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path @@ -23,6 +27,7 @@ opendal = { version = "0.55.0", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } zarrs_opendal = "0.10.0" itertools = "0.14.0" +futures = "0.3.31" bytemuck = { version = "1.24.0", features = ["must_cast"] } pyo3-object_store = "0.7.0" # object_store 0.12 zarrs_object_store = "0.5.0" # object_store 0.12 diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 4af635a7..38f55edd 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -18,6 +18,31 @@ class ChunkItem: shape: typing.Sequence[builtins.int], ) -> ChunkItem: ... +@typing.final +class AsyncCodecPipelineImpl: + def __new__( + cls, + array_metadata: builtins.str, + store_config: zarr.abc.store.Store, + *, + validate_checksums: builtins.bool = False, + chunk_concurrent_minimum: builtins.int | None = None, + chunk_concurrent_maximum: builtins.int | None = None, + num_threads: builtins.int | None = None, + direct_io: builtins.bool = False, + ) -> AsyncCodecPipelineImpl: ... + def retrieve_chunks_and_apply_index( + self, + chunk_descriptions: typing.Sequence[ChunkItem], + value: numpy.typing.NDArray[typing.Any], + ) -> None: ... + def store_chunks_with_indices( + self, + chunk_descriptions: typing.Sequence[ChunkItem], + value: numpy.typing.NDArray[typing.Any], + write_empty_chunks: builtins.bool, + ) -> None: ... + @typing.final class CodecPipelineImpl: def __new__( diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index d033820a..b328fbc4 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -12,6 +12,14 @@ from zarr.core import BatchedCodecPipeline from zarr.core.config import config from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata +from zarr.storage import FsspecStore + +try: + from zarr.storage import ObjectStore + + _ASYNC_STORES: tuple[type, ...] = (FsspecStore, ObjectStore) +except ImportError: + _ASYNC_STORES = (FsspecStore,) if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -24,7 +32,7 @@ from zarr.core.indexing import SelectorTuple from zarr.dtype import ZDType -from ._internal import CodecPipelineImpl +from ._internal import AsyncCodecPipelineImpl, CodecPipelineImpl from .utils import ( DiscontiguousArrayError, FillValueNoneError, @@ -41,16 +49,27 @@ class UnsupportedMetadataError(Exception): pass +def _supports_async_pipeline(store: Store) -> bool: + # Async-backed stores issue many requests concurrently instead of blocking a + # worker thread per request; everything else (e.g. ``LocalStore``) uses the + # synchronous pipeline. Mirrors the stores accepted by the Rust + # ``AsyncReadableWritableListableStorage`` conversion (see ``src/store.rs``). + return isinstance(store, _ASYNC_STORES) + + def get_codec_pipeline_impl( metadata: ArrayMetadata, store: Store, *, strict: bool -) -> CodecPipelineImpl | None: +) -> CodecPipelineImpl | AsyncCodecPipelineImpl | None: + pipeline_class = ( + AsyncCodecPipelineImpl if _supports_async_pipeline(store) else CodecPipelineImpl + ) try: array_metadata_json = json.dumps(metadata.to_dict()) # Maintain old behavior: https://github.com/zarrs/zarrs-python/tree/b36ba797cafec77f5f41a25316be02c718a2b4f8?tab=readme-ov-file#configuration validate_checksums = config.get("codec_pipeline.validate_checksums", True) if validate_checksums is None: validate_checksums = True - return CodecPipelineImpl( + return pipeline_class( array_metadata_json, store_config=store, validate_checksums=validate_checksums, @@ -101,7 +120,7 @@ def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]: class ZarrsCodecPipeline(CodecPipeline): metadata: ArrayMetadata store: Store - impl: CodecPipelineImpl | None + impl: CodecPipelineImpl | AsyncCodecPipelineImpl | None python_impl: BatchedCodecPipeline | None def __getstate__(self) -> ZarrsCodecPipelineState: @@ -164,6 +183,16 @@ async def encode( ) -> Iterable[Buffer | None]: raise NotImplementedError("encode") + async def _run_impl(self, method, /, *args) -> None: + # The async pipeline drives all chunk I/O concurrently inside its own + # tokio runtime, so it is called directly. The synchronous pipeline + # blocks the calling thread, so it is offloaded to a worker thread to + # keep the event loop responsive. + if isinstance(self.impl, AsyncCodecPipelineImpl): + method(*args) + else: + await asyncio.to_thread(method, *args) + async def read( self, batch_info: Iterable[ @@ -195,7 +224,7 @@ async def read( return None else: out: NDArrayLike = out.as_ndarray_like() - await asyncio.to_thread( + await self._run_impl( self.impl.retrieve_chunks_and_apply_index, chunks_desc.chunk_info_with_indices, out, @@ -237,7 +266,7 @@ async def write( ) elif not value_np.flags.c_contiguous: value_np = np.ascontiguousarray(value_np) - await asyncio.to_thread( + await self._run_impl( self.impl.store_chunks_with_indices, chunks_desc.chunk_info_with_indices, value_np, diff --git a/src/async_pipeline.rs b/src/async_pipeline.rs new file mode 100644 index 00000000..60836cab --- /dev/null +++ b/src/async_pipeline.rs @@ -0,0 +1,507 @@ +//! Asynchronous codec pipeline. +//! +//! [`AsyncCodecPipelineImpl`] mirrors [`crate::CodecPipelineImpl`], but holds an +//! [`AsyncReadableWritableListableStorage`] and talks to it with `async` +//! `get`/`set`/`erase` calls instead of going through the +//! `AsyncToSyncStorageAdapter`. +//! +//! [`tokio::task::spawn`] / [`JoinSet`](tokio::task::JoinSet) require their +//! futures to be `Send + 'static`, so a task that decoded *directly into* that +//! buffer would not compile: the view borrows `value` and cannot satisfy +//! `'static` (and we would also have to argue disjoint-write soundness across +//! threads we do not control). +//! +//! So retrieval is split into two phases: +//! +//! 1. **Fetch (parallel concurrent-fetch/decode, `tokio`) +//! 2. **Fill (synchronous, parallel).** +//! [`ArrayBytesFixedDisjointView::copy_from_slice`] over its disjoint subset. +//! This touches the non-`'static` borrow, but it is plain synchronous code — +//! no `tokio`, no `'static` requirement. +//! +//! The trade-off is that all decoded chunk subsets are held in memory at once +//! (≈ the size of `value`) before the fill begins. +//! +//! The write path has no such conflict — it only *reads* the input buffer — so it +//! still drives `async` stores concurrently inside a single +//! [`crate::runtime::block_on`] via [`futures`] combinators. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::Arc; + +use futures::stream::{self, TryStreamExt}; +use itertools::Itertools; +use numpy::{PyUntypedArray, PyUntypedArrayMethods}; +use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; +use pyo3::prelude::*; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +use tokio::task::JoinSet; +use zarrs::array::{ + ArrayBytes, ArrayBytesFixedDisjointView, ArrayToBytesCodecTraits, + AsyncArrayPartialDecoderTraits, CodecChain, CodecOptions, DataType, FillValue, + update_array_bytes, +}; +use zarrs::storage::{ + AsyncReadableStorageTraits, AsyncReadableWritableListableStorage, AsyncWritableStorageTraits, + StorageHandle, StoreKey, +}; +// Not re-exported by `zarrs` (only the async traits are); see Cargo.toml. +use zarrs_codec::AsyncStoragePartialDecoder; + +use crate::chunk_item::ChunkItem; +use crate::concurrency::{ChunkConcurrentLimitAndCodecOptions, CodecPipelineConfig}; +use crate::runtime::block_on; +use crate::store::StoreConfig; +use crate::utils::{PyCodecErrExt, PyErrExt as _, is_whole_chunk}; +use crate::{ArrayConfig, CodecPipelineImpl, parse_array_config}; + +struct RetrieveContext { + store: AsyncReadableWritableListableStorage, + codec_chain: Arc, + codec_options: CodecOptions, + data_type: DataType, + fill_value: FillValue, +} + +#[gen_stub_pyclass] +#[pyclass] +pub struct AsyncCodecPipelineImpl { + pub(crate) store: AsyncReadableWritableListableStorage, + pub(crate) codec_chain: Arc, + pub(crate) codec_options: CodecOptions, + pub(crate) chunk_concurrent_minimum: usize, + pub(crate) chunk_concurrent_maximum: usize, + pub(crate) num_threads: usize, + pub(crate) fill_value: FillValue, + pub(crate) data_type: DataType, +} + +impl CodecPipelineConfig for AsyncCodecPipelineImpl { + fn codec_chain(&self) -> &CodecChain { + &self.codec_chain + } + fn data_type(&self) -> &DataType { + &self.data_type + } + fn codec_options(&self) -> &CodecOptions { + &self.codec_options + } + fn chunk_concurrent_minimum(&self) -> usize { + self.chunk_concurrent_minimum + } + fn chunk_concurrent_maximum(&self) -> usize { + self.chunk_concurrent_maximum + } + fn num_threads(&self) -> usize { + self.num_threads + } +} + +impl AsyncCodecPipelineImpl { + async fn retrieve_chunk_bytes<'a>( + &self, + item: &ChunkItem, + codec_options: &CodecOptions, + ) -> PyResult> { + let value_encoded = self + .store + .get(&item.key) + .await + .map_py_err::()?; + let value_decoded = if let Some(value_encoded) = value_encoded { + let value_encoded: Vec = value_encoded.into(); // zero-copy in this case + self.codec_chain + .decode( + value_encoded.into(), + &item.shape, + &self.data_type, + &self.fill_value, + codec_options, + ) + .map_codec_err()? + } else { + ArrayBytes::new_fill_value(&self.data_type, item.num_elements, &self.fill_value) + .map_py_err::()? + }; + Ok(value_decoded) + } + + async fn store_chunk_bytes( + &self, + item: &ChunkItem, + value_decoded: ArrayBytes<'_>, + codec_options: &CodecOptions, + ) -> PyResult<()> { + value_decoded + .validate(item.num_elements, &self.data_type) + .map_codec_err()?; + + if value_decoded.is_fill_value(&self.fill_value) { + self.store + .erase(&item.key) + .await + .map_py_err::() + } else { + let value_encoded = self + .codec_chain + .encode( + value_decoded, + &item.shape, + &self.data_type, + &self.fill_value, + codec_options, + ) + .map(Cow::into_owned) + .map_codec_err()?; + + // Store the encoded chunk + self.store + .set(&item.key, value_encoded.into()) + .await + .map_py_err::() + } + } + + async fn store_chunk_subset_bytes( + &self, + item: &ChunkItem, + chunk_subset_bytes: ArrayBytes<'_>, + codec_options: &CodecOptions, + ) -> PyResult<()> { + let array_shape = &item.shape; + let chunk_subset = &item.chunk_subset; + if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { + return Err(PyErr::new::(format!( + "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + ))); + } + let data_type_size = self.data_type.size(); + + if is_whole_chunk(item) { + // Fast path if the chunk subset spans the entire chunk, no read required + self.store_chunk_bytes(item, chunk_subset_bytes, codec_options) + .await + } else { + // Validate the chunk subset bytes + chunk_subset_bytes + .validate(chunk_subset.num_elements(), &self.data_type) + .map_codec_err()?; + + // Retrieve the chunk + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_options).await?; + + // Update the chunk + let chunk_bytes_new = update_array_bytes( + chunk_bytes_old, + bytemuck::must_cast_slice(array_shape), + chunk_subset, + &chunk_subset_bytes, + data_type_size, + ) + .map_codec_err()?; + + // Store the updated chunk + self.store_chunk_bytes(item, chunk_bytes_new, codec_options) + .await + } + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl AsyncCodecPipelineImpl { + #[pyo3(signature = ( + array_metadata, + store_config, + *, + validate_checksums=false, + chunk_concurrent_minimum=None, + chunk_concurrent_maximum=None, + num_threads=None, + direct_io=false, + ))] + #[new] + fn new( + array_metadata: &str, + mut store_config: StoreConfig, + validate_checksums: bool, + chunk_concurrent_minimum: Option, + chunk_concurrent_maximum: Option, + num_threads: Option, + direct_io: bool, + ) -> PyResult { + // `direct_io` only affects the (synchronous) filesystem store, which the + // async pipeline does not support; kept for signature parity. + store_config.direct_io(direct_io); + let ArrayConfig { + codec_chain, + codec_options, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + fill_value, + data_type, + } = parse_array_config( + array_metadata, + validate_checksums, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + )?; + + let store: AsyncReadableWritableListableStorage = + (&store_config).try_into().map_py_err::()?; + + Ok(Self { + store, + codec_chain, + codec_options, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + fill_value, + data_type, + }) + } + + fn retrieve_chunks_and_apply_index( + &self, + py: Python, + chunk_descriptions: Vec, // FIXME: Ref / iterable? + value: &Bound<'_, PyUntypedArray>, + ) -> PyResult<()> { + // `output` is an `UnsafeCellSlice` borrowing `value`'s buffer; its + // lifetime is *not* `'static`. It is never handed to a `tokio` task — + // only the synchronous fill phase below touches it (see module docs). + let output = CodecPipelineImpl::nparray_to_unsafe_cell_slice(value)?; + + // Only the codec options are needed here; chunk-level parallelism is + // delegated to the tokio runtime rather than a manual concurrency limit. + let Some((_chunk_concurrent_limit, codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + else { + return Ok(()); + }; + + // FIXME: the fill phase only supports fixed length data types. For + // variable length data types, need a codepath without `copy_from_slice`. + let data_type_size = self + .data_type + .fixed_size() + .ok_or("variable length data type not supported") + .map_py_err::()?; + + // Unique partial (non-whole) chunks each need an async partial decoder, + // so that subsets sharing a shard reuse a single decoder. + let partial_chunk_items: Vec = chunk_descriptions + .iter() + .filter(|item| !is_whole_chunk(item)) + .unique_by(|item| item.key.clone()) + .cloned() + .collect(); + + // Owned, `'static` config cloned into each fetch task. + let ctx = Arc::new(RetrieveContext { + store: self.store.clone(), + codec_chain: self.codec_chain.clone(), + codec_options, + data_type: self.data_type.clone(), + fill_value: self.fill_value.clone(), + }); + + py.detach(move || { + // Phase 1: fetch + decode every chunk's subset into an owned + // `ArrayBytes`, in parallel across the tokio runtime's worker threads. + let decoded: Vec> = block_on(async { + // Build the partial decoders (one per unique shard) in parallel. + let mut cache_tasks: JoinSet< + PyResult<(StoreKey, Arc)>, + > = JoinSet::new(); + for item in partial_chunk_items { + let ctx = ctx.clone(); + cache_tasks.spawn(async move { + let storage_handle = Arc::new(StorageHandle::new(ctx.store.clone())); + let input_handle = Arc::new(AsyncStoragePartialDecoder::new( + storage_handle, + item.key.clone(), + )); + let partial_decoder = ctx + .codec_chain + .clone() + .async_partial_decoder( + input_handle, + &item.shape, + &ctx.data_type, + &ctx.fill_value, + &ctx.codec_options, + ) + .await + .map_codec_err()?; + Ok((item.key, partial_decoder)) + }); + } + let mut partial_decoder_cache: HashMap< + StoreKey, + Arc, + > = HashMap::new(); + while let Some(joined) = cache_tasks.join_next().await { + let (key, decoder) = joined.map_py_err::()??; + partial_decoder_cache.insert(key, decoder); + } + + // One fetch task per chunk. Each returns owned bytes, so the task + // is `Send + 'static` and runs with real tokio parallelism. + let mut fetch_tasks: JoinSet)>> = + JoinSet::new(); + for (index, item) in chunk_descriptions.iter().enumerate() { + let ctx = ctx.clone(); + let item = item.clone(); + let partial_decoder = partial_decoder_cache.get(&item.key).cloned(); + fetch_tasks.spawn(async move { + let bytes: ArrayBytes<'static> = if is_whole_chunk(&item) { + // See zarrs::array::Array::async_retrieve_chunk_opt + match ctx + .store + .get(&item.key) + .await + .map_py_err::()? + { + Some(chunk_encoded) => { + let chunk_encoded: Vec = chunk_encoded.into(); + ctx.codec_chain + .decode( + chunk_encoded.into(), + &item.shape, + &ctx.data_type, + &ctx.fill_value, + &ctx.codec_options, + ) + .map_codec_err()? + .into_owned() + } + // Missing chunk: the subset is the fill value. + None => ArrayBytes::new_fill_value( + &ctx.data_type, + item.num_elements, + &ctx.fill_value, + ) + .map_py_err::()? + .into_owned(), + } + } else { + // See zarrs::array::Array::async_retrieve_chunk_subset_opt + let partial_decoder = partial_decoder.ok_or_else(|| { + PyRuntimeError::new_err(format!( + "Partial decoder not found for key: {}", + item.key + )) + })?; + // `into_owned` here drops the borrow of the task-local + // partial decoder so the bytes can leave the task. + partial_decoder + .partial_decode(&item.chunk_subset, &ctx.codec_options) + .await + .map_codec_err()? + .into_owned() + }; + Ok((index, bytes)) + }); + } + + let mut decoded: Vec>> = + (0..chunk_descriptions.len()).map(|_| None).collect(); + while let Some(joined) = fetch_tasks.join_next().await { + let (index, bytes) = joined.map_py_err::()??; + decoded[index] = Some(bytes); + } + // Every index is produced exactly once by the loop above. + Ok::<_, PyErr>( + decoded + .into_iter() + .map(|bytes| bytes.expect("every chunk index is decoded exactly once")) + .collect(), + ) + })?; + + // Phase 2: copy each decoded subset into `value`. Synchronous, so the + // non-`'static` borrow of the output buffer is sound. + for (item, bytes) in chunk_descriptions.iter().zip(decoded) { + let mut output_view = unsafe { + // SAFETY: chunks represent disjoint array subsets, so the + // views never write overlapping output bytes. + ArrayBytesFixedDisjointView::new( + output, + data_type_size, + bytemuck::must_cast_slice(&item.array_shape), + item.subset.clone(), + ) + .map_py_err::()? + }; + let bytes = bytes.into_fixed().map_py_err::()?; + output_view + .copy_from_slice(&bytes) + .map_py_err::()?; + } + Ok(()) + }) + } + + fn store_chunks_with_indices( + &self, + py: Python, + chunk_descriptions: Vec, + value: &Bound<'_, PyUntypedArray>, + write_empty_chunks: bool, + ) -> PyResult<()> { + enum InputValue<'a> { + Array(ArrayBytes<'a>), + Constant(FillValue), + } + + // Get input array + let input_slice = CodecPipelineImpl::nparray_to_slice(value)?; + let input = if value.ndim() > 0 { + // FIXME: Handle variable length data types, convert value to bytes and offsets + InputValue::Array(ArrayBytes::new_flen(Cow::Borrowed(input_slice))) + } else { + InputValue::Constant(FillValue::new(input_slice.to_vec())) + }; + + // Adjust the concurrency based on the codec chain and the first chunk description + let Some((chunk_concurrent_limit, mut codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + else { + return Ok(()); + }; + codec_options.set_store_empty_chunks(write_empty_chunks); + + py.detach(move || { + block_on(async move { + let input = &input; + let codec_options = &codec_options; + stream::iter(chunk_descriptions.into_iter().map(Ok::)) + .try_for_each_concurrent(chunk_concurrent_limit, move |item| async move { + let chunk_subset_bytes = match input { + InputValue::Array(input) => input + .extract_array_subset( + &item.subset, + bytemuck::must_cast_slice(&item.array_shape), + &self.data_type, + ) + .map_codec_err()?, + InputValue::Constant(constant_value) => ArrayBytes::new_fill_value( + &self.data_type, + item.chunk_subset.num_elements(), + constant_value, + ) + .map_py_err::()?, + }; + self.store_chunk_subset_bytes(&item, chunk_subset_bytes, codec_options) + .await + }) + .await + }) + }) + } +} diff --git a/src/concurrency.rs b/src/concurrency.rs index 08b9dca1..ec4c29d7 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -1,44 +1,56 @@ use pyo3::PyResult; use zarrs::array::{ - ArrayCodecTraits, CodecOptions, RecommendedConcurrency, + ArrayCodecTraits, CodecChain, CodecOptions, DataType, RecommendedConcurrency, concurrency::calc_concurrency_outer_inner, }; -use crate::{CodecPipelineImpl, chunk_item::ChunkItem, utils::PyCodecErrExt as _}; +use crate::{chunk_item::ChunkItem, utils::PyCodecErrExt as _}; + +/// The pieces of pipeline configuration needed to compute concurrency limits. +/// +/// Implemented by both the synchronous [`crate::CodecPipelineImpl`] and the +/// asynchronous [`crate::async_pipeline::AsyncCodecPipelineImpl`], so the +/// concurrency calculation can be shared between them. +pub trait CodecPipelineConfig { + fn codec_chain(&self) -> &CodecChain; + fn data_type(&self) -> &DataType; + fn codec_options(&self) -> &CodecOptions; + fn chunk_concurrent_minimum(&self) -> usize; + fn chunk_concurrent_maximum(&self) -> usize; + fn num_threads(&self) -> usize; +} pub trait ChunkConcurrentLimitAndCodecOptions { - fn get_chunk_concurrent_limit_and_codec_options( + fn get_chunk_concurrent_limit_and_codec_options( &self, - codec_pipeline_impl: &CodecPipelineImpl, + config: &C, ) -> PyResult>; } impl ChunkConcurrentLimitAndCodecOptions for Vec { - fn get_chunk_concurrent_limit_and_codec_options( + fn get_chunk_concurrent_limit_and_codec_options( &self, - codec_pipeline_impl: &CodecPipelineImpl, + config: &C, ) -> PyResult> { let num_chunks = self.len(); let Some(item) = self.first() else { return Ok(None); }; - let codec_concurrency = codec_pipeline_impl - .codec_chain - .recommended_concurrency(&item.shape, &codec_pipeline_impl.data_type) + let codec_concurrency = config + .codec_chain() + .recommended_concurrency(&item.shape, config.data_type()) .map_codec_err()?; - let min_concurrent_chunks = - std::cmp::min(codec_pipeline_impl.chunk_concurrent_minimum, num_chunks); - let max_concurrent_chunks = - std::cmp::max(codec_pipeline_impl.chunk_concurrent_maximum, num_chunks); + let min_concurrent_chunks = std::cmp::min(config.chunk_concurrent_minimum(), num_chunks); + let max_concurrent_chunks = std::cmp::max(config.chunk_concurrent_maximum(), num_chunks); let (chunk_concurrent_limit, codec_concurrent_limit) = calc_concurrency_outer_inner( - codec_pipeline_impl.num_threads, + config.num_threads(), &RecommendedConcurrency::new(min_concurrent_chunks..max_concurrent_chunks), &codec_concurrency, ); - let codec_options = codec_pipeline_impl - .codec_options + let codec_options = config + .codec_options() .with_concurrent_target(codec_concurrent_limit); Ok(Some((chunk_concurrent_limit, codec_options))) } diff --git a/src/lib.rs b/src/lib.rs index 7cc1a0f7..a4978ebe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,49 +2,33 @@ #![allow(clippy::module_name_repetitions)] use std::borrow::Cow; -use std::collections::HashMap; -use std::ptr::NonNull; use std::sync::Arc; -use chunk_item::ChunkItem; -use itertools::Itertools; -use numpy::npyffi::PyArrayObject; -use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; -use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; +use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3_stub_gen::define_stub_info_gatherer; -use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use rayon_iter_concurrent_limit::iter_concurrent_limit; -use unsafe_cell_slice::UnsafeCellSlice; -use utils::is_whole_chunk; -use zarrs::array::{ - ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata, - ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, - FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, -}; +use zarrs::array::{ArrayMetadata, CodecChain, CodecOptions, DataType, FillValue}; use zarrs::config::global_config; use zarrs::convert::array_metadata_v2_to_v3; use zarrs::plugin::ZarrVersion; -use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; +mod async_pipeline; mod chunk_item; mod concurrency; mod runtime; mod store; +mod sync_pipeline; #[cfg(test)] mod tests; mod utils; -use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; -use crate::store::StoreConfig; -use crate::utils::{PyCodecErrExt, PyErrExt as _}; +use crate::async_pipeline::AsyncCodecPipelineImpl; +use crate::sync_pipeline::CodecPipelineImpl; +use crate::utils::PyErrExt as _; -// TODO: Use a OnceLock for store with get_or_try_init when stabilised? -#[gen_stub_pyclass] -#[pyclass] -pub struct CodecPipelineImpl { - pub(crate) store: ReadableWritableListableStorage, +/// Configuration parsed from the array metadata, shared by the synchronous and +/// asynchronous codec pipelines (everything except the store, which differs). +pub(crate) struct ArrayConfig { pub(crate) codec_chain: Arc, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, @@ -54,414 +38,55 @@ pub struct CodecPipelineImpl { pub(crate) data_type: DataType, } -impl CodecPipelineImpl { - fn retrieve_chunk_bytes<'a>( - &self, - item: &ChunkItem, - codec_chain: &CodecChain, - codec_options: &CodecOptions, - ) -> PyResult> { - let value_encoded = self.store.get(&item.key).map_py_err::()?; - let value_decoded = if let Some(value_encoded) = value_encoded { - let value_encoded: Vec = value_encoded.into(); // zero-copy in this case - codec_chain - .decode( - value_encoded.into(), - &item.shape, - &self.data_type, - &self.fill_value, - codec_options, - ) - .map_codec_err()? - } else { - ArrayBytes::new_fill_value(&self.data_type, item.num_elements, &self.fill_value) - .map_py_err::()? - }; - Ok(value_decoded) - } - - fn store_chunk_bytes( - &self, - item: &ChunkItem, - codec_chain: &CodecChain, - value_decoded: ArrayBytes, - codec_options: &CodecOptions, - ) -> PyResult<()> { - value_decoded - .validate(item.num_elements, &self.data_type) - .map_codec_err()?; - - if value_decoded.is_fill_value(&self.fill_value) { - self.store.erase(&item.key).map_py_err::() - } else { - let value_encoded = codec_chain - .encode( - value_decoded, - &item.shape, - &self.data_type, - &self.fill_value, - codec_options, - ) - .map(Cow::into_owned) - .map_codec_err()?; - - // Store the encoded chunk - self.store - .set(&item.key, value_encoded.into()) - .map_py_err::() - } - } - - fn store_chunk_subset_bytes( - &self, - item: &ChunkItem, - codec_chain: &CodecChain, - chunk_subset_bytes: ArrayBytes, - codec_options: &CodecOptions, - ) -> PyResult<()> { - let array_shape = &item.shape; - let chunk_subset = &item.chunk_subset; - if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { - return Err(PyErr::new::(format!( - "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" - ))); - } - let data_type_size = self.data_type.size(); - - if is_whole_chunk(item) { - // Fast path if the chunk subset spans the entire chunk, no read required - self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) - } else { - // Validate the chunk subset bytes - chunk_subset_bytes - .validate(chunk_subset.num_elements(), &self.data_type) - .map_codec_err()?; - - // Retrieve the chunk - let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; - - // Update the chunk - let chunk_bytes_new = update_array_bytes( - chunk_bytes_old, - bytemuck::must_cast_slice(array_shape), - chunk_subset, - &chunk_subset_bytes, - data_type_size, - ) - .map_codec_err()?; - - // Store the updated chunk - self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) +pub(crate) fn parse_array_config( + array_metadata: &str, + validate_checksums: bool, + chunk_concurrent_minimum: Option, + chunk_concurrent_maximum: Option, + num_threads: Option, +) -> PyResult { + let metadata = serde_json::from_str(array_metadata).map_py_err::()?; + let metadata_v3 = match &metadata { + ArrayMetadata::V2(v2) => { + Cow::Owned(array_metadata_v2_to_v3(v2).map_py_err::()?) } - } - - fn py_untyped_array_to_array_object<'a>( - value: &'a Bound<'_, PyUntypedArray>, - ) -> &'a PyArrayObject { - // TODO: Upstream a PyUntypedArray.as_array_ref()? - // https://github.com/zarrs/zarrs-python/pull/80/files/75be39184905d688ac04a5f8bca08c5241c458cd#r1918365296 - let array_object_ptr: NonNull = NonNull::new(value.as_array_ptr()) - .expect("bug in numpy crate: Bound<'_, PyUntypedArray>::as_array_ptr unexpectedly returned a null pointer"); - let array_object: &'a PyArrayObject = unsafe { - // SAFETY: the array object pointed to by array_object_ptr is valid for 'a - array_object_ptr.as_ref() - }; - array_object - } - - fn nparray_to_slice<'a>(value: &'a Bound<'_, PyUntypedArray>) -> Result<&'a [u8], PyErr> { - if !value.is_c_contiguous() { - return Err(PyErr::new::( - "input array must be a C contiguous array".to_string(), - )); - } - let array_object: &PyArrayObject = Self::py_untyped_array_to_array_object(value); - let array_data = array_object.data.cast::(); - let array_len = value.len() * value.dtype().itemsize(); - let slice = unsafe { - // SAFETY: array_data is a valid pointer to a u8 array of length array_len - debug_assert!(!array_data.is_null()); - std::slice::from_raw_parts(array_data, array_len) - }; - Ok(slice) - } - - fn nparray_to_unsafe_cell_slice<'a>( - value: &'a Bound<'_, PyUntypedArray>, - ) -> Result, PyErr> { - if !value.is_c_contiguous() { - return Err(PyErr::new::( - "input array must be a C contiguous array".to_string(), - )); - } - let array_object: &PyArrayObject = Self::py_untyped_array_to_array_object(value); - let array_data = array_object.data.cast::(); - let array_len = value.len() * value.dtype().itemsize(); - let output = unsafe { - // SAFETY: array_data is a valid pointer to a u8 array of length array_len - debug_assert!(!array_data.is_null()); - std::slice::from_raw_parts_mut(array_data, array_len) - }; - Ok(UnsafeCellSlice::new(output)) - } -} - -#[gen_stub_pymethods] -#[pymethods] -impl CodecPipelineImpl { - #[pyo3(signature = ( - array_metadata, - store_config, - *, - validate_checksums=false, - chunk_concurrent_minimum=None, - chunk_concurrent_maximum=None, - num_threads=None, - direct_io=false, - ))] - #[new] - fn new( - array_metadata: &str, - mut store_config: StoreConfig, - validate_checksums: bool, - chunk_concurrent_minimum: Option, - chunk_concurrent_maximum: Option, - num_threads: Option, - direct_io: bool, - ) -> PyResult { - store_config.direct_io(direct_io); - let metadata = serde_json::from_str(array_metadata).map_py_err::()?; - let metadata_v3 = match &metadata { - ArrayMetadata::V2(v2) => { - Cow::Owned(array_metadata_v2_to_v3(v2).map_py_err::()?) - } - ArrayMetadata::V3(v3) => Cow::Borrowed(v3), - }; - let codec_chain = - Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); - let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); - - let chunk_concurrent_minimum = - chunk_concurrent_minimum.unwrap_or(global_config().chunk_concurrent_minimum()); - let chunk_concurrent_maximum = - chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads()); - let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); - - let store: ReadableWritableListableStorage = - (&store_config).try_into().map_py_err::()?; - - let data_type = - DataType::from_metadata(&metadata_v3.data_type).map_py_err::()?; - let fill_value = data_type - .fill_value(&metadata_v3.fill_value, ZarrVersion::V3) - .or_else(|_| { - Err(match &metadata { - ArrayMetadata::V2(metadata) => format!( - "incompatible fill value metadata: dtype={}, fill_value={}", - metadata.dtype, metadata.fill_value - ), - ArrayMetadata::V3(metadata) => format!( - "incompatible fill value metadata: data_type={}, fill_value={}", - metadata.data_type, metadata.fill_value - ), - }) + ArrayMetadata::V3(v3) => Cow::Borrowed(v3), + }; + let codec_chain = + Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); + let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); + + let chunk_concurrent_minimum = + chunk_concurrent_minimum.unwrap_or(global_config().chunk_concurrent_minimum()); + let chunk_concurrent_maximum = chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads()); + let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); + + let data_type = DataType::from_metadata(&metadata_v3.data_type).map_py_err::()?; + let fill_value = data_type + .fill_value(&metadata_v3.fill_value, ZarrVersion::V3) + .or_else(|_| { + Err(match &metadata { + ArrayMetadata::V2(metadata) => format!( + "incompatible fill value metadata: dtype={}, fill_value={}", + metadata.dtype, metadata.fill_value + ), + ArrayMetadata::V3(metadata) => format!( + "incompatible fill value metadata: data_type={}, fill_value={}", + metadata.data_type, metadata.fill_value + ), }) - .map_py_err::()?; - - Ok(Self { - store, - codec_chain, - codec_options, - chunk_concurrent_minimum, - chunk_concurrent_maximum, - num_threads, - fill_value, - data_type, - }) - } - - fn retrieve_chunks_and_apply_index( - &self, - py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? - value: &Bound<'_, PyUntypedArray>, - ) -> PyResult<()> { - // Get input array - let output = Self::nparray_to_unsafe_cell_slice(value)?; - - // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, codec_options)) = - chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? - else { - return Ok(()); - }; - - // Assemble partial decoders ahead of time and in parallel - let partial_chunk_items = chunk_descriptions - .iter() - .filter(|item| !(is_whole_chunk(item))) - .unique_by(|item| item.key.clone()) - .collect::>(); - let mut partial_decoder_cache: HashMap> = - HashMap::new(); - if !partial_chunk_items.is_empty() { - let key_decoder_pairs = - iter_concurrent_limit!(chunk_concurrent_limit, partial_chunk_items, map, |item| { - let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); - let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone()); - let partial_decoder = self - .codec_chain - .clone() - .partial_decoder( - Arc::new(input_handle), - &item.shape, - &self.data_type, - &self.fill_value, - &codec_options, - ) - .map_codec_err()?; - Ok((item.key.clone(), partial_decoder)) - }) - .collect::>>()?; - partial_decoder_cache.extend(key_decoder_pairs); - } - - py.detach(move || { - // FIXME: the `decode_into` methods only support fixed length data types. - // For variable length data types, need a codepath with non `_into` methods. - // Collect all the subsets and copy into value on the Python side? - let update_chunk_subset = |item: ChunkItem| { - let mut output_view = unsafe { - // TODO: Is the following correct? - // can we guarantee that when this function is called from Python with arbitrary arguments? - // SAFETY: chunks represent disjoint array subsets - ArrayBytesFixedDisjointView::new( - output, - // TODO: why is data_type in `item`, it should be derived from `output`, no? - self.data_type - .fixed_size() - .ok_or("variable length data type not supported") - .map_py_err::()?, - bytemuck::must_cast_slice(&item.array_shape), - item.subset.clone(), - ) - .map_py_err::()? - }; - let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); - // See zarrs::array::Array::retrieve_chunk_subset_into - if is_whole_chunk(&item) { - // See zarrs::array::Array::retrieve_chunk_into - if let Some(chunk_encoded) = - self.store.get(&item.key).map_py_err::()? - { - // Decode the encoded data into the output buffer - let chunk_encoded: Vec = chunk_encoded.into(); - self.codec_chain.decode_into( - Cow::Owned(chunk_encoded), - &item.shape, - &self.data_type, - &self.fill_value, - target, - &codec_options, - ) - } else { - // The chunk is missing, write the fill value - copy_fill_value_into(&self.data_type, &self.fill_value, target) - } - } else { - let key = &item.key; - let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { - PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) - })?; - partial_decoder.partial_decode_into(&item.chunk_subset, target, &codec_options) - } - .map_codec_err() - }; - - iter_concurrent_limit!( - chunk_concurrent_limit, - chunk_descriptions, - try_for_each, - update_chunk_subset - )?; - - Ok(()) - }) - } - - fn store_chunks_with_indices( - &self, - py: Python, - chunk_descriptions: Vec, - value: &Bound<'_, PyUntypedArray>, - write_empty_chunks: bool, - ) -> PyResult<()> { - enum InputValue<'a> { - Array(ArrayBytes<'a>), - Constant(FillValue), - } - - // Get input array - let input_slice = Self::nparray_to_slice(value)?; - let input = if value.ndim() > 0 { - // FIXME: Handle variable length data types, convert value to bytes and offsets - InputValue::Array(ArrayBytes::new_flen(Cow::Borrowed(input_slice))) - } else { - InputValue::Constant(FillValue::new(input_slice.to_vec())) - }; - - // Adjust the concurrency based on the codec chain and the first chunk description - let Some((chunk_concurrent_limit, mut codec_options)) = - chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? - else { - return Ok(()); - }; - codec_options.set_store_empty_chunks(write_empty_chunks); - - py.detach(move || { - let store_chunk = |item: ChunkItem| match &input { - InputValue::Array(input) => { - let chunk_subset_bytes = input - .extract_array_subset( - &item.subset, - bytemuck::must_cast_slice(&item.array_shape), - &self.data_type, - ) - .map_codec_err()?; - self.store_chunk_subset_bytes( - &item, - &self.codec_chain, - chunk_subset_bytes, - &codec_options, - ) - } - InputValue::Constant(constant_value) => { - let chunk_subset_bytes = ArrayBytes::new_fill_value( - &self.data_type, - item.chunk_subset.num_elements(), - constant_value, - ) - .map_py_err::()?; - - self.store_chunk_subset_bytes( - &item, - &self.codec_chain, - chunk_subset_bytes, - &codec_options, - ) - } - }; - - iter_concurrent_limit!( - chunk_concurrent_limit, - chunk_descriptions, - try_for_each, - store_chunk - )?; - - Ok(()) }) - } + .map_py_err::()?; + + Ok(ArrayConfig { + codec_chain, + codec_options, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + fill_value, + data_type, + }) } /// A Python module implemented in Rust. @@ -469,6 +94,7 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; Ok(()) } diff --git a/src/runtime.rs b/src/runtime.rs index 161db085..bab3a0f1 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,9 +1,29 @@ +use std::future::Future; use std::sync::OnceLock; use tokio::runtime::Runtime; use zarrs::storage::storage_adapter::async_to_sync::AsyncToSyncBlockOn; static RUNTIME: OnceLock = OnceLock::new(); +fn runtime() -> &'static Runtime { + RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")) +} + +/// Drive `future` to completion on the shared multi-threaded Tokio runtime. +/// +/// This blocks the calling thread until the future resolves and must therefore +/// be called from *outside* the runtime (e.g. a Python worker thread spawned via +/// `asyncio.to_thread`). Calling it from within a Tokio task would panic. +/// +/// Unlike [`tokio::task::spawn`], `block_on` does not require the future to be +/// `'static` or `Send`: the future is polled on the current thread and cannot +/// outlive this call. That is exactly what lets [`crate::async_pipeline`] hand a +/// borrowed (non-`'static`) view of the output `numpy` buffer into the futures +/// that fill it. +pub(crate) fn block_on(future: F) -> F::Output { + runtime().block_on(future) +} + pub struct TokioBlockOn(tokio::runtime::Handle); impl AsyncToSyncBlockOn for TokioBlockOn { @@ -13,6 +33,5 @@ impl AsyncToSyncBlockOn for TokioBlockOn { } pub fn tokio_block_on() -> TokioBlockOn { - let runtime = RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")); - TokioBlockOn(runtime.handle().clone()) + TokioBlockOn(runtime().handle().clone()) } diff --git a/src/store.rs b/src/store.rs index 132b436b..6444f3ac 100644 --- a/src/store.rs +++ b/src/store.rs @@ -8,7 +8,8 @@ use pyo3::{ }; use pyo3_object_store::PyExternalObjectStore; use zarrs::storage::{ - ReadableWritableListableStorage, storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, + AsyncReadableWritableListableStorage, ReadableWritableListableStorage, + storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, }; use crate::{runtime::tokio_block_on, utils::PyErrExt}; @@ -99,6 +100,28 @@ impl TryFrom<&StoreConfig> for ReadableWritableListableStorage { } } +impl TryFrom<&StoreConfig> for AsyncReadableWritableListableStorage { + type Error = PyErr; + + fn try_from(value: &StoreConfig) -> Result { + match value { + // The filesystem store (`zarrs::filesystem::FilesystemStore`) only + // implements the synchronous storage traits, so it cannot back the + // asynchronous pipeline. Object-store and HTTP backends are natively + // asynchronous and are used directly (i.e. *without* the + // `AsyncToSyncStorageAdapter` that the synchronous pipeline wraps + // around them). + StoreConfig::Filesystem(_) => Err(PyErr::new::( + "the asynchronous codec pipeline does not support the filesystem store \ + (zarrs's FilesystemStore is synchronous); use the synchronous pipeline instead" + .to_string(), + )), + StoreConfig::Http(config) => config.try_into(), + StoreConfig::ObStore(config) => config.try_into(), + } + } +} + fn opendal_builder_to_sync_store( builder: B, ) -> PyResult { @@ -109,3 +132,13 @@ fn opendal_builder_to_sync_store( let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on())); Ok(store) } + +fn opendal_builder_to_async_store( + builder: B, +) -> PyResult { + let operator = opendal::Operator::new(builder) + .map_py_err::()? + .finish(); + let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator)); + Ok(store) +} diff --git a/src/store/http.rs b/src/store/http.rs index 04e75f54..ab098492 100644 --- a/src/store/http.rs +++ b/src/store/http.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; use pyo3::{Bound, PyAny, PyErr, PyResult, exceptions::PyValueError}; -use zarrs::storage::ReadableWritableListableStorage; +use zarrs::storage::{AsyncReadableWritableListableStorage, ReadableWritableListableStorage}; -use super::opendal_builder_to_sync_store; +use super::{opendal_builder_to_async_store, opendal_builder_to_sync_store}; #[derive(Debug, Clone)] pub struct HttpStoreConfig { @@ -40,3 +40,12 @@ impl TryInto for &HttpStoreConfig { opendal_builder_to_sync_store(builder) } } + +impl TryInto for &HttpStoreConfig { + type Error = PyErr; + + fn try_into(self) -> Result { + let builder = opendal::services::Http::default().endpoint(&self.endpoint); + opendal_builder_to_async_store(builder) + } +} diff --git a/src/store/obstore.rs b/src/store/obstore.rs index 029b1878..074af548 100644 --- a/src/store/obstore.rs +++ b/src/store/obstore.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use pyo3::PyErr; use zarrs::storage::{ - ReadableWritableListableStorage, storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, + AsyncReadableWritableListableStorage, ReadableWritableListableStorage, + storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, }; use zarrs_object_store::{AsyncObjectStore, object_store::ObjectStore}; @@ -31,3 +32,14 @@ impl TryInto for &ObStoreConfig { Ok(sync_store) } } + +impl TryInto for &ObStoreConfig { + type Error = PyErr; + + fn try_into(self) -> Result { + // `AsyncObjectStore` is natively asynchronous, so it can back the + // asynchronous pipeline directly without the `AsyncToSyncStorageAdapter`. + let async_store = Arc::new(AsyncObjectStore::new(self.store.clone())); + Ok(async_store) + } +} diff --git a/src/sync_pipeline.rs b/src/sync_pipeline.rs new file mode 100644 index 00000000..5627a93c --- /dev/null +++ b/src/sync_pipeline.rs @@ -0,0 +1,461 @@ +//! Synchronous codec pipeline. +//! +//! [`CodecPipelineImpl`] holds a [`ReadableWritableListableStorage`] and drives +//! it with synchronous `get`/`set`/`erase` calls, decoding and encoding chunks +//! in parallel with `rayon`. Its asynchronous counterpart is +//! [`crate::async_pipeline::AsyncCodecPipelineImpl`]. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::ptr::NonNull; +use std::sync::Arc; + +use itertools::Itertools; +use numpy::npyffi::PyArrayObject; +use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; +use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; +use pyo3::prelude::*; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon_iter_concurrent_limit::iter_concurrent_limit; +use unsafe_cell_slice::UnsafeCellSlice; +use zarrs::array::{ + ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayPartialDecoderTraits, + ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, FillValue, StoragePartialDecoder, + copy_fill_value_into, update_array_bytes, +}; +use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; + +use crate::chunk_item::{self, ChunkItem}; +use crate::concurrency::{ChunkConcurrentLimitAndCodecOptions, CodecPipelineConfig}; +use crate::store::StoreConfig; +use crate::utils::{PyCodecErrExt, PyErrExt as _, is_whole_chunk}; +use crate::{ArrayConfig, parse_array_config}; + +// TODO: Use a OnceLock for store with get_or_try_init when stabilised? +#[gen_stub_pyclass] +#[pyclass] +pub struct CodecPipelineImpl { + pub(crate) store: ReadableWritableListableStorage, + pub(crate) codec_chain: Arc, + pub(crate) codec_options: CodecOptions, + pub(crate) chunk_concurrent_minimum: usize, + pub(crate) chunk_concurrent_maximum: usize, + pub(crate) num_threads: usize, + pub(crate) fill_value: FillValue, + pub(crate) data_type: DataType, +} + +impl CodecPipelineConfig for CodecPipelineImpl { + fn codec_chain(&self) -> &CodecChain { + &self.codec_chain + } + fn data_type(&self) -> &DataType { + &self.data_type + } + fn codec_options(&self) -> &CodecOptions { + &self.codec_options + } + fn chunk_concurrent_minimum(&self) -> usize { + self.chunk_concurrent_minimum + } + fn chunk_concurrent_maximum(&self) -> usize { + self.chunk_concurrent_maximum + } + fn num_threads(&self) -> usize { + self.num_threads + } +} + +impl CodecPipelineImpl { + fn retrieve_chunk_bytes<'a>( + &self, + item: &ChunkItem, + codec_chain: &CodecChain, + codec_options: &CodecOptions, + ) -> PyResult> { + let value_encoded = self.store.get(&item.key).map_py_err::()?; + let value_decoded = if let Some(value_encoded) = value_encoded { + let value_encoded: Vec = value_encoded.into(); // zero-copy in this case + codec_chain + .decode( + value_encoded.into(), + &item.shape, + &self.data_type, + &self.fill_value, + codec_options, + ) + .map_codec_err()? + } else { + ArrayBytes::new_fill_value(&self.data_type, item.num_elements, &self.fill_value) + .map_py_err::()? + }; + Ok(value_decoded) + } + + fn store_chunk_bytes( + &self, + item: &ChunkItem, + codec_chain: &CodecChain, + value_decoded: ArrayBytes, + codec_options: &CodecOptions, + ) -> PyResult<()> { + value_decoded + .validate(item.num_elements, &self.data_type) + .map_codec_err()?; + + if value_decoded.is_fill_value(&self.fill_value) { + self.store.erase(&item.key).map_py_err::() + } else { + let value_encoded = codec_chain + .encode( + value_decoded, + &item.shape, + &self.data_type, + &self.fill_value, + codec_options, + ) + .map(Cow::into_owned) + .map_codec_err()?; + + // Store the encoded chunk + self.store + .set(&item.key, value_encoded.into()) + .map_py_err::() + } + } + + fn store_chunk_subset_bytes( + &self, + item: &ChunkItem, + codec_chain: &CodecChain, + chunk_subset_bytes: ArrayBytes, + codec_options: &CodecOptions, + ) -> PyResult<()> { + let array_shape = &item.shape; + let chunk_subset = &item.chunk_subset; + if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { + return Err(PyErr::new::(format!( + "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + ))); + } + let data_type_size = self.data_type.size(); + + if is_whole_chunk(item) { + // Fast path if the chunk subset spans the entire chunk, no read required + self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) + } else { + // Validate the chunk subset bytes + chunk_subset_bytes + .validate(chunk_subset.num_elements(), &self.data_type) + .map_codec_err()?; + + // Retrieve the chunk + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; + + // Update the chunk + let chunk_bytes_new = update_array_bytes( + chunk_bytes_old, + bytemuck::must_cast_slice(array_shape), + chunk_subset, + &chunk_subset_bytes, + data_type_size, + ) + .map_codec_err()?; + + // Store the updated chunk + self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) + } + } + + fn py_untyped_array_to_array_object<'a>( + value: &'a Bound<'_, PyUntypedArray>, + ) -> &'a PyArrayObject { + // TODO: Upstream a PyUntypedArray.as_array_ref()? + // https://github.com/zarrs/zarrs-python/pull/80/files/75be39184905d688ac04a5f8bca08c5241c458cd#r1918365296 + let array_object_ptr: NonNull = NonNull::new(value.as_array_ptr()) + .expect("bug in numpy crate: Bound<'_, PyUntypedArray>::as_array_ptr unexpectedly returned a null pointer"); + let array_object: &'a PyArrayObject = unsafe { + // SAFETY: the array object pointed to by array_object_ptr is valid for 'a + array_object_ptr.as_ref() + }; + array_object + } + + pub(crate) fn nparray_to_slice<'a>( + value: &'a Bound<'_, PyUntypedArray>, + ) -> Result<&'a [u8], PyErr> { + if !value.is_c_contiguous() { + return Err(PyErr::new::( + "input array must be a C contiguous array".to_string(), + )); + } + let array_object: &PyArrayObject = Self::py_untyped_array_to_array_object(value); + let array_data = array_object.data.cast::(); + let array_len = value.len() * value.dtype().itemsize(); + let slice = unsafe { + // SAFETY: array_data is a valid pointer to a u8 array of length array_len + debug_assert!(!array_data.is_null()); + std::slice::from_raw_parts(array_data, array_len) + }; + Ok(slice) + } + + pub(crate) fn nparray_to_unsafe_cell_slice<'a>( + value: &'a Bound<'_, PyUntypedArray>, + ) -> Result, PyErr> { + if !value.is_c_contiguous() { + return Err(PyErr::new::( + "input array must be a C contiguous array".to_string(), + )); + } + let array_object: &PyArrayObject = Self::py_untyped_array_to_array_object(value); + let array_data = array_object.data.cast::(); + let array_len = value.len() * value.dtype().itemsize(); + let output = unsafe { + // SAFETY: array_data is a valid pointer to a u8 array of length array_len + debug_assert!(!array_data.is_null()); + std::slice::from_raw_parts_mut(array_data, array_len) + }; + Ok(UnsafeCellSlice::new(output)) + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl CodecPipelineImpl { + #[pyo3(signature = ( + array_metadata, + store_config, + *, + validate_checksums=false, + chunk_concurrent_minimum=None, + chunk_concurrent_maximum=None, + num_threads=None, + direct_io=false, + ))] + #[new] + fn new( + array_metadata: &str, + mut store_config: StoreConfig, + validate_checksums: bool, + chunk_concurrent_minimum: Option, + chunk_concurrent_maximum: Option, + num_threads: Option, + direct_io: bool, + ) -> PyResult { + store_config.direct_io(direct_io); + let ArrayConfig { + codec_chain, + codec_options, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + fill_value, + data_type, + } = parse_array_config( + array_metadata, + validate_checksums, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + )?; + + let store: ReadableWritableListableStorage = + (&store_config).try_into().map_py_err::()?; + + Ok(Self { + store, + codec_chain, + codec_options, + chunk_concurrent_minimum, + chunk_concurrent_maximum, + num_threads, + fill_value, + data_type, + }) + } + + fn retrieve_chunks_and_apply_index( + &self, + py: Python, + chunk_descriptions: Vec, // FIXME: Ref / iterable? + value: &Bound<'_, PyUntypedArray>, + ) -> PyResult<()> { + // Get input array + let output = Self::nparray_to_unsafe_cell_slice(value)?; + + // Adjust the concurrency based on the codec chain and the first chunk description + let Some((chunk_concurrent_limit, codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + else { + return Ok(()); + }; + + // Assemble partial decoders ahead of time and in parallel + let partial_chunk_items = chunk_descriptions + .iter() + .filter(|item| !(is_whole_chunk(item))) + .unique_by(|item| item.key.clone()) + .collect::>(); + let mut partial_decoder_cache: HashMap> = + HashMap::new(); + if !partial_chunk_items.is_empty() { + let key_decoder_pairs = + iter_concurrent_limit!(chunk_concurrent_limit, partial_chunk_items, map, |item| { + let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); + let input_handle = StoragePartialDecoder::new(storage_handle, item.key.clone()); + let partial_decoder = self + .codec_chain + .clone() + .partial_decoder( + Arc::new(input_handle), + &item.shape, + &self.data_type, + &self.fill_value, + &codec_options, + ) + .map_codec_err()?; + Ok((item.key.clone(), partial_decoder)) + }) + .collect::>>()?; + partial_decoder_cache.extend(key_decoder_pairs); + } + + py.detach(move || { + // FIXME: the `decode_into` methods only support fixed length data types. + // For variable length data types, need a codepath with non `_into` methods. + // Collect all the subsets and copy into value on the Python side? + let update_chunk_subset = |item: ChunkItem| { + let mut output_view = unsafe { + // TODO: Is the following correct? + // can we guarantee that when this function is called from Python with arbitrary arguments? + // SAFETY: chunks represent disjoint array subsets + ArrayBytesFixedDisjointView::new( + output, + // TODO: why is data_type in `item`, it should be derived from `output`, no? + self.data_type + .fixed_size() + .ok_or("variable length data type not supported") + .map_py_err::()?, + bytemuck::must_cast_slice(&item.array_shape), + item.subset.clone(), + ) + .map_py_err::()? + }; + let target = ArrayBytesDecodeIntoTarget::Fixed(&mut output_view); + // See zarrs::array::Array::retrieve_chunk_subset_into + if is_whole_chunk(&item) { + // See zarrs::array::Array::retrieve_chunk_into + if let Some(chunk_encoded) = + self.store.get(&item.key).map_py_err::()? + { + // Decode the encoded data into the output buffer + let chunk_encoded: Vec = chunk_encoded.into(); + self.codec_chain.decode_into( + Cow::Owned(chunk_encoded), + &item.shape, + &self.data_type, + &self.fill_value, + target, + &codec_options, + ) + } else { + // The chunk is missing, write the fill value + copy_fill_value_into(&self.data_type, &self.fill_value, target) + } + } else { + let key = &item.key; + let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { + PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) + })?; + partial_decoder.partial_decode_into(&item.chunk_subset, target, &codec_options) + } + .map_codec_err() + }; + + iter_concurrent_limit!( + chunk_concurrent_limit, + chunk_descriptions, + try_for_each, + update_chunk_subset + )?; + + Ok(()) + }) + } + + fn store_chunks_with_indices( + &self, + py: Python, + chunk_descriptions: Vec, + value: &Bound<'_, PyUntypedArray>, + write_empty_chunks: bool, + ) -> PyResult<()> { + enum InputValue<'a> { + Array(ArrayBytes<'a>), + Constant(FillValue), + } + + // Get input array + let input_slice = Self::nparray_to_slice(value)?; + let input = if value.ndim() > 0 { + // FIXME: Handle variable length data types, convert value to bytes and offsets + InputValue::Array(ArrayBytes::new_flen(Cow::Borrowed(input_slice))) + } else { + InputValue::Constant(FillValue::new(input_slice.to_vec())) + }; + + // Adjust the concurrency based on the codec chain and the first chunk description + let Some((chunk_concurrent_limit, mut codec_options)) = + chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? + else { + return Ok(()); + }; + codec_options.set_store_empty_chunks(write_empty_chunks); + + py.detach(move || { + let store_chunk = |item: ChunkItem| match &input { + InputValue::Array(input) => { + let chunk_subset_bytes = input + .extract_array_subset( + &item.subset, + bytemuck::must_cast_slice(&item.array_shape), + &self.data_type, + ) + .map_codec_err()?; + self.store_chunk_subset_bytes( + &item, + &self.codec_chain, + chunk_subset_bytes, + &codec_options, + ) + } + InputValue::Constant(constant_value) => { + let chunk_subset_bytes = ArrayBytes::new_fill_value( + &self.data_type, + item.chunk_subset.num_elements(), + constant_value, + ) + .map_py_err::()?; + + self.store_chunk_subset_bytes( + &item, + &self.codec_chain, + chunk_subset_bytes, + &codec_options, + ) + } + }; + + iter_concurrent_limit!( + chunk_concurrent_limit, + chunk_descriptions, + try_for_each, + store_chunk + )?; + + Ok(()) + }) + } +} diff --git a/src/utils.rs b/src/utils.rs index 3ededf06..97d5d0fb 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use pyo3::{PyErr, PyResult, PyTypeInfo}; use zarrs::array::CodecError; -use crate::ChunkItem; +use crate::chunk_item::ChunkItem; pub(crate) trait PyErrExt { fn map_py_err(self) -> PyResult;