Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
ff9f5e4
c++ side changes
rayhhome Jun 10, 2026
5c3780a
cython layer changes
rayhhome Jun 10, 2026
00b8495
Merge branch 'master' into callback-core-wiring
rayhhome Jun 11, 2026
eb668f8
Address reviews
rayhhome Jun 11, 2026
2ec8d81
Merge branch 'master' into callback-core-wiring
rayhhome Jun 11, 2026
711d5a3
Fix ObjectRef paramemter
rayhhome Jun 11, 2026
06a657d
Set in_core_worker to avoid void decrement of objectref refcount
rayhhome Jun 11, 2026
c534154
AddObjectOutOfScopeOrFreedCallback made public
rayhhome Jun 11, 2026
a7662dc
Merge branch 'master' into callback-core-wiring
rayhhome Jun 11, 2026
5e624fe
Add test to bazel file
rayhhome Jun 11, 2026
c365b4f
Merge branch 'master' into callback-core-wiring
rayhhome Jun 11, 2026
8f422f7
Fix test
rayhhome Jun 11, 2026
b673e05
Merge branch 'callback-core-wiring' of github.com:rayhhome/ray into c…
rayhhome Jun 11, 2026
d325891
Merge branch 'master' into callback-core-wiring
rayhhome Jun 11, 2026
33ed991
Address Shutdown edge case
rayhhome Jun 12, 2026
f830175
Merge branch 'callback-core-wiring' of github.com:rayhhome/ray into c…
rayhhome Jun 12, 2026
1adeed4
Address reviews
rayhhome Jun 15, 2026
71e5ee4
Merge branch 'master' of github.com:ray-project/ray into callback-cor…
rayhhome Jun 15, 2026
738dcaa
Address comments again
rayhhome Jun 15, 2026
6191d3a
Merge branch 'master' into callback-core-wiring
rayhhome Jun 15, 2026
2f43a3e
Address reviews
rayhhome Jun 17, 2026
5e6d39e
Merge branch 'master' into callback-core-wiring
rayhhome Jun 17, 2026
3fa9b00
Address comments and ameliorate codebase
rayhhome Jun 18, 2026
78d4a23
Add callback latency measuring tests
rayhhome Jun 18, 2026
ebecd5a
Merge branch 'master' into callback-core-wiring
rayhhome Jun 18, 2026
a37a262
Put ref registration assertion in test scoped
rayhhome Jun 22, 2026
96f38b7
Merge branch 'master' into callback-core-wiring
rayhhome Jun 22, 2026
0280347
Fix ray_perf.py unfired callback
rayhhome Jun 23, 2026
7903d34
Scaled-up callback throughput benchmark
rayhhome Jun 25, 2026
8b22ae9
Bump up thread count due to new cleanup thread
rayhhome Jun 25, 2026
06d491f
Merge branch 'master' of github.com:ray-project/ray into callback-cor…
rayhhome Jun 25, 2026
16eca1f
Remove 20 from thread count since we net increased thread count
rayhhome Jun 25, 2026
79fb62f
Merge branch 'master' into callback-core-wiring
rayhhome Jun 25, 2026
13f4e46
Integrate more realistic benchmark
rayhhome Jun 26, 2026
eafbdef
Merge branch 'master' into callback-core-wiring
rayhhome Jun 26, 2026
72ae1b3
Add BlockRefCounter Implementation and Tests
rayhhome Jun 17, 2026
8012534
Address comments and update tests
rayhhome Jun 22, 2026
8c8b649
Make on_block_produced idempotent
rayhhome Jun 22, 2026
f9812f0
Address comments
rayhhome Jun 25, 2026
e759d70
Pyrefly fixes + move benchmark
rayhhome Jun 25, 2026
0b2bb38
Fix pyrefly again
rayhhome Jun 25, 2026
98a3552
Address edge case of unowned blocks in SplitCoordinator
rayhhome Jun 25, 2026
46fadc6
wrap block_ref in counter actor
rayhhome Jun 26, 2026
30aeb9f
Comments improvement
rayhhome Jun 26, 2026
1547a61
Wire blockRefCounter through operators
rayhhome Jun 17, 2026
a677cca
Add missing type notations + missing hash shuffle change
rayhhome Jun 18, 2026
44d4080
Address comments + Make BlockRefCounter mandatory in call chains
rayhhome Jun 22, 2026
e601d8a
Track blocks for limit, zip, and output splitter
rayhhome Jun 22, 2026
f4fef45
Track blocks for aggregate num rows
rayhhome Jun 23, 2026
98ef994
Track blocks for shuffle reduce
rayhhome Jun 23, 2026
c5a12d9
simplify shuffle reduce memory tracking logic
rayhhome Jun 23, 2026
efe4579
Add argument for blockRefCounter
rayhhome Jun 25, 2026
40f1153
Add block_ref_counter argument to build_streaming_topology
rayhhome Jun 25, 2026
57a15c3
Remove duplicate start call mock_all_to_all_op
rayhhome Jun 25, 2026
6350a1e
Fix pyrefly
rayhhome Jun 26, 2026
60bf01f
Address missed start calls
rayhhome Jun 26, 2026
9650f83
Address start argument changes
rayhhome Jun 26, 2026
2b185dd
Adjust resource manager object store memory tracking logic
rayhhome Jun 17, 2026
7647790
Address reviews + remove implementation detail descriptions from test…
rayhhome Jun 23, 2026
ed666df
Also address resource manager unit test by switching to StubBlockRefC…
rayhhome Jun 23, 2026
285c850
Remove dead code
rayhhome Jun 25, 2026
2edfb95
Fix StubBlockRefCounter argument
rayhhome Jun 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2866,6 +2866,37 @@ cdef class GcsClient:
ray._private.utils._CALLED_FREQ[name] += 1
return getattr(self.inner, name)

