Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions python/zarrista/_array.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Buffer
from types import EllipsisType
from typing import TypeAlias, Unpack

Expand All @@ -10,6 +11,7 @@ from zarrista.codec import (
CodecOptions,
)

from ._array_bytes import ArrayBytes
from ._chunks import ChunkGrid
from ._decoded_array import DecodedArray
from ._dtype import DataType
Expand All @@ -26,7 +28,7 @@ indexing are not supported.
"""

class Array:
"""A read-only Zarr array."""
"""A Zarr array."""

@staticmethod
def open(store: FilesystemStore | MemoryStore, path: str = "/") -> Array:
Expand Down Expand Up @@ -82,6 +84,51 @@ class Array:

Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
"""
def store_chunk(
    self,
    chunk_indices: list[int],
    decoded_chunk: ArrayBytes,
    **codec_options: Unpack[CodecOptions],
) -> None:
    """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`.

    `decoded_chunk` holds the decoded (not yet encoded) chunk data; the
    array's codec pipeline encodes it before it is written. If the data
    equals the fill value and `store_empty_chunks` is `False`, the chunk is
    erased instead of being written.

    Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
    When no keyword arguments are given, the default codec options are used.
    """
def store_encoded_chunk(
    self,
    chunk_indices: list[int],
    encoded_chunk: Buffer,
) -> None:
    """Write already-encoded bytes directly as the chunk at `chunk_indices`.

    The bytes are stored verbatim: nothing is encoded, so this method takes
    no codec options. The caller is responsible for ensuring the bytes match
    the array's codec pipeline; invalid bytes produce a chunk that cannot be
    decoded.
    """
def compact_chunk(
    self,
    chunk_indices: list[int],
    **codec_options: Unpack[CodecOptions],
) -> bool:
    """Re-encode the stored chunk in place, returning whether it was rewritten.

    Reads the encoded chunk, attempts to produce a more compact encoding,
    and rewrites it if that succeeds.

    Returns `True` if the chunk was rewritten, and `False` if it was absent
    or already optimal.

    Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
    When no keyword arguments are given, the default codec options are used.
    """
def erase_chunk(self, chunk_indices: list[int]) -> None:
    """Delete the chunk at `chunk_indices` from the store.

    Erasing an absent chunk is a no-op. The array's metadata is removed
    separately, with `erase_metadata`.
    """
def erase_metadata(self) -> None:
    """Delete the array's metadata from the store.

    Individual chunks are removed separately, with `erase_chunk`.
    """
@property
def shape(self) -> list[int]:
"""The array shape."""
Expand All @@ -92,7 +139,7 @@ class Array:
"""

class AsyncArray:
"""A read-only Zarr array backed by an async store."""
"""A Zarr array backed by an async store."""

@staticmethod
async def open_async(store: AsyncStore, path: str = "/") -> AsyncArray:
Expand Down Expand Up @@ -151,6 +198,51 @@ class AsyncArray:

Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
"""
async def store_chunk(
    self,
    chunk_indices: list[int],
    decoded_chunk: ArrayBytes,
    **codec_options: Unpack[CodecOptions],
) -> None:
    """Encode `decoded_chunk` and write it as the chunk at `chunk_indices`.

    `decoded_chunk` holds the decoded (not yet encoded) chunk data; the
    array's codec pipeline encodes it before it is written. If the data
    equals the fill value and `store_empty_chunks` is `False`, the chunk is
    erased instead of being written.

    Await the result to wait for the write to finish and to observe any error.

    Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
    When no keyword arguments are given, the default codec options are used.
    """
async def store_encoded_chunk(
    self,
    chunk_indices: list[int],
    encoded_chunk: Buffer,
) -> None:
    """Write already-encoded bytes directly as the chunk at `chunk_indices`.

    The bytes are stored verbatim: nothing is encoded, so this method takes
    no codec options. The caller is responsible for ensuring the bytes match
    the array's codec pipeline; invalid bytes produce a chunk that cannot be
    decoded.

    Await the result to wait for the write to finish and to observe any error.
    """
async def compact_chunk(
    self,
    chunk_indices: list[int],
    **codec_options: Unpack[CodecOptions],
) -> bool:
    """Re-encode the stored chunk in place, returning whether it was rewritten.

    Reads the encoded chunk, attempts to produce a more compact encoding,
    and rewrites it if that succeeds.

    Awaiting the result yields `True` if the chunk was rewritten, and `False`
    if it was absent or already optimal.

    Keyword arguments are passed as [`CodecOptions`][zarrista.codec.CodecOptions].
    When no keyword arguments are given, the default codec options are used.
    """
async def erase_chunk(self, chunk_indices: list[int]) -> None:
    """Delete the chunk at `chunk_indices` from the store.

    Erasing an absent chunk is a no-op. The array's metadata is removed
    separately, with `erase_metadata`.
    """
async def erase_metadata(self) -> None:
    """Delete the array's metadata from the store.

    Individual chunks are removed separately, with `erase_chunk`.
    """
