diff --git a/python/zarrista/_array.pyi b/python/zarrista/_array.pyi index 5b6036d..25257a4 100644 --- a/python/zarrista/_array.pyi +++ b/python/zarrista/_array.pyi @@ -1,3 +1,4 @@ +from collections.abc import Buffer from types import EllipsisType from typing import TypeAlias, Unpack @@ -10,6 +11,7 @@ from zarrista.codec import ( CodecOptions, ) +from ._array_bytes import ArrayBytes from ._chunks import ChunkGrid from ._decoded_array import DecodedArray from ._dtype import DataType @@ -26,7 +28,7 @@ indexing are not supported. """ class Array: - """A read-only Zarr array.""" + """A Zarr array.""" @staticmethod def open(store: FilesystemStore | MemoryStore, path: str = "/") -> Array: @@ -82,6 +84,51 @@ class Array: Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. """ + def store_chunk( + self, + chunk_indices: list[int], + decoded_chunk: ArrayBytes, + **codec_options: Unpack[CodecOptions], + ) -> None: + """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`. + + `decoded_chunk` holds the decoded chunk data; the array's codec pipeline + encodes it before it is written. If the data equals the fill value and + `store_empty_chunks` is `False`, the chunk is erased instead. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + def store_encoded_chunk( + self, + chunk_indices: list[int], + encoded_chunk: Buffer, + ) -> None: + """Write already-encoded bytes directly as the chunk at `chunk_indices`. + + The bytes are stored verbatim with no encoding. The caller is + responsible for ensuring they match the array's codec pipeline; invalid + bytes produce a chunk that cannot be decoded. + """ + def compact_chunk( + self, + chunk_indices: list[int], + **codec_options: Unpack[CodecOptions], + ) -> bool: + """Re-encode the stored chunk in place, returning whether it was rewritten. + + Reads the encoded chunk, attempts to produce a more compact encoding, + and rewrites it if that succeeds. 
Returns `True` if the chunk was + rewritten, `False` if it was absent or already optimal. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + def erase_chunk(self, chunk_indices: list[int]) -> None: + """Delete the chunk at `chunk_indices` from the store. + + Erasing an absent chunk is a no-op. + """ + def erase_metadata(self) -> None: + """Delete the array's metadata from the store.""" @property def shape(self) -> list[int]: """The array shape.""" @@ -92,7 +139,7 @@ class Array: """ class AsyncArray: - """A read-only Zarr array backed by an async store.""" + """A Zarr array backed by an async store.""" @staticmethod async def open_async(store: AsyncStore, path: str = "/") -> AsyncArray: @@ -151,6 +198,51 @@ class AsyncArray: Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. """ + async def store_chunk( + self, + chunk_indices: list[int], + decoded_chunk: ArrayBytes, + **codec_options: Unpack[CodecOptions], + ) -> None: + """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`. + + `decoded_chunk` holds the decoded chunk data; the array's codec pipeline + encodes it before it is written. If the data equals the fill value and + `store_empty_chunks` is `False`, the chunk is erased instead. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + async def store_encoded_chunk( + self, + chunk_indices: list[int], + encoded_chunk: Buffer, + ) -> None: + """Write already-encoded bytes directly as the chunk at `chunk_indices`. + + The bytes are stored verbatim with no encoding. The caller is + responsible for ensuring they match the array's codec pipeline; invalid + bytes produce a chunk that cannot be decoded. + """ + async def compact_chunk( + self, + chunk_indices: list[int], + **codec_options: Unpack[CodecOptions], + ) -> bool: + """Re-encode the stored chunk in place, returning whether it was rewritten. 
+ + Reads the encoded chunk, attempts to produce a more compact encoding, + and rewrites it if that succeeds. Returns `True` if the chunk was + rewritten, `False` if it was absent or already optimal. + + Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions]. + """ + async def erase_chunk(self, chunk_indices: list[int]) -> None: + """Delete the chunk at `chunk_indices` from the store. + + Erasing an absent chunk is a no-op. + """ + async def erase_metadata(self) -> None: + """Delete the array's metadata from the store.""" @property def shape(self) -> list[int]: """The array shape.""" diff --git a/src/array/async.rs b/src/array/async.rs index c345dcb..ba418f2 100644 --- a/src/array/async.rs +++ b/src/array/async.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::array::selection::PySelection; use crate::array::shared::array_metadata_accessors; use crate::array::PyChunkIndices; +use crate::array_bytes::PyArrayBytes; use crate::codec::PyCodecOptions; use crate::decoded_array::DecodedArray; use crate::error::ZarristaError; @@ -75,6 +76,53 @@ impl PyAsyncArray { }) } + #[pyo3(signature = (chunk_indices, **codec_options))] + fn compact_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + codec_options: Option<PyCodecOptions>, + ) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + + future_into_py(py, async move { + let result = inner + .async_compact_chunk(chunk_indices.as_ref(), &codec_options) + .await + .map_err(ZarristaError::from)?; + Ok(result) + }) + } + + fn erase_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + ) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .async_erase_chunk(chunk_indices.as_ref()) + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } + + fn erase_metadata<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + 
future_into_py(py, async move { + inner + .async_erase_metadata() + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } + /// Read a region of the array as `Data`, using numpy-style basic indexing. fn retrieve_array_subset<'py>( &self, @@ -128,6 +176,52 @@ impl PyAsyncArray { Ok(encoded.map(PyBytes::new)) }) } + + #[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))] + fn store_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + decoded_chunk: PyArrayBytes, + codec_options: Option<PyCodecOptions>, + ) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + + future_into_py(py, async move { + inner + .async_store_chunk_opt( + chunk_indices.as_ref(), + decoded_chunk.as_array_bytes()?, + &codec_options, + ) + .await + .map_err(ZarristaError::from)?; + Ok(()) + }) + } + + fn store_encoded_chunk<'py>( + &self, + py: Python<'py>, + chunk_indices: PyChunkIndices, + encoded_chunk: PyBytes, + ) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + future_into_py(py, async move { + // Safety: + // The responsibility is on the caller to ensure the chunk is encoded correctly + unsafe { + inner + .async_store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner()) + .await + .map_err(ZarristaError::from)?; + } + Ok(()) + }) + } } impl From> for PyAsyncArray { diff --git a/src/array/sync.rs b/src/array/sync.rs index c0fe7e8..38bcc96 100644 --- a/src/array/sync.rs +++ b/src/array/sync.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::array::selection::PySelection; use crate::array::shared::array_metadata_accessors; use crate::array::PyChunkIndices; +use crate::array_bytes::PyArrayBytes; use crate::codec::PyCodecOptions; use crate::decoded_array::DecodedArray; use crate::error::ZarristaResult; @@ -60,6 +61,30 @@ impl PyArray { Ok(Self::new(Arc::new(inner))) } + #[pyo3(signature = (chunk_indices, **codec_options))] + fn compact_chunk( + &self, + chunk_indices: 
PyChunkIndices, + codec_options: Option<PyCodecOptions>, + ) -> ZarristaResult<bool> { + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + Ok(self + .inner + .compact_chunk(chunk_indices.as_ref(), &codec_options)?) + } + + fn erase_chunk(&self, chunk_indices: PyChunkIndices) -> ZarristaResult<()> { + self.inner.erase_chunk(chunk_indices.as_ref())?; + Ok(()) + } + + fn erase_metadata(&self) -> ZarristaResult<()> { + self.inner.erase_metadata()?; + Ok(()) + } + /// Read a region of the array, using numpy-style basic indexing. /// /// Returns one of the decoded result classes (`Tensor`, `VariableArray`, @@ -90,6 +115,38 @@ impl PyArray { let encoded = self.inner.retrieve_encoded_chunk(chunk_indices.as_ref())?; Ok(encoded.map(|buf| PyBytes::new(buf.into()))) } + + #[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))] + fn store_chunk( + &self, + chunk_indices: PyChunkIndices, + decoded_chunk: &PyArrayBytes, + codec_options: Option<PyCodecOptions>, + ) -> ZarristaResult<()> { + let codec_options = codec_options + .map(|opts| opts.into_inner()) + .unwrap_or_default(); + self.inner.store_chunk_opt( + chunk_indices.as_ref(), + decoded_chunk.as_array_bytes()?, + &codec_options, + )?; + Ok(()) + } + + fn store_encoded_chunk( + &self, + chunk_indices: PyChunkIndices, + encoded_chunk: PyBytes, + ) -> ZarristaResult<()> { + // Safety: + // The responsibility is on the caller to ensure the chunk is encoded correctly + unsafe { + self.inner + .store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner())?; + } + Ok(()) + } } impl From> for PyArray { diff --git a/src/array_bytes.rs b/src/array_bytes.rs index feaedf5..d8f475f 100644 --- a/src/array_bytes.rs +++ b/src/array_bytes.rs @@ -14,7 +14,8 @@ use pyo3_bytes::PyBytes; use zarrs::array::{ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength}; /// Bytes for a chunk as they cross the Python boundary. 
-#[pyclass(module = "zarrista", frozen, name = "ArrayBytes")] +#[derive(Clone)] +#[pyclass(module = "zarrista", frozen, name = "ArrayBytes", from_py_object)] pub struct PyArrayBytes(ArrayBytesOwned); #[pymethods] @@ -85,16 +86,32 @@ impl PyArrayBytes { } } +impl From<PyArrayBytes> for ArrayBytesOwned { + fn from(py_bytes: PyArrayBytes) -> Self { + py_bytes.0 + } +} + +impl From<ArrayBytesOwned> for PyArrayBytes { + fn from(bytes: ArrayBytesOwned) -> Self { + Self(bytes) + } +} + /// The owned representation, mirroring zarrs' [`ArrayBytes`] sum type. /// /// - Element buffers (fixed/variable payloads and the optional mask) are stored /// as [`PyBytes`] and borrowed zero-copy. /// - Offsets are copied into a `Vec` (they need a per-element cast from /// Python ints and are tiny metadata relative to the payload). -enum ArrayBytesOwned { +#[derive(Clone)] +pub enum ArrayBytesOwned { Fixed(PyBytes), Variable { bytes: PyBytes, + // Note: Cloning actually memcpy's the offsets; to avoid that would require Arc<[usize]>, + // but then it's not zero-copy to create from a Vec + // This is currently mostly hit on the async store_chunk path offsets: Vec<usize>, }, Optional {