Parallel b-tree reading #216

@bnlawrence

Parallel B-Tree Reading via cat_ranges

Much usage of pyfive will be for remote access, where the data is presented to pyfive
via fsspec. fsspec brings a significant set of benefits, the most important of which is
that when requests are submitted together via cat_ranges, rather than as a series of
individual requests each paying its own latency, they are issued concurrently. Tests with
chunk reading show this can improve performance by a factor of 10 on remote http and s3
resources. Some benefit ought to be possible for b-trees as well.

Background and Motivation

HDF5 B-tree traversal is currently sequential: each node is read via fh.seek() +
fh.read(), and child addresses are only known after the parent node has been parsed.
For remote files this means O(N nodes) serial round-trips.

The existing MetadataBufferingWrapper papers over this by eagerly reading the first
1 MiB on first access, which covers B-tree nodes for well-packed files. For
poorly-packed files (nodes scattered beyond 1 MiB) the fallback is individual remote
reads — exactly the pattern that cat_ranges is designed to replace.

The chunk data path already uses cat_ranges via _read_bulk_fsspec. This change
applies the same technique to B-tree node reads.


Key Insight: What Can Be Parallelised

B-tree traversal has a sequential dependency between levels: child addresses are
only discovered by parsing the parent. This cannot be avoided. However, once all
addresses at a given level are known, every read at that level is independent.

Level N (root):   sequential read          ← 1 read, unavoidable
Level N-1:        cat_ranges               ← all addresses known after root
...
Level 0 (leaves): cat_ranges               ← all addresses known after level 1

In practice HDF5 v1 B-trees are shallow (2–3 levels), so the internal nodes are few
and cheap. The leaves dominate, and that is where the win is largest.

The target optimisation: traverse internal nodes sequentially (cheap, few reads),
collect all leaf addresses, then fetch all leaf nodes in a single cat_ranges call.
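
The effect on request counts can be sketched on a toy tree. This is an illustration only, not pyfive code: CountingStore, read_one, read_many and the addresses are all hypothetical, and read_many merely models a cat_ranges-style call by counting a whole batch as one round-trip.

```python
from collections import defaultdict

# Toy tree, addresses invented for illustration: address -> (level, children).
TREE = {
    100: (2, [200, 300]),
    200: (1, [400, 500, 600]),
    300: (1, [700, 800]),
    400: (0, []), 500: (0, []), 600: (0, []), 700: (0, []), 800: (0, []),
}


class CountingStore:
    """Hypothetical stand-in for a remote file that counts round-trips."""

    def __init__(self, tree):
        self.tree = tree
        self.round_trips = 0

    def read_one(self, addr):
        # Models seek() + read(): one request, one latency.
        self.round_trips += 1
        return self.tree[addr]

    def read_many(self, addrs):
        # Models a cat_ranges-style call: all requests in flight together,
        # counted here as a single round-trip.
        self.round_trips += 1
        return [self.tree[a] for a in addrs]


def walk_sequential(store, root):
    """Read every node, leaves included, with its own request."""
    levels = defaultdict(list)
    pending = [root]
    while pending:
        addr = pending.pop()
        level, children = store.read_one(addr)
        levels[level].append(addr)
        pending.extend(children)
    return levels


def walk_leaf_batched(store, root):
    """Read internal nodes one by one, then all leaves in one batch."""
    levels = defaultdict(list)
    leaf_addrs = []
    pending = [root]
    while pending:
        addr = pending.pop()
        level, children = store.read_one(addr)
        levels[level].append(addr)
        if level == 1:
            leaf_addrs.extend(children)  # children are leaves: defer them
        else:
            pending.extend(children)
    if leaf_addrs:
        store.read_many(leaf_addrs)
        levels[0].extend(leaf_addrs)
    return levels


seq_store, par_store = CountingStore(TREE), CountingStore(TREE)
seq_levels = walk_sequential(seq_store, 100)
par_levels = walk_leaf_batched(par_store, 100)
print(seq_store.round_trips, par_store.round_trips)  # 8 4
```

With three internal nodes and five leaves the saving is small; it grows with the number of leaves, since the batched walk always pays one round-trip for the leaf level regardless of how many leaves there are.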


Scope

This change applies to BTreeV1RawDataChunks only — this is the B-tree used for
chunked raw data and is the hot path for array reads. BTreeV1Groups and the v2
B-trees are lower priority and should not be changed in this pass.


Implementation

1. Add a _get_cat_ranges_fs helper on AbstractBTree

@staticmethod
def _get_cat_ranges_fs(fh):
    """
    Return (fs, path) if fh supports cat_ranges, else (None, None).
    Reaches through MetadataBufferingWrapper the same way _read_bulk_fsspec does.
    """
    actual_fh = getattr(fh, "fh", fh)
    fs = getattr(actual_fh, "fs", None)
    path = getattr(actual_fh, "path", None)
    if fs is not None and path is not None and hasattr(fs, "cat_ranges"):
        return fs, path
    return None, None
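
How the probe behaves for different handle types can be checked with stand-in objects. In this sketch get_cat_ranges_fs is a module-level copy of the helper above, and fake_fs, remote_file, wrapped and no_bulk are hypothetical objects built from SimpleNamespace; none of them are real fsspec or pyfive classes.

```python
import io
from types import SimpleNamespace


def get_cat_ranges_fs(fh):
    """Module-level copy of the helper above, for experimentation."""
    actual_fh = getattr(fh, "fh", fh)          # unwrap one wrapper layer
    fs = getattr(actual_fh, "fs", None)
    path = getattr(actual_fh, "path", None)
    if fs is not None and path is not None and hasattr(fs, "cat_ranges"):
        return fs, path
    return None, None


# Hypothetical filesystem exposing a cat_ranges-shaped method.
fake_fs = SimpleNamespace(cat_ranges=lambda paths, starts, ends: [])

# A file-like object that knows its filesystem and path.
remote_file = SimpleNamespace(fs=fake_fs, path="bucket/data.h5")

# A wrapper that keeps the real handle on .fh.
wrapped = SimpleNamespace(fh=remote_file)

# A handle whose filesystem has no cat_ranges at all.
no_bulk = SimpleNamespace(fs=SimpleNamespace(), path="bucket/data.h5")

for name, handle in [("remote", remote_file), ("wrapped", wrapped),
                     ("BytesIO", io.BytesIO(b"")), ("no_bulk", no_bulk)]:
    fs, path = get_cat_ranges_fs(handle)
    print(name, "parallel" if fs is not None else "sequential")
```

Only the first two handles take the parallel path; the plain BytesIO and the filesystem without cat_ranges both yield (None, None) and would fall back to sequential reads.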

2. Split BTreeV1RawDataChunks.__init__ traversal into two phases

The existing _read_children walks the tree level-by-level sequentially. Replace it
in BTreeV1RawDataChunks with a version that:

  • Phase 1: reads internal nodes (level > 0) sequentially as today
  • Phase 2: collects all leaf addresses from the parsed internal nodes, then fetches
    all leaf nodes in one cat_ranges call

def _read_children(self):
    """Override to parallelise leaf reads via cat_ranges when available."""
    fs, path = self._get_cat_ranges_fs(self.fh)

    if fs is None:
        # No cat_ranges support — fall back to original sequential traversal
        super()._read_children()
        return

    # Phase 1: traverse internal nodes sequentially (level > 0)
    # These are few in number; sequential reads are fine here, and we
    # cannot avoid the dependency chain between levels.
    for node_level in range(self.depth, 0, -1):
        for parent_node in self.all_nodes[node_level]:
            for child_addr in parent_node["addresses"]:
                if node_level - 1 > 0:
                    # Still an internal node — read sequentially as before
                    child_node = self._read_node(child_addr, node_level - 1)
                    self._add_node(child_node)
                # If node_level - 1 == 0 it's a leaf; collect below

    # Phase 2: collect all leaf addresses from the lowest internal level
    leaf_addresses = []
    lowest_internal = self.all_nodes.get(1, [])
    for node in lowest_internal:
        leaf_addresses.extend(node["addresses"])

    if not leaf_addresses:
        # Depth-0 tree (root is also leaf) — already read, nothing to do
        return

    # Phase 2 (continued): bulk-fetch all leaf nodes in one cat_ranges call
    leaf_size = self._estimate_leaf_node_size()
    starts = list(leaf_addresses)
    stops  = [addr + leaf_size for addr in leaf_addresses]

    raw_leaves = fs.cat_ranges([path] * len(leaf_addresses), starts, stops)

    for addr, raw in zip(leaf_addresses, raw_leaves):
        node = self._parse_leaf_from_buffer(raw, addr)
        self._add_node(node)

3. Leaf node size estimation

HDF5 v1 B-tree node size is not stored in the node itself; it must be inferred.
The safest approach is to read the first leaf node sequentially to determine its
actual size, then use that for all subsequent reads.

def _estimate_leaf_node_size(self):
    """
    Return the byte size of a leaf node by reading the first one sequentially.

    HDF5 v1 B-tree nodes have no stored size field; the size is a function of
    the number of entries (entries_used in the header) and the per-entry record
    size (which depends on self.dims).  We read one node to get entries_used,
    compute the size, and assume all leaf nodes are the same size.

    Per-entry layout for NODE_TYPE=1 (raw data chunks):
        8 bytes  : chunk_size + filter_mask (two uint32)
        8 * dims : chunk_offset (dims uint64 values)
        8 bytes  : chunk_address (uint64)
    """
    # B_LINK_NODE header size: 4 + 1 + 1 + 2 + 8 + 8 = 24 bytes
    HEADER_SIZE = 24
    # Per-entry record size
    entry_size = 8 + 8 * self.dims + 8
    # The trailing (N+1)th key is not read for NODE_TYPE=1 (unlike type 0 groups),
    # so it is not counted in the size.

    # Peek at the first leaf node to get entries_used
    first_leaf_addr = self.all_nodes[1][0]["addresses"][0]
    self.fh.seek(first_leaf_addr)
    header_bytes = self.fh.read(HEADER_SIZE)
    entries_used = struct.unpack_from("<H", header_bytes, 6)[0]  # offset 6 in header

    return HEADER_SIZE + entries_used * entry_size

Note: Leaf nodes can have differing entries_used values (e.g. the last leaf in a
non-full tree). If a leaf holds fewer entries than the first one, the fixed size
over-reads it. Over-reading is harmless, since _parse_leaf_from_buffer reads only
entries_used entries from the buffer, but it wastes a small amount of bandwidth.
If a leaf holds more entries than the first one, the fixed size under-reads it and
the parse runs off the end of the buffer, so that case needs handling (for example
by re-reading that node sequentially). An alternative that avoids both problems is
to compute the size per address by peeking all headers in a first cat_ranges call
(24 bytes each), then issuing a second cat_ranges for the full node bodies.
For most files the single-size approach is sufficient.
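
The two-pass alternative can be sketched as follows. This is a minimal illustration under stated assumptions: FakeFS is a hypothetical in-memory stand-in whose cat_ranges takes (paths, starts, ends) like the fsspec method, and make_leaf, leaf_sizes_two_pass and the file name are invented for the example. The header layout and per-entry size are the ones given in the docstring above.

```python
import struct

HEADER_FMT = "<4sBBHQQ"                    # signature, type, level, entries_used, siblings
HEADER_SIZE = struct.calcsize(HEADER_FMT)  # 24 bytes


def entry_size(dims):
    # chunk_size + filter_mask, dims offsets, chunk address
    return 8 + 8 * dims + 8


class FakeFS:
    """Hypothetical in-memory filesystem with a cat_ranges-shaped method."""

    def __init__(self, blob):
        self.blob = blob
        self.calls = 0

    def cat_ranges(self, paths, starts, ends):
        self.calls += 1
        return [self.blob[s:e] for s, e in zip(starts, ends)]


def make_leaf(entries_used, dims):
    """Build a leaf node with a valid header and zero-filled entries."""
    header = struct.pack(HEADER_FMT, b"TREE", 1, 0, entries_used, 0, 0)
    return header + bytes(entries_used * entry_size(dims))


def leaf_sizes_two_pass(fs, path, addrs, dims):
    """Pass 1: fetch every 24-byte header in one call and size each node."""
    ends = [a + HEADER_SIZE for a in addrs]
    headers = fs.cat_ranges([path] * len(addrs), addrs, ends)
    sizes = []
    for raw in headers:
        entries_used = struct.unpack_from("<H", raw, 6)[0]  # offset 6 in header
        sizes.append(HEADER_SIZE + entries_used * entry_size(dims))
    return sizes


dims = 3
leaf_a, leaf_b = make_leaf(4, dims), make_leaf(2, dims)  # unequal fill
fs = FakeFS(leaf_a + leaf_b)
addrs = [0, len(leaf_a)]

sizes = leaf_sizes_two_pass(fs, "file.h5", addrs, dims)
# Pass 2: fetch each node body with its exact size.
bodies = fs.cat_ranges(["file.h5"] * len(addrs), addrs,
                       [a + s for a, s in zip(addrs, sizes)])
print(sizes, fs.calls)  # [184, 104] 2
```

The cost is one extra batch of small reads; in exchange no leaf is over-read or under-read, whatever its entries_used.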

4. Parse a leaf node from a raw buffer

Extract the existing sequential parse logic from _read_node into a buffer-based
variant so it can be called on the raw bytes returned by cat_ranges:

def _parse_leaf_from_buffer(self, raw, addr):
    """
    Parse a leaf node (node_level=0) from a raw bytes buffer.
    Mirrors the logic in _read_node / _read_node_header but reads from
    a BytesIO rather than seeking self.fh.
    """
    import io
    buf = io.BytesIO(raw)

    # --- header (mirrors _read_node_header) ---
    node = _unpack_struct_from(self.B_LINK_NODE, buf.read(struct.calcsize(
        "<" + "".join(self.B_LINK_NODE.values())
    )))
    assert node["signature"] == b"TREE"
    assert node["node_type"] == self.NODE_TYPE
    assert node["node_level"] == 0

    # --- entries (mirrors _read_node body for NODE_TYPE=1) ---
    keys = []
    addresses = []
    fmt = "<" + "Q" * self.dims
    fmt_size = struct.calcsize(fmt)

    for _ in range(node["entries_used"]):
        chunk_size, filter_mask = struct.unpack("<II", buf.read(8))
        chunk_offset = struct.unpack(fmt, buf.read(fmt_size))
        chunk_address = struct.unpack("<Q", buf.read(8))[0]
        keys.append(OrderedDict((
            ("chunk_size",   chunk_size),
            ("filter_mask",  filter_mask),
            ("chunk_offset", chunk_offset),
        )))
        addresses.append(chunk_address)

    node["keys"] = keys
    node["addresses"] = addresses
    self.last_offset = max(addr, self.last_offset)
    return node

Implementation note: _unpack_struct_from already exists in core.py and
accepts a buffer argument — use it directly rather than duplicating the unpacking
logic. Check its exact signature before wiring up.
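
For building test fixtures it can help to have a standalone round-trip of the leaf layout. The sketch below is not the pyfive implementation: build_leaf and parse_leaf are hypothetical free functions that use struct directly rather than B_LINK_NODE and _unpack_struct_from, and the chunk values are invented. It also checks the buffer length up front, so a short buffer produces a clear error rather than a struct failure part-way through.

```python
import struct
from collections import OrderedDict

HEADER_FMT = "<4sBBHQQ"                    # signature, type, level, entries_used, siblings
HEADER_SIZE = struct.calcsize(HEADER_FMT)  # 24 bytes


def build_leaf(entries, dims):
    """Serialise (chunk_size, filter_mask, chunk_offset, chunk_address) tuples."""
    raw = struct.pack(HEADER_FMT, b"TREE", 1, 0, len(entries), 0, 0)
    for chunk_size, filter_mask, chunk_offset, chunk_address in entries:
        raw += struct.pack("<II", chunk_size, filter_mask)
        raw += struct.pack("<" + "Q" * dims, *chunk_offset)
        raw += struct.pack("<Q", chunk_address)
    return raw


def parse_leaf(raw, dims):
    """Parse a level-0 raw-data-chunk node from bytes; returns (keys, addresses)."""
    if len(raw) < HEADER_SIZE:
        raise ValueError("buffer shorter than node header")
    signature, node_type, node_level, entries_used, _left, _right = \
        struct.unpack_from(HEADER_FMT, raw, 0)
    if (signature, node_type, node_level) != (b"TREE", 1, 0):
        raise ValueError("not a raw-data-chunk leaf node")

    entry_fmt = "<II" + "Q" * dims + "Q"
    entry_size = struct.calcsize(entry_fmt)
    needed = HEADER_SIZE + entries_used * entry_size
    if len(raw) < needed:
        raise ValueError(f"short buffer: need {needed} bytes, got {len(raw)}")

    keys, addresses = [], []
    for i in range(entries_used):
        fields = struct.unpack_from(entry_fmt, raw, HEADER_SIZE + i * entry_size)
        keys.append(OrderedDict((
            ("chunk_size", fields[0]),
            ("filter_mask", fields[1]),
            ("chunk_offset", fields[2:2 + dims]),
        )))
        addresses.append(fields[-1])
    return keys, addresses


entries = [(4096, 0, (0, 0, 0), 2048), (4000, 1, (10, 0, 0), 6144)]
raw = build_leaf(entries, dims=3)
keys, addresses = parse_leaf(raw, dims=3)
print(addresses)  # [2048, 6144]
```

Trailing bytes after the last entry are ignored, which matches the over-read case; a buffer that ends before the last entry raises ValueError, which is the under-read case.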


Interaction with MetadataBufferingWrapper

cat_ranges is called on actual_fh.fs (the underlying fsspec filesystem), bypassing
MetadataBufferingWrapper entirely — exactly as _read_bulk_fsspec does for chunks.
This is correct behaviour:

  • Well-packed files: the 1 MiB eager buffer already holds the leaf nodes.
    Because cat_ranges bypasses that buffer, the leaves cost nothing extra only if
    an fsspec cache layer serves the request; otherwise they are fetched again,
    in a single concurrent batch.
  • Poorly-packed files: cat_ranges issues concurrent remote fetches for all
    scattered leaf nodes in one batch, which is far better than the current
    sequential fallback reads.

No changes to MetadataBufferingWrapper are required.


Fallback Behaviour

_read_children checks for cat_ranges support before doing anything new. If the
filesystem does not support cat_ranges (local files, in-memory buffers, custom
wrappers) the call falls through to super()._read_children() — the existing
sequential traversal — unchanged.


Testing

  1. Unit test — well-packed file on S3 (mocked): mock fs.cat_ranges to return
    pre-built leaf node bytes; assert it is called once with all leaf addresses; assert
    the resulting self.all_nodes[0] matches the sequential parse.

  2. Unit test — no cat_ranges: pass a plain BytesIO fh; assert the sequential
    path is taken and results are identical.

  3. Regression: run the existing B-tree test suite unchanged — all results must
    match before and after.

  4. Integration: open a real chunked HDF5 file via s3fs; assert chunk index
    contents are identical between serial and parallel paths.
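
The first unit test could take roughly the following shape. This is a sketch under assumptions: fetch_leaves is a hypothetical free function standing in for the bulk-fetch step of _read_children, and the path, addresses and leaf size are invented. Only unittest.mock from the standard library is used.

```python
from unittest.mock import MagicMock


def fetch_leaves(fs, path, leaf_addresses, leaf_size):
    """Hypothetical stand-in for the bulk-fetch step of _read_children."""
    starts = list(leaf_addresses)
    ends = [addr + leaf_size for addr in starts]
    return fs.cat_ranges([path] * len(starts), starts, ends)


def test_single_bulk_call():
    fs = MagicMock()
    fs.cat_ranges.return_value = [b"leaf0", b"leaf1", b"leaf2"]

    result = fetch_leaves(fs, "bucket/file.h5", [1000, 5000, 9000], 184)

    # Exactly one call, covering every leaf address.
    fs.cat_ranges.assert_called_once_with(
        ["bucket/file.h5"] * 3,
        [1000, 5000, 9000],
        [1184, 5184, 9184],
    )
    assert result == [b"leaf0", b"leaf1", b"leaf2"]
```

In the real test the mocked return value would be pre-built leaf node bytes, and the assertion on result would be replaced by a comparison of self.all_nodes[0] against the sequential parse.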


Files to Change

File                          Change
pyfive/btree.py               Add _get_cat_ranges_fs, override _read_children and add
                              _estimate_leaf_node_size / _parse_leaf_from_buffer to
                              BTreeV1RawDataChunks
tests/test_btree_parallel.py  New test file covering the cases above

No changes required to metadata_buffering_wrapper.py, chunk_read.py, or any caller
of BTreeV1RawDataChunks — the parallel path is entirely internal to the class.