cdef void _invoke_object_out_of_scope_callback(
const CObjectID &c_object_id, void *user_callback) noexcept nogil:
"""Invoked on the object_freed_callback_service_ thread when an object goes
out of scope. Calls the registered Python callback with the object ID as
``bytes``, then releases the Py_INCREF taken at registration.

Args:
c_object_id: The C++ ObjectID of the object that went out of scope.
user_callback: The Python callable registered by the caller, kept
alive by the Py_INCREF in ``add_object_out_of_scope_callback``.
"""
with gil:
try:
callback = <object>user_callback
id_binary = c_object_id.Binary()
callback(id_binary)
except BaseException:
# Invoked from C++ through a C function pointer, so a propagating
# exception would be undefined behavior; that is why we catch
# everything here, including KeyboardInterrupt/SystemExit.
logger.exception(
"Exception in the callback registered via "
"CoreWorker.add_object_out_of_scope_callback for object %s. The "
"callback must be non-blocking and exception-free, so check it "
"for I/O, blocking calls, or bugs that raise.",
c_object_id.Hex().decode("ascii"),
)
finally:
cpython.Py_DECREF(<object>user_callback)


cdef class CoreWorker:

def __cinit__(self, worker_type, store_socket, raylet_socket,
Expand Down Expand Up @@ -4219,6 +4250,56 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
c_object_id)

def add_object_out_of_scope_callback(
self, ObjectRef object_ref, callback: Callable[[bytes], None]):
"""Register a Python callable to fire when object_ref goes out of scope.

.. warning::
This is an internal Ray API. Do not use it outside of Ray libraries.

Can only be called on the worker that owns object_ref. Raises
ValueError if object_ref is not owned by this worker.

The callback runs on a dedicated background thread concurrent with the
main Python thread. It must be thread-safe; use a lock if it ever accesses
state shared with the main thread.

.. warning::
The callback runs on a single thread shared by every out-of-scope
notification for this worker, so it MUST be O(1) and non-blocking.
Anything that blocks here serializes every subsequent callback on
this worker. Please do not register any hanging/failing operations
here.

If the callback raises, the exception is logged and swallowed so that
subsequent callbacks are not affected.

Args:
object_ref: The owned object to watch.
callback: Called with the object ID as ``bytes`` when the last
reference is released.

Returns:
True if registered; False if the object is already out of scope
(the callback will never fire).
"""
if not callable(callback):
raise TypeError(
f"callback must be callable, got {type(callback).__name__!r}"
)
cdef CObjectID c_object_id = object_ref.native()
check_status(CCoreWorkerProcess.GetCoreWorker().CheckObjectOwnedByUs(
c_object_id))
cpython.Py_INCREF(callback)
registered = CCoreWorkerProcess.GetCoreWorker() \
.AddObjectOutOfScopeOrFreedCallback(
c_object_id,
_invoke_object_out_of_scope_callback,
<void *>callback)
if not registered:
cpython.Py_DECREF(callback)
return registered

def get_owner_address(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
Expand Down
14 changes: 14 additions & 0 deletions python/ray/data/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,20 @@ py_test(
],
)

