From f77ee39190c6f1569813035c86958395efd47933 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jun 2026 16:40:37 -0400 Subject: [PATCH 1/5] Add store_chunk --- src/array/sync.rs | 19 +++++++++++++++++ src/array_bytes.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/src/array/sync.rs b/src/array/sync.rs index c0fe7e8..ac0c47e 100644 --- a/src/array/sync.rs +++ b/src/array/sync.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::array::selection::PySelection; use crate::array::shared::array_metadata_accessors; use crate::array::PyChunkIndices; +use crate::array_bytes::PyArrayBytes; use crate::codec::PyCodecOptions; use crate::decoded_array::DecodedArray; use crate::error::ZarristaResult; @@ -90,6 +91,24 @@ impl PyArray { let encoded = self.inner.retrieve_encoded_chunk(chunk_indices.as_ref())?; Ok(encoded.map(|buf| PyBytes::new(buf.into()))) } + + #[pyo3(signature = (chunk_indices, encoded_chunk, **codec_options))] + fn store_chunk( + &self, + chunk_indices: PyChunkIndices, + encoded_chunk: &PyArrayBytes, + codec_options: Option, + ) -> ZarristaResult<()> { + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + self.inner.store_chunk_opt( + chunk_indices.as_ref(), + encoded_chunk.inner(), + &codec_options, + )?; + Ok(()) + } } impl From> for PyArray { diff --git a/src/array_bytes.rs b/src/array_bytes.rs index feaedf5..8ce61b3 100644 --- a/src/array_bytes.rs +++ b/src/array_bytes.rs @@ -11,7 +11,10 @@ use std::borrow::Cow; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_bytes::PyBytes; -use zarrs::array::{ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength}; +use zarrs::array::{ + ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength, DataType, + ElementError, IntoArrayBytes, +}; /// Bytes for a chunk as they cross the Python boundary. #[pyclass(module = "zarrista", frozen, name = "ArrayBytes")] @@ -68,6 +71,14 @@ impl PyArrayBytes { } impl PyArrayBytes { + pub fn into_inner(self) -> ArrayBytesOwned { + self.0 + } + + pub fn inner(&self) -> &ArrayBytesOwned { + &self.0 + } + /// Borrow a zarrs [`ArrayBytes`] out of `self` for the duration of a codec /// call. Zero-copy: every buffer is borrowed from the owned `PyBytes`. /// @@ -85,13 +96,25 @@ impl PyArrayBytes { } } +impl From for ArrayBytesOwned { + fn from(py_bytes: PyArrayBytes) -> Self { + py_bytes.0 + } +} + +impl From for PyArrayBytes { + fn from(bytes: ArrayBytesOwned) -> Self { + Self(bytes) + } +} + /// The owned representation, mirroring zarrs' [`ArrayBytes`] sum type. /// /// - Element buffers (fixed/variable payloads and the optional mask) are stored /// as [`PyBytes`] and borrowed zero-copy. /// - Offsets are copied into a `Vec` (they need a per-element cast from /// Python ints and are tiny metadata relative to the payload). -enum ArrayBytesOwned { +pub enum ArrayBytesOwned { Fixed(PyBytes), Variable { bytes: PyBytes, @@ -172,6 +195,30 @@ impl From> for ArrayBytesOwned { } } +impl<'a> IntoArrayBytes<'a> for &'a ArrayBytesOwned { + fn into_array_bytes(self, data_type: &DataType) -> Result, ElementError> { + match self { + ArrayBytesOwned::Fixed(bytes) => Ok(ArrayBytes::Fixed(Cow::Borrowed(bytes.as_ref()))), + ArrayBytesOwned::Variable { bytes, offsets } => { + let offsets = ArrayBytesOffsets::new(Cow::Borrowed(offsets.as_slice())) + .map_err(|e| ElementError::Other(e.to_string()))?; + + let variable = + ArrayBytesVariableLength::new(Cow::Borrowed(bytes.as_ref()), offsets) + .map_err(|e| ElementError::Other(e.to_string()))?; + Ok(ArrayBytes::Variable(variable)) + } + ArrayBytesOwned::Optional { data, mask } => { + let data = data.into_array_bytes(data_type)?; + Ok(ArrayBytes::Optional(ArrayBytesOptional::new( + data, + Cow::Borrowed(mask.as_ref()), + ))) + } + } + } +} + /// Wrap a `Cow<[u8]>` as `PyBytes`, moving the allocation when already owned. /// /// A `Cow::Borrowed` is copied once into a fresh allocation. From e87e9721127c3b62c800240916fcd9384348d693 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jun 2026 16:51:52 -0400 Subject: [PATCH 2/5] implement store_chunk --- src/array/async.rs | 27 +++++++++++++++++++++++++++ src/array_bytes.rs | 7 ++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/array/async.rs b/src/array/async.rs index c345dcb..ee1e5fc 100644 --- a/src/array/async.rs +++ b/src/array/async.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::array::selection::PySelection; use crate::array::shared::array_metadata_accessors; use crate::array::PyChunkIndices; +use crate::array_bytes::PyArrayBytes; use crate::codec::PyCodecOptions; use crate::decoded_array::DecodedArray; use crate::error::ZarristaError; @@ -128,6 +129,32 @@ impl PyAsyncArray { Ok(encoded.map(PyBytes::new)) }) } + + #[pyo3(signature = (chunk_indices, encoded_chunk, **codec_options))] + fn store_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + encoded_chunk: PyArrayBytes, + codec_options: Option, + ) -> PyResult> { + let inner = self.inner.clone(); + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + + future_into_py(py, async move { + inner + .async_store_chunk_opt( + chunk_indices.as_ref(), + encoded_chunk.inner(), + &codec_options, + ) + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } } impl From> for PyAsyncArray { diff --git a/src/array_bytes.rs b/src/array_bytes.rs index 8ce61b3..55fb9b4 100644 --- a/src/array_bytes.rs +++ b/src/array_bytes.rs @@ -17,7 +17,8 @@ use zarrs::array::{ }; /// Bytes for a chunk as they cross the Python boundary. -#[pyclass(module = "zarrista", frozen, name = "ArrayBytes")] +#[derive(Clone)] +#[pyclass(module = "zarrista", frozen, name = "ArrayBytes", from_py_object)] pub struct PyArrayBytes(ArrayBytesOwned); #[pymethods] @@ -114,10 +115,14 @@ impl From for PyArrayBytes { /// as [`PyBytes`] and borrowed zero-copy. /// - Offsets are copied into a `Vec` (they need a per-element cast from /// Python ints and are tiny metadata relative to the payload). +#[derive(Clone)] pub enum ArrayBytesOwned { Fixed(PyBytes), Variable { bytes: PyBytes, + // Note: Cloning actually memcpy's the offsets; to avoid that would require Arc<[usize]>, + // but then it's not zero-copy to create from a Vec + // This is currently mostly hit on the async store_chunk path offsets: Vec, }, Optional { From 141f86cdae232ed31f4bf6d45d86bf00c530d679 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jun 2026 16:54:25 -0400 Subject: [PATCH 3/5] clean up --- src/array/async.rs | 2 +- src/array/sync.rs | 2 +- src/array_bytes.rs | 37 +------------------------------------ 3 files changed, 3 insertions(+), 38 deletions(-) diff --git a/src/array/async.rs b/src/array/async.rs index ee1e5fc..d4c07d8 100644 --- a/src/array/async.rs +++ b/src/array/async.rs @@ -147,7 +147,7 @@ impl PyAsyncArray { inner .async_store_chunk_opt( chunk_indices.as_ref(), - encoded_chunk.inner(), + encoded_chunk.as_array_bytes()?, &codec_options, ) .await diff --git a/src/array/sync.rs b/src/array/sync.rs index ac0c47e..1366b4f 100644 --- a/src/array/sync.rs +++ b/src/array/sync.rs @@ -104,7 +104,7 @@ impl PyArray { .unwrap_or_default(); self.inner.store_chunk_opt( chunk_indices.as_ref(), - encoded_chunk.inner(), + encoded_chunk.as_array_bytes()?, &codec_options, )?; Ok(()) diff --git a/src/array_bytes.rs b/src/array_bytes.rs index 55fb9b4..d8f475f 100644 --- a/src/array_bytes.rs +++ b/src/array_bytes.rs @@ -11,10 +11,7 @@ use std::borrow::Cow; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_bytes::PyBytes; -use zarrs::array::{ - ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength, DataType, - ElementError, IntoArrayBytes, -}; +use zarrs::array::{ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength}; /// Bytes for a chunk as they cross the Python boundary. #[derive(Clone)] @@ -72,14 +69,6 @@ impl PyArrayBytes { } impl PyArrayBytes { - pub fn into_inner(self) -> ArrayBytesOwned { - self.0 - } - - pub fn inner(&self) -> &ArrayBytesOwned { - &self.0 - } - /// Borrow a zarrs [`ArrayBytes`] out of `self` for the duration of a codec /// call. Zero-copy: every buffer is borrowed from the owned `PyBytes`. /// @@ -200,30 +189,6 @@ impl From> for ArrayBytesOwned { } } -impl<'a> IntoArrayBytes<'a> for &'a ArrayBytesOwned { - fn into_array_bytes(self, data_type: &DataType) -> Result, ElementError> { - match self { - ArrayBytesOwned::Fixed(bytes) => Ok(ArrayBytes::Fixed(Cow::Borrowed(bytes.as_ref()))), - ArrayBytesOwned::Variable { bytes, offsets } => { - let offsets = ArrayBytesOffsets::new(Cow::Borrowed(offsets.as_slice())) - .map_err(|e| ElementError::Other(e.to_string()))?; - - let variable = - ArrayBytesVariableLength::new(Cow::Borrowed(bytes.as_ref()), offsets) - .map_err(|e| ElementError::Other(e.to_string()))?; - Ok(ArrayBytes::Variable(variable)) - } - ArrayBytesOwned::Optional { data, mask } => { - let data = data.into_array_bytes(data_type)?; - Ok(ArrayBytes::Optional(ArrayBytesOptional::new( - data, - Cow::Borrowed(mask.as_ref()), - ))) - } - } - } -} - /// Wrap a `Cow<[u8]>` as `PyBytes`, moving the allocation when already owned. /// /// A `Cow::Borrowed` is copied once into a fresh allocation. From e71c0ff376d8ec3c5f4c41ad7d88528c80fa8054 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jun 2026 17:10:55 -0400 Subject: [PATCH 4/5] add more methods --- src/array/async.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++ src/array/sync.rs | 38 ++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/src/array/async.rs b/src/array/async.rs index d4c07d8..19232cf 100644 --- a/src/array/async.rs +++ b/src/array/async.rs @@ -76,6 +76,52 @@ impl PyAsyncArray { }) } + fn compact_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + codec_options: Option, + ) -> PyResult> { + let inner = self.inner.clone(); + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + + future_into_py(py, async move { + let result = inner + .async_compact_chunk(chunk_indices.as_ref(), &codec_options) + .await + .map_err(ZarristaError::from)?; + Ok(result) + }) + } + + fn erase_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + ) -> PyResult> { + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .async_erase_chunk(chunk_indices.as_ref()) + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } + + fn erase_metadata<'py>(&self, py: Python<'py>) -> PyResult> { + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .async_erase_metadata() + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } + /// Read a region of the array as `Data`, using numpy-style basic indexing. fn retrieve_array_subset<'py>( &self, @@ -155,6 +201,26 @@ impl PyAsyncArray { Ok(()) }) } + + fn store_encoded_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + encoded_chunk: PyBytes, + ) -> PyResult> { + let inner = self.inner.clone(); + future_into_py(py, async move { + // Safety: + // The responsibility is on the caller to ensure the chunk is encoded correctly + unsafe { + inner + .async_store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner()) + .await + .map_err(ZarristaError::from)?; + } + Ok(()) + }) + } } impl From> for PyAsyncArray { diff --git a/src/array/sync.rs b/src/array/sync.rs index 1366b4f..d5f3fc1 100644 --- a/src/array/sync.rs +++ b/src/array/sync.rs @@ -61,6 +61,30 @@ impl PyArray { Ok(Self::new(Arc::new(inner))) } + #[pyo3(signature = (chunk_indices, **codec_options))] + fn compact_chunk( + &self, + chunk_indices: PyChunkIndices, + codec_options: Option, + ) -> ZarristaResult { + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + Ok(self + .inner + .compact_chunk(chunk_indices.as_ref(), &codec_options)?) + } + + fn erase_chunk(&self, chunk_indices: PyChunkIndices) -> ZarristaResult<()> { + self.inner.erase_chunk(chunk_indices.as_ref())?; + Ok(()) + } + + fn erase_metadata(&self) -> ZarristaResult<()> { + self.inner.erase_metadata()?; + Ok(()) + } + /// Read a region of the array, using numpy-style basic indexing. /// /// Returns one of the decoded result classes (`Tensor`, `VariableArray`, @@ -109,6 +133,20 @@ impl PyArray { )?; Ok(()) } + + fn store_encoded_chunk( + &self, + chunk_indices: PyChunkIndices, + encoded_chunk: PyBytes, + ) -> ZarristaResult<()> { + // Safety: + // The responsibility is on the caller to ensure the chunk is encoded correctly + unsafe { + self.inner + .store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner())?; + } + Ok(()) + } } impl From> for PyArray { From 627edf0761f4ee208fd288d72eea530cd9b6eab3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jun 2026 17:17:54 -0400 Subject: [PATCH 5/5] update type hinting --- python/zarrista/_array.pyi | 96 +++++++++++++++++++++++++++++++++++++- src/array/async.rs | 7 +-- src/array/sync.rs | 6 +-- 3 files changed, 101 insertions(+), 8 deletions(-) diff --git a/python/zarrista/_array.pyi b/python/zarrista/_array.pyi index 5b6036d..25257a4 100644 --- a/python/zarrista/_array.pyi +++ b/python/zarrista/_array.pyi @@ -1,3 +1,4 @@ +from collections.abc import Buffer from types import EllipsisType from typing import TypeAlias, Unpack @@ -10,6 +11,7 @@ from zarrista.codec import ( CodecOptions, ) +from ._array_bytes import ArrayBytes from ._chunks import ChunkGrid from ._decoded_array import DecodedArray from ._dtype import DataType @@ -26,7 +28,7 @@ indexing are not supported. """ class Array: - """A read-only Zarr array.""" + """A Zarr array.""" @staticmethod def open(store: FilesystemStore | MemoryStore, path: str = "/") -> Array: @@ -82,6 +84,51 @@ class Array: Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. """ + def store_chunk( + self, + chunk_indices: list[int], + decoded_chunk: ArrayBytes, + **codec_options: Unpack[CodecOptions], + ) -> None: + """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`. + + `decoded_chunk` holds the decoded chunk data; the array's codec pipeline + encodes it before it is written. If the data equals the fill value and + `store_empty_chunks` is `False`, the chunk is erased instead. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + def store_encoded_chunk( + self, + chunk_indices: list[int], + encoded_chunk: Buffer, + ) -> None: + """Write already-encoded bytes directly as the chunk at `chunk_indices`. + + The bytes are stored verbatim with no encoding. The caller is + responsible for ensuring they match the array's codec pipeline; invalid + bytes produce a chunk that cannot be decoded. + """ + def compact_chunk( + self, + chunk_indices: list[int], + **codec_options: Unpack[CodecOptions], + ) -> bool: + """Re-encode the stored chunk in place, returning whether it was rewritten. + + Reads the encoded chunk, attempts to produce a more compact encoding, + and rewrites it if that succeeds. Returns `True` if the chunk was + rewritten, `False` if it was absent or already optimal. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + def erase_chunk(self, chunk_indices: list[int]) -> None: + """Delete the chunk at `chunk_indices` from the store. + + Erasing an absent chunk is a no-op. + """ + def erase_metadata(self) -> None: + """Delete the array's metadata from the store.""" @property def shape(self) -> list[int]: """The array shape.""" @@ -92,7 +139,7 @@ class Array: """ class AsyncArray: - """A read-only Zarr array backed by an async store.""" + """A Zarr array backed by an async store.""" @staticmethod async def open_async(store: AsyncStore, path: str = "/") -> AsyncArray: @@ -151,6 +198,51 @@ class AsyncArray: Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. """ + async def store_chunk( + self, + chunk_indices: list[int], + decoded_chunk: ArrayBytes, + **codec_options: Unpack[CodecOptions], + ) -> None: + """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`. + + `decoded_chunk` holds the decoded chunk data; the array's codec pipeline + encodes it before it is written. If the data equals the fill value and + `store_empty_chunks` is `False`, the chunk is erased instead. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + async def store_encoded_chunk( + self, + chunk_indices: list[int], + encoded_chunk: Buffer, + ) -> None: + """Write already-encoded bytes directly as the chunk at `chunk_indices`. + + The bytes are stored verbatim with no encoding. The caller is + responsible for ensuring they match the array's codec pipeline; invalid + bytes produce a chunk that cannot be decoded. + """ + async def compact_chunk( + self, + chunk_indices: list[int], + **codec_options: Unpack[CodecOptions], + ) -> bool: + """Re-encode the stored chunk in place, returning whether it was rewritten. + + Reads the encoded chunk, attempts to produce a more compact encoding, + and rewrites it if that succeeds. Returns `True` if the chunk was + rewritten, `False` if it was absent or already optimal. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + async def erase_chunk(self, chunk_indices: list[int]) -> None: + """Delete the chunk at `chunk_indices` from the store. + + Erasing an absent chunk is a no-op. + """ + async def erase_metadata(self) -> None: + """Delete the array's metadata from the store.""" @property def shape(self) -> list[int]: """The array shape.""" diff --git a/src/array/async.rs b/src/array/async.rs index 19232cf..ba418f2 100644 --- a/src/array/async.rs +++ b/src/array/async.rs @@ -76,6 +76,7 @@ impl PyAsyncArray { }) } + #[pyo3(signature = (chunk_indices, **codec_options))] fn compact_chunk<'py>( &self, py: Python<'py>, @@ -176,12 +177,12 @@ impl PyAsyncArray { }) } - #[pyo3(signature = (chunk_indices, encoded_chunk, **codec_options))] + #[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))] fn store_chunk<'py>( &self, py: Python<'py>, chunk_indices: PyChunkIndices, - encoded_chunk: PyArrayBytes, + decoded_chunk: PyArrayBytes, codec_options: Option, ) -> PyResult> { let inner = self.inner.clone(); @@ -193,7 +194,7 @@ impl PyAsyncArray { inner .async_store_chunk_opt( chunk_indices.as_ref(), - encoded_chunk.as_array_bytes()?, + decoded_chunk.as_array_bytes()?, &codec_options, ) .await diff --git a/src/array/sync.rs b/src/array/sync.rs index d5f3fc1..38bcc96 100644 --- a/src/array/sync.rs +++ b/src/array/sync.rs @@ -116,11 +116,11 @@ impl PyArray { Ok(encoded.map(|buf| PyBytes::new(buf.into()))) } - #[pyo3(signature = (chunk_indices, encoded_chunk, **codec_options))] + #[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))] fn store_chunk( &self, chunk_indices: PyChunkIndices, - encoded_chunk: &PyArrayBytes, + decoded_chunk: &PyArrayBytes, codec_options: Option, ) -> ZarristaResult<()> { let codec_options = codec_options @@ -128,7 +128,7 @@ impl PyArray { .unwrap_or_default(); self.inner.store_chunk_opt( chunk_indices.as_ref(), - encoded_chunk.as_array_bytes()?, + decoded_chunk.as_array_bytes()?, &codec_options, )?; Ok(())