@property
def shape(self) -> list[int]:
"""The array shape."""
Expand Down
94 changes: 94 additions & 0 deletions src/array/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use crate::array::selection::PySelection;
use crate::array::shared::array_metadata_accessors;
use crate::array::PyChunkIndices;
use crate::array_bytes::PyArrayBytes;
use crate::codec::PyCodecOptions;
use crate::decoded_array::DecodedArray;
use crate::error::ZarristaError;
Expand Down Expand Up @@ -75,6 +76,53 @@ impl PyAsyncArray {
})
}

/// Re-encode the stored chunk at `chunk_indices` in place.
///
/// Returns an awaitable that resolves to whether the chunk was rewritten.
/// Keyword arguments are interpreted as codec options; the default options
/// are used when none are given.
#[pyo3(signature = (chunk_indices, **codec_options))]
fn compact_chunk<'py>(
    &self,
    py: Python<'py>,
    chunk_indices: PyChunkIndices,
    codec_options: Option<PyCodecOptions>,
) -> PyResult<Bound<'py, PyAny>> {
    // Resolve the options up front so the future only captures owned values.
    let options = match codec_options {
        Some(opts) => opts.into_inner(),
        None => Default::default(),
    };
    let array = self.inner.clone();

    future_into_py(py, async move {
        let rewritten = array
            .async_compact_chunk(chunk_indices.as_ref(), &options)
            .await
            .map_err(ZarristaError::from)?;
        Ok(rewritten)
    })
}

/// Delete the chunk at `chunk_indices` from the store.
///
/// Returns an awaitable that resolves to `None` once the erase has finished.
fn erase_chunk<'py>(
    &self,
    py: Python<'py>,
    chunk_indices: PyChunkIndices,
) -> PyResult<Bound<'py, PyAny>> {
    let array = self.inner.clone();
    future_into_py(py, async move {
        let erased = array.async_erase_chunk(chunk_indices.as_ref()).await;
        erased.map_err(ZarristaError::from)?;
        Ok(())
    })
}

/// Delete the array's metadata from the store.
///
/// Returns an awaitable that resolves to `None` once the erase has finished.
fn erase_metadata<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
    let array = self.inner.clone();
    future_into_py(py, async move {
        let erased = array.async_erase_metadata().await;
        erased.map_err(ZarristaError::from)?;
        Ok(())
    })
}

/// Read a region of the array as `Data`, using numpy-style basic indexing.
fn retrieve_array_subset<'py>(
&self,
Expand Down Expand Up @@ -128,6 +176,52 @@ impl PyAsyncArray {
Ok(encoded.map(PyBytes::new))
})
}

/// Encode `decoded_chunk` and write it as the chunk at `chunk_indices`.
///
/// Returns an awaitable that resolves to `None` once the write has finished.
/// Keyword arguments are interpreted as codec options; the default options
/// are used when none are given.
#[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))]
fn store_chunk<'py>(
    &self,
    py: Python<'py>,
    chunk_indices: PyChunkIndices,
    decoded_chunk: PyArrayBytes,
    codec_options: Option<PyCodecOptions>,
) -> PyResult<Bound<'py, PyAny>> {
    // Resolve the options up front so the future only captures owned values.
    let options = match codec_options {
        Some(opts) => opts.into_inner(),
        None => Default::default(),
    };
    let array = self.inner.clone();

    future_into_py(py, async move {
        // The view is built inside the future because it borrows from
        // `decoded_chunk`, which the future owns.
        let decoded = decoded_chunk.as_array_bytes()?;
        array
            .async_store_chunk_opt(chunk_indices.as_ref(), decoded, &options)
            .await
            .map_err(ZarristaError::from)?;
        Ok(())
    })
}

/// Write already-encoded bytes verbatim as the chunk at `chunk_indices`.
///
/// Returns an awaitable that resolves to `None` once the write has finished.
/// The bytes are not encoded on the way in; it is the caller's job to make
/// sure they match the array's codec pipeline.
fn store_encoded_chunk<'py>(
    &self,
    py: Python<'py>,
    chunk_indices: PyChunkIndices,
    encoded_chunk: PyBytes,
) -> PyResult<Bound<'py, PyAny>> {
    let array = self.inner.clone();
    future_into_py(py, async move {
        // SAFETY: The responsibility is on the caller to ensure the chunk is
        // encoded correctly. Only the call itself is unsafe, so awaiting the
        // future and propagating the error stay outside the `unsafe` block.
        let write = unsafe {
            array.async_store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner())
        };
        write.await.map_err(ZarristaError::from)?;
        Ok(())
    })
}
}