py_test(
name = "test_block_ref_counter",
size = "small",
srcs = ["tests/test_block_ref_counter.py"],
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_map_operator",
size = "medium",
Expand Down
86 changes: 86 additions & 0 deletions python/ray/data/_internal/execution/block_ref_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import threading
from collections import defaultdict
from typing import Callable, Dict, Optional

import ray
from ray._private.worker import global_worker


class BlockRefCounter:
"""Tracks object-store memory usage per operator via Ray Core callbacks.

The callback fires when:
- All Python ObjectRefs wrapping the block's ObjectID are garbage-collected, AND
- All Ray tasks that received the block as an argument have completed.
"""

def __init__(
self,
add_object_out_of_scope_callback: Optional[
Callable[["ray.ObjectRef", Callable[[bytes], None]], bool]
] = None,
):
if add_object_out_of_scope_callback is None:
add_object_out_of_scope_callback = (
global_worker.core_worker.add_object_out_of_scope_callback # pyrefly: ignore[missing-attribute]
)
self._add_callback_fn = add_object_out_of_scope_callback
# IDs of live blocks. Stale callbacks (fired after clear()) check
# membership here and no-op, preventing negative _bytes_by_producer.
self._registered_ids: set[bytes] = set()
# (producer_id -> total live bytes); maintained incrementally for O(1) reads.
self._bytes_by_producer: Dict[str, int] = defaultdict(int)
self._lock = threading.Lock()

def on_block_produced(
self,
block_ref: "ray.ObjectRef",
size_bytes: int,
producer_id: str,
) -> None:
"""Register a block and attribute its memory to producer_id.

Registers a Ray Core out-of-scope callback so that when all references
to block_ref are gone the bytes are automatically removed from the
producer's usage.

Idempotent: calling twice with the same block_ref is a no-op.
"""
id_binary = block_ref.binary()
with self._lock:
if id_binary in self._registered_ids:
return
self._registered_ids.add(id_binary)
self._bytes_by_producer[producer_id] += size_bytes
Comment thread
rayhhome marked this conversation as resolved.

def _on_object_freed(id_bytes: bytes) -> None:
with self._lock:
if id_bytes not in self._registered_ids:
# Already cleared (e.g. by clear()), nothing to do.
return
self._registered_ids.discard(id_bytes)
self._bytes_by_producer[producer_id] -= size_bytes

try:
registered = self._add_callback_fn(block_ref, _on_object_freed)
except ValueError:
# Block not owned by this worker; can't track it.
_on_object_freed(id_binary)
return
if not registered:
_on_object_freed(id_binary)
Comment thread
rayhhome marked this conversation as resolved.

def get_object_store_memory_usage(self, producer_id: str) -> int:
"""Total bytes of live blocks attributed to producer_id."""
with self._lock:
return self._bytes_by_producer.get(producer_id, 0)

def clear(self) -> None:
"""Reset all accounting, e.g. on executor shutdown.

Any previously registered Ray Core callbacks firing after clear()
will be silently ignored because _registered_ids is empty.
"""
with self._lock:
self._registered_ids.clear()
self._bytes_by_producer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ActorPoolInfo,
AutoscalingActorPool,
)
from ray.data._internal.execution.block_ref_counter import BlockRefCounter
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
Expand All @@ -37,7 +38,6 @@

if TYPE_CHECKING:

from ray.data._internal.execution.streaming_executor_state import OpState
from ray.data.block import BlockMetadataWithSchema

logger = logging.getLogger(__name__)
Expand All @@ -52,23 +52,6 @@
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]


@dataclass(frozen=True)
class ObjectStoreUsage:
"""Per-op object store accounting.

Attributes:
internal: Bytes held by this op's currently-running tasks
(outputs not yet yielded to the object store).
outputs: Bytes this op has produced that are still live in
the object store — its internal output queue, its
``OpState`` external output queue, and the downstream
eligible ops' inputs.
"""

internal: int
outputs: int


class OpTask(ABC):
"""Abstract class that represents a task that is created by an PhysicalOperator.

Expand Down Expand Up @@ -134,6 +117,8 @@ def __init__(
self,
task_index: int,
streaming_gen: ObjectRefGenerator,
block_ref_counter: BlockRefCounter,
producer_id: str,
output_ready_callback: Callable[[RefBundle], None] = lambda bundle: None,
task_done_callback: TaskDoneCallbackType = lambda exc, worker_stats, driver_stats: None,
block_ready_callback: Callable[
Expand All @@ -149,6 +134,9 @@ def __init__(
Args:
task_index: Index of the task. Used for callbacks.
streaming_gen: The streaming generator of this task. It should yield blocks.
block_ref_counter: The centralized block reference counter. on_block_produced
is called for each block yielded by this task.
producer_id: The id of the operator that produces the blocks from this task.
output_ready_callback: The callback to call when a new RefBundle is output
from the generator.
task_done_callback: The callback to call when the task is done.
Expand All @@ -171,6 +159,8 @@ def __init__(
self._block_ready_callback = block_ready_callback
self._metadata_ready_callback = metadata_ready_callback
self._operator_name = operator_name
self._block_ref_counter: BlockRefCounter = block_ref_counter
self._producer_id: str = producer_id

# If the generator hasn't produced block metadata yet, or if the block metadata
# object isn't available after we get a reference, we need store the pending
Expand Down Expand Up @@ -292,6 +282,9 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
meta_with_schema_bytes
)
meta = meta_with_schema.metadata
self._block_ref_counter.on_block_produced(
self._pending_block_ref, meta.size_bytes or 0, self._producer_id
)
self._output_ready_callback(
RefBundle(
[BlockEntry(self._pending_block_ref, meta)],
Expand Down Expand Up @@ -743,12 +736,19 @@ def num_output_splits(self) -> int:
"""
return self._num_output_splits

