Skip to content

v0.1.8

Latest

Choose a tag to compare

@0oshowero0 0oshowero0 released this 08 Jun 02:30
35bcf19

Highlight

🚀 New Features

  • Cross-Job Actor Discovery: Support cross-job actor discovery via an explicit namespace, improving usability in multi-job environments. (#115)
  • RDMA Support for openYuanrong: Enable openYuanrong RDMA support to leverage high-performance network transmission. (#108)

⚙️ Refactor

  • Unified Bootstrap Path: Introduce a dedicated bootstrap subfolder to isolate and organize the initialization codes for different storage backends, enhancing codebase scalability. (#103)
  • Zero-Copy Serialization Utilities: Provide general serialization tools for KV backends in serial_utils.py. The newly introduced batch_encode_into and batch_decode_from interfaces support zero-copy serialization, aligning with the RDMA transmission requirements of various backends. (#107)
# Example 1: batch_encode_into
def _put_bytes_thread_worker(self, batch_keys: list[str], batch_values: list[Any]) -> list[int]:
    """Worker thread for putting batch of non-tensors to MooncakeStore."""

    # TODO: switch to a pre-registered buffer from MooncakeStore once such an API is available.
    region_ptrs: list[int] = []
    region_sizes: list[int] = []

    def alloc(sizes: list[int]) -> list[Tensor]:
        nonlocal region_ptrs, region_sizes
        # `batch_packed_sizes` are byte counts. With torch.uint8 (1 byte/element),
        # a 1-D shape of (N,) corresponds to exactly N bytes. We use
        # `allocate_empty_tensors` to get N uint8 views over a single contiguous,
        # register-able region. These are plain byte buffers, not real tensors;
        # consumers apply the actual dtype/shape interpretation when unpacking.
        dtypes = [torch.uint8] * len(sizes)
        shapes = [(s,) for s in sizes]
        buffers, _, region_ptrs, region_sizes = allocate_empty_tensors(dtypes, shapes)
        return buffers

    buffers, batch_sizes = serial_utils.batch_encode_into(
        batch_values, alloc, num_workers=MAX_SERIAL_WORKER_THREADS
    )
    batch_ptrs = [cast(Tensor, b).data_ptr() for b in buffers]

    self._register_all_buffers(region_ptrs, region_sizes)
    try:
        self._batch_upsert_with_retry(batch_keys, batch_ptrs, batch_sizes)
    finally:
        self._unregister_all_buffers(region_ptrs)

    return batch_sizes

# Example 2: batch_decode_from
def _get_bytes_thread_worker(
    self, batch_keys: list[str], batch_packed_sizes: list[int], indexes: list[int]
) -> tuple[list[Any], list[int]]:
    # `batch_packed_sizes` are byte counts. With torch.uint8 (1 byte/element),
    # a 1-D shape of (N,) corresponds to exactly N bytes. We use
    # `allocate_empty_tensors` to get N uint8 views over a single contiguous,
    # register-able region. These are plain byte buffers, not real tensors;
    # consumers apply the actual dtype/shape interpretation when unpacking.
    batch_shapes = [(sz,) for sz in batch_packed_sizes]
    batch_dtypes = [torch.uint8] * len(batch_keys)
    batch_nbytes = get_nbytes(batch_dtypes, batch_shapes)
    batch_buffer_tensors, batch_buffer_ptrs, region_ptrs, region_sizes = allocate_empty_tensors(
        batch_dtypes, batch_shapes
    )

    self._register_all_buffers(region_ptrs, region_sizes)
    try:
        self._batch_get_into_with_retry(batch_keys, batch_buffer_ptrs, batch_nbytes)
    finally:
        self._unregister_all_buffers(region_ptrs)

    return serial_utils.batch_decode_from(batch_buffer_tensors), indexes

🐛 Fixes & Improvements

  • Concurrency Conflict Fix: Resolve concurrency conflicts between data status updates and other control operations in TransferQueueController. (#116)
  • Dedicated Notification Loop: Isolate notify_data_update ZMQ I/O into a dedicated background asyncio loop to prevent ACK timeouts in StorageManager. (#117)

What's Changed

  • [chore] Bump version from 0.1.7 to 0.1.8.dev0 by @0oshowero0 in #100
  • [recipe] Provide Relax style recipe by @Jixixi2020 in #93
  • [refactor] Register storage backend for greater scalability by 🎉@fy2462 in #103
  • [doc] Adjust yuanrong backend doc by @dpj135 in #104
  • [feat] Enable openYuanrong RDMA support by @KaisennHu in #108
  • [chore] Optimize config descriptions for better understanding by @0oshowero0 in #109
  • [refactor] Provide common serialization tools for KV backends to speed up tensor serial in nested values by 🎉@xupinjie in #107
  • [refactor] Use batch_encode_into/batch_decode_from for Yuanrong backend by @dpj135 in #110
  • [perf] Enable multi-thread serial for non-tensor values in MooncakeStore backend by @0oshowero0 in #111
  • [chore] Relax numpy version constraints by @0oshowero0 in #113
  • [feat] Support cross-job actor discovery via explicit namespace by 🎉@huniu20 in #115
  • [fix,refactor] Merge update_data_status thread/socket into request_handle to eliminate concurrency conflicts by @dodatboii in #116
  • [fix,refactor] Isolate notify_data_update ZMQ I/O into a dedicated background asyncio loop by @dodatboii in #117
  • [chore] Update README & bump version from 0.1.8.dev0 to 0.1.8 by @0oshowero0 in #118

New Contributors

Full Changelog: v0.1.7...v0.1.8