impl From<Array<dyn AsyncReadableWritableListableStorageTraits>> for PyAsyncArray {
Expand Down
57 changes: 57 additions & 0 deletions src/array/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use crate::array::selection::PySelection;
use crate::array::shared::array_metadata_accessors;
use crate::array::PyChunkIndices;
use crate::array_bytes::PyArrayBytes;
use crate::codec::PyCodecOptions;
use crate::decoded_array::DecodedArray;
use crate::error::ZarristaResult;
Expand Down Expand Up @@ -60,6 +61,30 @@ impl PyArray {
Ok(Self::new(Arc::new(inner)))
}

/// Re-encode the stored chunk at `chunk_indices` in place.
///
/// Returns whether the chunk was rewritten. Keyword arguments are
/// interpreted as codec options; the default options are used when none are
/// given.
#[pyo3(signature = (chunk_indices, **codec_options))]
fn compact_chunk(
    &self,
    chunk_indices: PyChunkIndices,
    codec_options: Option<PyCodecOptions>,
) -> ZarristaResult<bool> {
    let options = match codec_options {
        Some(opts) => opts.into_inner(),
        None => Default::default(),
    };
    let rewritten = self
        .inner
        .compact_chunk(chunk_indices.as_ref(), &options)?;
    Ok(rewritten)
}

/// Delete the chunk at `chunk_indices` from the store.
fn erase_chunk(&self, chunk_indices: PyChunkIndices) -> ZarristaResult<()> {
    let array = &self.inner;
    array.erase_chunk(chunk_indices.as_ref())?;
    Ok(())
}

/// Delete the array's metadata from the store.
fn erase_metadata(&self) -> ZarristaResult<()> {
    let array = &self.inner;
    array.erase_metadata()?;
    Ok(())
}

/// Read a region of the array, using numpy-style basic indexing.
///
/// Returns one of the decoded result classes (`Tensor`, `VariableArray`,
Expand Down Expand Up @@ -90,6 +115,38 @@ impl PyArray {
let encoded = self.inner.retrieve_encoded_chunk(chunk_indices.as_ref())?;
Ok(encoded.map(|buf| PyBytes::new(buf.into())))
}

/// Encode `decoded_chunk` and write it as the chunk at `chunk_indices`.
///
/// Keyword arguments are interpreted as codec options; the default options
/// are used when none are given.
#[pyo3(signature = (chunk_indices, decoded_chunk, **codec_options))]
fn store_chunk(
    &self,
    chunk_indices: PyChunkIndices,
    decoded_chunk: &PyArrayBytes,
    codec_options: Option<PyCodecOptions>,
) -> ZarristaResult<()> {
    let options = match codec_options {
        Some(opts) => opts.into_inner(),
        None => Default::default(),
    };
    let decoded = decoded_chunk.as_array_bytes()?;
    self.inner
        .store_chunk_opt(chunk_indices.as_ref(), decoded, &options)?;
    Ok(())
}

/// Write already-encoded bytes verbatim as the chunk at `chunk_indices`.
///
/// The bytes are not encoded on the way in; it is the caller's job to make
/// sure they match the array's codec pipeline.
fn store_encoded_chunk(
    &self,
    chunk_indices: PyChunkIndices,
    encoded_chunk: PyBytes,
) -> ZarristaResult<()> {
    // SAFETY: The responsibility is on the caller to ensure the chunk is
    // encoded correctly. Only the call itself is unsafe, so error propagation
    // stays outside the `unsafe` block.
    let stored = unsafe {
        self.inner
            .store_encoded_chunk(chunk_indices.as_ref(), encoded_chunk.into_inner())
    };
    stored?;
    Ok(())
}
}

impl From<Array<dyn ReadableWritableListableStorageTraits>> for PyArray {
Expand Down
21 changes: 19 additions & 2 deletions src/array_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use pyo3_bytes::PyBytes;
use zarrs::array::{ArrayBytes, ArrayBytesOffsets, ArrayBytesOptional, ArrayBytesVariableLength};

/// Bytes for a chunk as they cross the Python boundary.
#[pyclass(module = "zarrista", frozen, name = "ArrayBytes")]
#[derive(Clone)]
#[pyclass(module = "zarrista", frozen, name = "ArrayBytes", from_py_object)]
pub struct PyArrayBytes(ArrayBytesOwned);

#[pymethods]
Expand Down Expand Up @@ -85,16 +86,32 @@ impl PyArrayBytes {
}
}

impl From<PyArrayBytes> for ArrayBytesOwned {
fn from(py_bytes: PyArrayBytes) -> Self {
py_bytes.0
}
}

impl From<ArrayBytesOwned> for PyArrayBytes {
fn from(bytes: ArrayBytesOwned) -> Self {
Self(bytes)
}
}

/// The owned representation, mirroring zarrs' [`ArrayBytes`] sum type.
///
/// - Element buffers (fixed/variable payloads and the optional mask) are stored
/// as [`PyBytes`] and borrowed zero-copy.
/// - Offsets are copied into a `Vec<usize>` (they need a per-element cast from
/// Python ints and are tiny metadata relative to the payload).
enum ArrayBytesOwned {
#[derive(Clone)]
pub enum ArrayBytesOwned {
Fixed(PyBytes),
Variable {
bytes: PyBytes,
// Note: Cloning actually memcpy's the offsets; to avoid that would require Arc<[usize]>,
// but then it's not zero-copy to create from a Vec<usize>
// This is currently mostly hit on the async store_chunk path
offsets: Vec<usize>,
},
Optional {
Expand Down
Loading