def start(self, options: ExecutionOptions) -> None:
def start(
self,
options: ExecutionOptions,
block_ref_counter: BlockRefCounter,
) -> None:
"""Called by the executor when execution starts for an operator.

Args:
options: The global options used for the overall execution.
block_ref_counter: The executor-wide shared counter for tracking
object-store memory.
"""
self._block_ref_counter = block_ref_counter
self._started = True

def can_add_input(self) -> bool:
Expand Down Expand Up @@ -886,41 +886,6 @@ def current_logical_usage(self) -> ExecutionResources:
"""
return ExecutionResources.zero()

def estimate_object_store_usage(self, state: "OpState") -> ObjectStoreUsage:
"""Returns the bytes this operator contributes to the global object
store budget. Subclasses may override this when their object store
footprint doesn't match the generic model.
"""
# Operator's internal Object Store usage
mem_op_internal = self.metrics.obj_store_mem_pending_task_outputs or 0

# Operator's outputs' Object Store usage
op_outputs_bytes = (
# Internal output queue
self.metrics.obj_store_mem_internal_outqueue
+
# External output queue
state.output_queue_bytes()
)

# TODO fix ineligible ops: this needs to include usage of all of OS
# for ineligible ops
#
# Outputs of this operator used downstream
used_op_outputs_bytes = sum(
(
downstream_op.metrics.obj_store_mem_internal_inqueue_for_input(
downstream_op.input_dependencies.index(self)
)
+ downstream_op.metrics.obj_store_mem_pending_task_inputs
)
for downstream_op in self.output_dependencies
)
return ObjectStoreUsage(
internal=int(mem_op_internal),
outputs=int(op_outputs_bytes + used_op_outputs_bytes),
)

def running_logical_usage(self) -> ExecutionResources:
"""Returns the estimated running CPU, GPU, and memory usage of this operator,
excluding object store memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

if TYPE_CHECKING:
import pyarrow as pa

from ray.data._internal.execution.block_ref_counter import BlockRefCounter
import ray
from ray.actor import ActorHandle
from ray.core.generated import gcs_pb2
Expand Down Expand Up @@ -265,9 +267,13 @@ def _apply_default_actor_task_remote_args(

return ray_actor_task_remote_args

def start(self, options: ExecutionOptions):
def start(
self,
options: ExecutionOptions,
block_ref_counter: "BlockRefCounter",
):
self._actor_locality_enabled = options.actor_locality_enabled
super().start(options)
super().start(options, block_ref_counter)

self._actor_cls = ray.remote(**self._ray_remote_args)(self._map_worker_cls)
self._actor_pool.scale(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def _get_next_inner(self) -> RefBundle:
[BlockEntry(block_ref, metadata)], owns_blocks=True, schema=schema
)

self._block_ref_counter.on_block_produced(
block_ref, metadata.size_bytes or 0, self.id
)
self._has_outputted = True
return bundle

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,17 @@ def all_inputs_done(self) -> None:
)
# NOTE: We don't account object store memory use from intermediate `bulk_fn`
# outputs (e.g., map outputs for map-reduce).
output_buffer, self._stats = self._bulk_fn(self._input_buffer.to_list(), ctx)

input_bundles = self._input_buffer.to_list()
output_buffer, self._stats = self._bulk_fn(input_bundles, ctx)
self._output_buffer = FIFOBundleQueue(output_buffer)

for bundle in output_buffer:
for entry in bundle.blocks:
self._block_ref_counter.on_block_produced(
entry.ref, entry.metadata.size_bytes or 0, self.id
)

while self._input_buffer.has_next():
refs = self._input_buffer.get_next()
self._metrics.on_input_dequeued(refs, input_index=0)
Expand Down
Loading
Loading