diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1886a130963b..db416148cb70 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2866,6 +2866,37 @@ cdef class GcsClient: ray._private.utils._CALLED_FREQ[name] += 1 return getattr(self.inner, name) +cdef void _invoke_object_out_of_scope_callback( + const CObjectID &c_object_id, void *user_callback) noexcept nogil: + """Invoked on the object_freed_callback_service_ thread when an object goes + out of scope. Calls the registered Python callback with the object ID as + ``bytes``, then releases the Py_INCREF taken at registration. + + Args: + c_object_id: The C++ ObjectID of the object that went out of scope. + user_callback: The Python callable registered by the caller, kept + alive by the Py_INCREF in ``add_object_out_of_scope_callback``. + """ + with gil: + try: + callback = user_callback + id_binary = c_object_id.Binary() + callback(id_binary) + except BaseException: + # Invoked from C++ through a C function pointer, so a propagating + # exception would be undefined behavior; that is why we catch + # everything here, including KeyboardInterrupt/SystemExit. + logger.exception( + "Exception in the callback registered via " + "CoreWorker.add_object_out_of_scope_callback for object %s. The " + "callback must be non-blocking and exception-free, so check it " + "for I/O, blocking calls, or bugs that raise.", + c_object_id.Hex().decode("ascii"), + ) + finally: + cpython.Py_DECREF(user_callback) + + cdef class CoreWorker: def __cinit__(self, worker_type, store_socket, raylet_socket, @@ -4219,6 +4250,56 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference( c_object_id) + def add_object_out_of_scope_callback( + self, ObjectRef object_ref, callback: Callable[[bytes], None]): + """Register a Python callable to fire when object_ref goes out of scope. + + .. warning:: + This is an internal Ray API. Do not use it outside of Ray libraries. + + Can only be called on the worker that owns object_ref. Raises + ValueError if object_ref is not owned by this worker. + + The callback runs on a dedicated background thread concurrent with the + main Python thread. It must be thread-safe; use a lock if it ever accesses + state shared with the main thread. + + .. warning:: + The callback runs on a single thread shared by every out-of-scope + notification for this worker, so it MUST be O(1) and non-blocking. + Anything that blocks here serializes every subsequent callback on + this worker. Please do not register any hanging/failing operations + here. + + If the callback raises, the exception is logged and swallowed so that + subsequent callbacks are not affected. + + Args: + object_ref: The owned object to watch. + callback: Called with the object ID as ``bytes`` when the last + reference is released. + + Returns: + True if registered; False if the object is already out of scope + (the callback will never fire). + """ + if not callable(callback): + raise TypeError( + f"callback must be callable, got {type(callback).__name__!r}" + ) + cdef CObjectID c_object_id = object_ref.native() + check_status(CCoreWorkerProcess.GetCoreWorker().CheckObjectOwnedByUs( + c_object_id)) + cpython.Py_INCREF(callback) + registered = CCoreWorkerProcess.GetCoreWorker() \ + .AddObjectOutOfScopeOrFreedCallback( + c_object_id, + _invoke_object_out_of_scope_callback, + callback) + if not registered: + cpython.Py_DECREF(callback) + return registered + def get_owner_address(self, ObjectRef object_ref): cdef: CObjectID c_object_id = object_ref.native() diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index 960aa67d891f..5d859b890ec4 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -1426,6 +1426,20 @@ py_test( ], ) +py_test( + name = "test_block_ref_counter", + size = "small", + srcs = ["tests/test_block_ref_counter.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_map_operator", size = "medium", diff --git a/python/ray/data/_internal/execution/block_ref_counter.py b/python/ray/data/_internal/execution/block_ref_counter.py new file mode 100644 index 000000000000..7c73022e4877 --- /dev/null +++ b/python/ray/data/_internal/execution/block_ref_counter.py @@ -0,0 +1,86 @@ +import threading +from collections import defaultdict +from typing import Callable, Dict, Optional + +import ray +from ray._private.worker import global_worker + + +class BlockRefCounter: + """Tracks object-store memory usage per operator via Ray Core callbacks. + + The callback fires when: + - All Python ObjectRefs wrapping the block's ObjectID are garbage-collected, AND + - All Ray tasks that received the block as an argument have completed. + """ + + def __init__( + self, + add_object_out_of_scope_callback: Optional[ + Callable[["ray.ObjectRef", Callable[[bytes], None]], bool] + ] = None, + ): + if add_object_out_of_scope_callback is None: + add_object_out_of_scope_callback = ( + global_worker.core_worker.add_object_out_of_scope_callback # pyrefly: ignore[missing-attribute] + ) + self._add_callback_fn = add_object_out_of_scope_callback + # IDs of live blocks. Stale callbacks (fired after clear()) check + # membership here and no-op, preventing negative _bytes_by_producer. + self._registered_ids: set[bytes] = set() + # (producer_id -> total live bytes); maintained incrementally for O(1) reads. + self._bytes_by_producer: Dict[str, int] = defaultdict(int) + self._lock = threading.Lock() + + def on_block_produced( + self, + block_ref: "ray.ObjectRef", + size_bytes: int, + producer_id: str, + ) -> None: + """Register a block and attribute its memory to producer_id. + + Registers a Ray Core out-of-scope callback so that when all references + to block_ref are gone the bytes are automatically removed from the + producer's usage. + + Idempotent: calling twice with the same block_ref is a no-op. + """ + id_binary = block_ref.binary() + with self._lock: + if id_binary in self._registered_ids: + return + self._registered_ids.add(id_binary) + self._bytes_by_producer[producer_id] += size_bytes + + def _on_object_freed(id_bytes: bytes) -> None: + with self._lock: + if id_bytes not in self._registered_ids: + # Already cleared (e.g. by clear()), nothing to do. + return + self._registered_ids.discard(id_bytes) + self._bytes_by_producer[producer_id] -= size_bytes + + try: + registered = self._add_callback_fn(block_ref, _on_object_freed) + except ValueError: + # Block not owned by this worker; can't track it. + _on_object_freed(id_binary) + return + if not registered: + _on_object_freed(id_binary) + + def get_object_store_memory_usage(self, producer_id: str) -> int: + """Total bytes of live blocks attributed to producer_id.""" + with self._lock: + return self._bytes_by_producer.get(producer_id, 0) + + def clear(self) -> None: + """Reset all accounting, e.g. on executor shutdown. + + Any previously registered Ray Core callbacks firing after clear() + will be silently ignored because _registered_ids is empty. + """ + with self._lock: + self._registered_ids.clear() + self._bytes_by_producer.clear() diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 545458506a4f..792d905c8b1e 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -24,6 +24,7 @@ ActorPoolInfo, AutoscalingActorPool, ) +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, @@ -37,7 +38,6 @@ if TYPE_CHECKING: - from ray.data._internal.execution.streaming_executor_state import OpState from ray.data.block import BlockMetadataWithSchema logger = logging.getLogger(__name__) @@ -52,23 +52,6 @@ Waitable = Union[ray.ObjectRef, ObjectRefGenerator] -@dataclass(frozen=True) -class ObjectStoreUsage: - """Per-op object store accounting. - - Attributes: - internal: Bytes held by this op's currently-running tasks - (outputs not yet yielded to the object store). - outputs: Bytes this op has produced that are still live in - the object store — its internal output queue, its - ``OpState`` external output queue, and the downstream - eligible ops' inputs. - """ - - internal: int - outputs: int - - class OpTask(ABC): """Abstract class that represents a task that is created by an PhysicalOperator. @@ -134,6 +117,8 @@ def __init__( self, task_index: int, streaming_gen: ObjectRefGenerator, + block_ref_counter: BlockRefCounter, + producer_id: str, output_ready_callback: Callable[[RefBundle], None] = lambda bundle: None, task_done_callback: TaskDoneCallbackType = lambda exc, worker_stats, driver_stats: None, block_ready_callback: Callable[ @@ -149,6 +134,9 @@ def __init__( Args: task_index: Index of the task. Used for callbacks. streaming_gen: The streaming generator of this task. It should yield blocks. + block_ref_counter: The centralized block reference counter. on_block_produced + is called for each block yielded by this task. + producer_id: The id of the operator that produces the blocks from this task. output_ready_callback: The callback to call when a new RefBundle is output from the generator. task_done_callback: The callback to call when the task is done. @@ -171,6 +159,8 @@ def __init__( self._block_ready_callback = block_ready_callback self._metadata_ready_callback = metadata_ready_callback self._operator_name = operator_name + self._block_ref_counter: BlockRefCounter = block_ref_counter + self._producer_id: str = producer_id # If the generator hasn't produced block metadata yet, or if the block metadata # object isn't available after we get a reference, we need store the pending @@ -292,6 +282,9 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int: meta_with_schema_bytes ) meta = meta_with_schema.metadata + self._block_ref_counter.on_block_produced( + self._pending_block_ref, meta.size_bytes or 0, self._producer_id + ) self._output_ready_callback( RefBundle( [BlockEntry(self._pending_block_ref, meta)], @@ -743,12 +736,19 @@ def num_output_splits(self) -> int: """ return self._num_output_splits - def start(self, options: ExecutionOptions) -> None: + def start( + self, + options: ExecutionOptions, + block_ref_counter: BlockRefCounter, + ) -> None: """Called by the executor when execution starts for an operator. Args: options: The global options used for the overall execution. + block_ref_counter: The executor-wide shared counter for tracking + object-store memory. """ + self._block_ref_counter = block_ref_counter self._started = True def can_add_input(self) -> bool: @@ -886,41 +886,6 @@ def current_logical_usage(self) -> ExecutionResources: """ return ExecutionResources.zero() - def estimate_object_store_usage(self, state: "OpState") -> ObjectStoreUsage: - """Returns the bytes this operator contributes to the global object - store budget. Subclasses may override this when their object store - footprint doesn't match the generic model. - """ - # Operator's internal Object Store usage - mem_op_internal = self.metrics.obj_store_mem_pending_task_outputs or 0 - - # Operator's outputs' Object Store usage - op_outputs_bytes = ( - # Internal output queue - self.metrics.obj_store_mem_internal_outqueue - + - # External output queue - state.output_queue_bytes() - ) - - # TODO fix ineligible ops: this needs to include usage of all of OS - # for ineligible ops - # - # Outputs of this operator used downstream - used_op_outputs_bytes = sum( - ( - downstream_op.metrics.obj_store_mem_internal_inqueue_for_input( - downstream_op.input_dependencies.index(self) - ) - + downstream_op.metrics.obj_store_mem_pending_task_inputs - ) - for downstream_op in self.output_dependencies - ) - return ObjectStoreUsage( - internal=int(mem_op_internal), - outputs=int(op_outputs_bytes + used_op_outputs_bytes), - ) - def running_logical_usage(self) -> ExecutionResources: """Returns the estimated running CPU, GPU, and memory usage of this operator, excluding object store memory. diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 5fe2f29f443f..76afc11e4e1e 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -23,6 +23,8 @@ if TYPE_CHECKING: import pyarrow as pa + + from ray.data._internal.execution.block_ref_counter import BlockRefCounter import ray from ray.actor import ActorHandle from ray.core.generated import gcs_pb2 @@ -265,9 +267,13 @@ def _apply_default_actor_task_remote_args( return ray_actor_task_remote_args - def start(self, options: ExecutionOptions): + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ): self._actor_locality_enabled = options.actor_locality_enabled - super().start(options) + super().start(options, block_ref_counter) self._actor_cls = ray.remote(**self._ray_remote_args)(self._map_worker_cls) self._actor_pool.scale( diff --git a/python/ray/data/_internal/execution/operators/aggregate_num_rows.py b/python/ray/data/_internal/execution/operators/aggregate_num_rows.py index 1e127ec1fb96..92874da55da4 100644 --- a/python/ray/data/_internal/execution/operators/aggregate_num_rows.py +++ b/python/ray/data/_internal/execution/operators/aggregate_num_rows.py @@ -53,6 +53,9 @@ def _get_next_inner(self) -> RefBundle: [BlockEntry(block_ref, metadata)], owns_blocks=True, schema=schema ) + self._block_ref_counter.on_block_produced( + block_ref, metadata.size_bytes or 0, self.id + ) self._has_outputted = True return bundle diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index 388e2790608a..2bd382dce159 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -183,9 +183,17 @@ def all_inputs_done(self) -> None: ) # NOTE: We don't account object store memory use from intermediate `bulk_fn` # outputs (e.g., map outputs for map-reduce). - output_buffer, self._stats = self._bulk_fn(self._input_buffer.to_list(), ctx) + + input_bundles = self._input_buffer.to_list() + output_buffer, self._stats = self._bulk_fn(input_bundles, ctx) self._output_buffer = FIFOBundleQueue(output_buffer) + for bundle in output_buffer: + for entry in bundle.blocks: + self._block_ref_counter.on_block_produced( + entry.ref, entry.metadata.size_bytes or 0, self.id + ) + while self._input_buffer.has_next(): refs = self._input_buffer.get_next() self._metrics.on_input_dequeued(refs, input_index=0) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 36c1303e449a..1c7a6bbe54c3 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -80,6 +80,7 @@ ) if typing.TYPE_CHECKING: + from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.progress.base_progress import BaseProgressBar logger = logging.getLogger(__name__) @@ -673,8 +674,12 @@ def __init__( self._reduce_bar = None self._reduce_metrics = OpRuntimeMetrics(self) - def start(self, options: ExecutionOptions) -> None: - super().start(options) + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ) -> None: + super().start(options, block_ref_counter) @property def shuffle_name(self) -> str: @@ -1196,6 +1201,8 @@ def _on_aggregation_done( ExecutionResources.from_resource_dict(finalize_task_resource_bundle) ), operator_name=self.name, + block_ref_counter=self._block_ref_counter, + producer_id=self.id, ) self._finalizing_tasks[partition_id] = data_task diff --git a/python/ray/data/_internal/execution/operators/input_data_buffer.py b/python/ray/data/_internal/execution/operators/input_data_buffer.py index 66cdde25c81a..2d4c9350ad64 100644 --- a/python/ray/data/_internal/execution/operators/input_data_buffer.py +++ b/python/ray/data/_internal/execution/operators/input_data_buffer.py @@ -1,4 +1,7 @@ -from typing import Callable, List, Optional +from typing import TYPE_CHECKING, Callable, List, Optional + +if TYPE_CHECKING: + from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces import ( ExecutionOptions, @@ -45,7 +48,11 @@ def __init__( self._input_data_index = 0 self.mark_execution_finished() - def start(self, options: ExecutionOptions) -> None: + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ) -> None: if not self._is_input_initialized: self._input_data = self._input_data_factory( self.target_max_block_size_override @@ -57,7 +64,7 @@ def start(self, options: ExecutionOptions) -> None: # so we record input metrics here for bundle in self._input_data: self._metrics.on_input_received(bundle) - super().start(options) + super().start(options, block_ref_counter) def has_next(self) -> bool: return self._input_data_index < len(self._input_data) diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index 88dda94cf5fa..522800e557b4 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -81,6 +81,10 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]: ) out_blocks.append(block) metadata = ray.get(metadata_ref) + # Slicing creates a new block; register it for memory tracking. + self._block_ref_counter.on_block_produced( + block, metadata.size_bytes or 0, self.id + ) out_metadata.append(metadata) self._output_blocks_stats.append(metadata.to_stats()) self._consumed_rows = self._limit diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index a4cb7aca5ed3..5a72596d3af2 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -29,6 +29,8 @@ if TYPE_CHECKING: import pyarrow as pa + from ray.data._internal.execution.block_ref_counter import BlockRefCounter + import ray from ray import ObjectRef from ray._raylet import ObjectRefGenerator @@ -485,8 +487,12 @@ def create( else: raise ValueError(f"Unsupported execution strategy {compute_strategy}") - def start(self, options: "ExecutionOptions"): - super().start(options) + def start( + self, + options: "ExecutionOptions", + block_ref_counter: "BlockRefCounter", + ): + super().start(options, block_ref_counter) # Create output queue with desired ordering semantics. if options.preserve_order: self._output_queue = ReorderingBundleQueue() @@ -659,8 +665,12 @@ def _task_done_callback( data_task = DataOpTask( task_index, gen, - lambda output: _output_ready_callback(task_index, output), - functools.partial(_task_done_callback, task_index), + self._block_ref_counter, + self.id, + output_ready_callback=lambda output: _output_ready_callback( + task_index, output + ), + task_done_callback=functools.partial(_task_done_callback, task_index), operator_name=self.name, ) self._metrics.on_task_submitted( diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index f436179a77be..bf866dc841a8 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -2,7 +2,10 @@ import math import time from dataclasses import replace -from typing import Any, Collection, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple + +if TYPE_CHECKING: + from ray.data._internal.execution.block_ref_counter import BlockRefCounter from typing_extensions import override @@ -124,13 +127,17 @@ def num_output_rows_total(self) -> Optional[int]: # The total number of rows is the same as the number of input rows. return self.input_dependencies[0].num_output_rows_total() - def start(self, options: ExecutionOptions) -> None: + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ) -> None: if options.preserve_order: # If preserve_order is set, we need to ignore locality hints to ensure determinism. self._locality_hints = None self._max_buffer_size = 0 - super().start(options) + super().start(options, block_ref_counter) def throttling_disabled(self) -> bool: """Disables resource-based throttling. @@ -203,6 +210,11 @@ def all_inputs_done(self) -> None: for i, count in enumerate(allocation): bundles = self._split_from_buffer(count) for b in bundles: + # Splitting may create new blocks; register for memory tracking. + for entry in b.blocks: + self._block_ref_counter.on_block_produced( + entry.ref, entry.metadata.size_bytes or 0, self.id + ) b = replace(b, output_split_idx=i) self._output_queue.add(b) self._metrics.on_output_queued(b) diff --git a/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_map_operator.py b/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_map_operator.py index 2f3c7070b115..82daa608d057 100644 --- a/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_map_operator.py +++ b/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_map_operator.py @@ -18,7 +18,6 @@ ) from ray.data._internal.execution.interfaces.physical_operator import ( MetadataOpTask, - ObjectStoreUsage, OpTask, estimate_total_num_of_blocks, ) @@ -422,9 +421,6 @@ def current_logical_usage(self) -> ExecutionResources: memory=self._map_resource_usage.memory, ) - def estimate_object_store_usage(self, state) -> ObjectStoreUsage: - return ObjectStoreUsage(internal=0, outputs=0) - def incremental_resource_usage(self) -> ExecutionResources: avg_input = self._metrics.average_bytes_inputs_per_task memory = int(avg_input * SHUFFLE_PEAK_MEMORY_MULTIPLIER) if avg_input else 0 diff --git a/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_reduce_operator.py b/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_reduce_operator.py index 8159538c2a02..87b3db94063c 100644 --- a/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_reduce_operator.py +++ b/python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_reduce_operator.py @@ -164,6 +164,8 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: data_task = DataOpTask( task_index=partition_id, streaming_gen=block_gen, + block_ref_counter=self._block_ref_counter, + producer_id=self.id, output_ready_callback=functools.partial( self._handle_reduce_output_ready, partition_id ), @@ -207,6 +209,12 @@ def _emit_empty_partition(self, refs: RefBundle, schema: pa.Schema) -> None: ) refs.destroy_if_owned() + # Empty partition creates a new block; register it for memory tracking. + self._block_ref_counter.on_block_produced( + out_bundle.blocks[0].ref, # pyrefly: ignore[bad-argument-type] + block_meta.size_bytes or 0, + self.id, + ) self._num_reduce_tasks_submitted += 1 self._output_queue.append(out_bundle) self._metrics.on_output_queued(out_bundle) diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py index caf84f3d4a05..468d5f52f2b8 100644 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ b/python/ray/data/_internal/execution/operators/union_operator.py @@ -1,7 +1,10 @@ -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from typing_extensions import override +if TYPE_CHECKING: + from ray.data._internal.execution.block_ref_counter import BlockRefCounter + from ray.data._internal.execution.bundle_queue import BaseBundleQueue, FIFOBundleQueue from ray.data._internal.execution.interfaces import ( ExecutionOptions, @@ -59,12 +62,16 @@ def _input_queues(self) -> List["BaseBundleQueue"]: def _output_queues(self) -> List["BaseBundleQueue"]: return [self._output_buffer] - def start(self, options: ExecutionOptions): + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ): # Whether to preserve deterministic ordering of output blocks. # When True, blocks are emitted in round-robin order across inputs, # ensuring the same input always produces the same output order. self._preserve_order = options.preserve_order - super().start(options) + super().start(options, block_ref_counter) def num_outputs_total(self) -> Optional[int]: num_outputs = 0 diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index 2932dcae9e6e..6d6679e682ba 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -122,8 +122,12 @@ def all_inputs_done(self) -> None: refs = input_buffer.get_next() self._metrics.on_input_dequeued(refs, input_index=idx) - # Mark outputs as ready + # Zipping creates new blocks; register them for memory tracking. for ref in self._output_buffer: + for entry in ref.blocks: + self._block_ref_counter.on_block_produced( + entry.ref, entry.metadata.size_bytes or 0, self.id + ) self._metrics.on_output_queued(ref) super().all_inputs_done() diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index a100aa0352fc..9984846478d9 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -8,6 +8,7 @@ from ray._common.utils import env_bool, env_float from ray.data._internal.execution import create_resource_allocator +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, @@ -109,6 +110,7 @@ def __init__( options: ExecutionOptions, get_total_resources: Callable[[], ExecutionResources], data_context: DataContext, + block_ref_counter: BlockRefCounter, ): self._topology = topology self._options = options @@ -141,6 +143,8 @@ def __init__( # operator's output usage. self._output_operator = terminal_operator_from_topology(topology) + self._block_ref_counter = block_ref_counter + self._op_resource_allocator: Optional[ "OpResourceAllocator" ] = create_resource_allocator(self, data_context) @@ -184,15 +188,16 @@ def _estimate_object_store_memory_usage( return self._external_consumer_bytes return 0 - usage = op.estimate_object_store_usage(state) - self._mem_op_internal[op] = usage.internal - self._mem_op_outputs[op] = usage.outputs + # Blocks being generated by running tasks but not yet yielded as ObjectRefs. + mem_op_internal = op.metrics.obj_store_mem_pending_task_outputs or 0 - # Attribute iterator / streaming_split prefetch to the executor sink only. - if op is self._output_operator: - self._mem_op_outputs[op] += self._external_consumer_bytes + # All blocks produced by this operator that are still live (in any queue or + # held by any running task), as tracked by the block reference counter. + mem_op_outputs = self._block_ref_counter.get_object_store_memory_usage(op.id) - return self._mem_op_outputs[op] + self._mem_op_internal[op] + self._mem_op_internal[op] = mem_op_internal + self._mem_op_outputs[op] = mem_op_outputs + return mem_op_internal + mem_op_outputs def update_usages(self): """Recalculate resource usages.""" diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index abce1c902018..1bf651f9035c 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -14,6 +14,7 @@ BackpressurePolicy, get_backpressure_policies, ) +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.dataset_state import DatasetState from ray.data._internal.execution.execution_callback import ExecutionCallback from ray.data._internal.execution.interfaces import ( @@ -219,13 +220,17 @@ def execute( ) # Setup the streaming DAG topology and start the runner thread. - self._topology = build_streaming_topology(dag, self._options) + self._block_ref_counter = BlockRefCounter() + self._topology = build_streaming_topology( + dag, self._options, self._block_ref_counter + ) self._resource_manager = ResourceManager( self._topology, self._options, lambda: self._cluster_autoscaler.get_total_resources(), self._data_context, + self._block_ref_counter, ) # Constructed once per executor (not per scheduling iteration) so the @@ -343,6 +348,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None): op.shutdown(timer, force=force) self._clear_topology_queues_post_shutdown(force, exception) + # Queues have been drained; any remaining Ray Core callbacks that fire + # after this point should be no-ops. + self._block_ref_counter.clear() min_ = round(timer.min(), 3) max_ = round(timer.max(), 3) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index dd68e6adcb27..25446a3fe9b8 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -14,6 +14,7 @@ import ray from ray.data._internal.actor_autoscaler.autoscaling_actor_pool import ActorPoolInfo from ray.data._internal.execution.backpressure_policy import BackpressurePolicy +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.bundle_queue import ( ThreadSafeBundleQueue, create_bundle_queue, @@ -543,7 +544,9 @@ def mark_finished(self, exception: Optional[Exception] = None): def build_streaming_topology( - dag: PhysicalOperator, options: ExecutionOptions + dag: PhysicalOperator, + options: ExecutionOptions, + block_ref_counter: BlockRefCounter, ) -> Topology: """Instantiate the streaming operator state topology for the given DAG. @@ -554,6 +557,8 @@ def build_streaming_topology( Args: dag: The operator DAG to instantiate. options: The execution options to use to start operators. + block_ref_counter: The executor-wide shared counter for tracking + object-store memory. Returns: The topology dict holding the streaming execution state. @@ -575,7 +580,7 @@ def setup_state(op: PhysicalOperator) -> OpState: # Create state. op_state = OpState(op, inqueues) topology[op] = op_state - op.start(options) + op.start(options, block_ref_counter) return op_state setup_state(dag) diff --git a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py index 97e37b863fa8..3c9e0ffb3031 100644 --- a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py +++ b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py @@ -40,7 +40,7 @@ from ray.data.context import DataContext if typing.TYPE_CHECKING: - + from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces.physical_operator import ActorPoolInfo from ray.data._internal.progress.base_progress import BaseProgressBar @@ -490,8 +490,12 @@ def __init__( # Lifecycle # ------------------------------------------------------------------ - def start(self, options: ExecutionOptions) -> None: - super().start(options) + def start( + self, + options: ExecutionOptions, + block_ref_counter: "BlockRefCounter", + ) -> None: + super().start(options, block_ref_counter) self._rank_pool.start() def _add_input_inner(self, bundle: RefBundle, input_index: int) -> None: @@ -625,6 +629,8 @@ def _on_extraction_done( data_task = DataOpTask( task_index=rank_idx, streaming_gen=block_gen, + block_ref_counter=self._block_ref_counter, + producer_id=self.id, output_ready_callback=_on_bundle_ready, task_done_callback=functools.partial( _on_extraction_done, rank=rank_idx diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index 6df08575a778..b2ec834fbd76 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -13,6 +13,7 @@ import ray from ray._common.test_utils import wait_for_condition from ray._private.internal_api import get_memory_info_reply, get_state_from_address +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.operators.base_physical_operator import ( AllToAllOperator, ) @@ -44,10 +45,15 @@ def mock_all_to_all_op(input_op, name="MockAllToAll"): data_context=ray.data.DataContext.get_current(), name=name, ) - op.start = MagicMock(side_effect=lambda _: None) + op.start = MagicMock(side_effect=lambda *_: None) return op +def noop_counter(): + """BlockRefCounter that works without a Ray cluster.""" + return BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True) + + @pytest.fixture(scope="module") def data_context_override(request): overrides = getattr(request, "param", {}) diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index bd7dcdab09b5..75c48b00ab96 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -28,6 +28,7 @@ _estimate_total_available_task_slots, ) from ray.data._internal.compute import ActorPoolStrategy +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.bundle_queue import HashLinkedQueue from ray.data._internal.execution.interfaces import ( ExecutionOptions, @@ -56,6 +57,7 @@ DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR, DataContext, ) +from ray.data.tests.conftest import noop_counter from ray.data.tests.test_executor_resource_management import SMALL_STR from ray.data.tests.test_operators import _mul2_map_data_prcessor from ray.data.tests.util import ( @@ -964,7 +966,7 @@ def test_setting_initial_size_for_actor_pool(): ray_remote_args={"num_cpus": 1}, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) assert op._actor_pool.get_actor_info() == ActorPoolInfo( running=0, @@ -1005,7 +1007,7 @@ def test_internal_input_queue_is_empty_after_early_completion( ) # NOTE: This is blocking, until actor pool is fully started up - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) # Complete init sequence by completing pending metadata tasks (performed # by the executor) run_op_tasks_sync(op) @@ -1062,7 +1064,7 @@ def test_actor_pool_input_queue_draining( ) # NOTE: This is blocking, until actor pool is fully started up - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) # Finalize operator initialization sequence and make it schedulable run_op_tasks_sync(op, only_existing=True) @@ -1284,7 +1286,9 @@ def test_completed_when_downstream_op_has_finished_execution(ray_start_regular_s downstream_op = IdentityOperator( "Downstream", input_dependencies=[actor_pool_map_op], data_context=data_context ) - topology = build_streaming_topology(downstream_op, ExecutionOptions()) + topology = build_streaming_topology( + downstream_op, ExecutionOptions(), noop_counter() + ) # SETUP: Add a bundle to the upstream operator's external output queue. This is # necessary to reproduce the bug where the actor pool operator wouldn't complete if @@ -1518,7 +1522,7 @@ def _fail(): ) with pytest.raises(RayActorError, match=r"init_failed"): - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) @pytest.mark.parametrize( @@ -1571,7 +1575,7 @@ def _failing_transform( ray_remote_args={"max_concurrency": max_concurrency}, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) # Cannot add input until actor has started. assert not op.can_add_input() @@ -1614,7 +1618,7 @@ def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]: actor_pool = op._actor_pool # Wait for the op to scale up to the min size. - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) run_op_tasks_sync(op) assert actor_pool.num_running_actors() == num_actors assert op.num_active_tasks() == 0 diff --git a/python/ray/data/tests/test_block_ref_counter.py b/python/ray/data/tests/test_block_ref_counter.py new file mode 100644 index 000000000000..9e70f1cf7ba7 --- /dev/null +++ b/python/ray/data/tests/test_block_ref_counter.py @@ -0,0 +1,178 @@ +import gc +import time + +import numpy as np +import pytest + +import ray +from ray._common.test_utils import wait_for_condition +from ray.data._internal.execution.block_ref_counter import BlockRefCounter +from ray.data._internal.util import MiB +from ray.tests.conftest import * # noqa + +# Grace period for asserting a callback has NOT fired. Must be shorter than +# the task sleep in test_task_ref_keeps_counter_alive (1.0s); 0.3s leaves +# wide margin even on slow CI while still surfacing early-fire bugs. +_EARLY_FIRE_GRACE_S = 0.3 + + +@ray.remote +def _hold_ref_for(block_ref, sleep_s: float) -> bool: + """Hold *block_ref* as a task argument for *sleep_s* seconds, then return. + + Ray keeps an object alive for the duration of any task that received it as + an argument, so this lets tests assert the callback has not fired while the + task is still running. + """ + time.sleep(sleep_s) + return True + + +@pytest.fixture(params=["inlined", "regular"]) +def make_block(request): + """Factory for a block (ObjectRef, size_bytes), parametrized over the two + storage paths. + + Ray Core inlines tiny objects in the in-process store and puts larger ones + in the shared-memory object store; the out-of-scope callback must work for + both. Returning a factory (rather than the ref itself) avoids pytest holding + an extra reference that would keep the object alive past the test's own ``del``. + """ + + def _make() -> tuple["ray.ObjectRef", int]: + if request.param == "inlined": + data = np.zeros(1, dtype=np.uint8) + else: + data = np.zeros(1 * MiB, dtype=np.uint8) + return ray.put(data), len(data) + + return _make + + +def _wait_for_counter(*, counter, producer_id, expected, timeout_s: float = 10.0): + """Wait until *counter* reports *expected* bytes for *producer_id*. + + ``gc.collect()`` runs on each poll so any pending Python-level ObjectRef + destructors get a chance to run; the polling/timeout loop is delegated to + ``wait_for_condition`` (raises on timeout). + """ + + def _reached(): + gc.collect() + return counter.get_object_store_memory_usage(producer_id) == expected + + wait_for_condition(_reached, timeout=timeout_s) + + +class TestBlockRefCounterLifecycle: + def test_callback_fires_after_last_python_ref_deleted( + self, ray_start_regular_shared, make_block + ): + """Counter reaches 0 once the only Python ObjectRef is GC'd.""" + counter = BlockRefCounter() + ref, size_bytes = make_block() + + counter.on_block_produced(ref, size_bytes, "op_basic") + assert counter.get_object_store_memory_usage("op_basic") == size_bytes + + del ref # last Python ref gone + _wait_for_counter(counter=counter, producer_id="op_basic", expected=0) + + def test_second_python_ref_keeps_counter_alive( + self, ray_start_regular_shared, make_block + ): + """Counter stays non-zero while a second Python ObjectRef is alive. + + Dropping one of two refs that point at the same ObjectID must NOT fire + the callback. Only the final ref drop may do so. + """ + counter = BlockRefCounter() + ref1, size_bytes = make_block() + ref2 = ref1 # second Python ref to the same ObjectID + + counter.on_block_produced(ref1, size_bytes, "op_two_refs") + assert counter.get_object_store_memory_usage("op_two_refs") == size_bytes + + del ref1 + gc.collect() + time.sleep(_EARLY_FIRE_GRACE_S) # counter must still be non-zero + + assert ( + counter.get_object_store_memory_usage("op_two_refs") == size_bytes + ), "Callback fired too early — counter decremented while ref2 was still alive" + + del ref2 # last ref gone; callback must now fire + _wait_for_counter(counter=counter, producer_id="op_two_refs", expected=0) + + def test_task_ref_keeps_counter_alive_until_task_completes( + self, ray_start_regular_shared + ): + """Counter stays non-zero while a running Ray task holds the block. + + Ray keeps any object alive for the duration of a task that received it + as an argument. The callback should not fire until both conditions hold: + (a) the task has completed, and (b) all Python refs are dropped. + + Uses a plasma (by-reference) object specifically: tiny objects are + inlined into the task by value, so they would not get a task-argument + reference and this lifetime-extension behavior would not apply. + """ + counter = BlockRefCounter() + ref = ray.put( + np.zeros(1 * MiB, dtype=np.uint8) # pyrefly: ignore[bad-argument-type] + ) + + counter.on_block_produced(ref, 1 * MiB, "op_task") + assert counter.get_object_store_memory_usage("op_task") == 1 * MiB + + # Submit a task that sleeps while holding the block, then drop the Python + # ref so only the task's argument reference remains. + task_future = _hold_ref_for.remote(ref, 1.0) + del ref + gc.collect() + time.sleep(_EARLY_FIRE_GRACE_S) # task still running; callback must NOT fire + + assert ( + counter.get_object_store_memory_usage("op_task") == 1 * MiB + ), "Callback fired too early: counter decremented while task was still running" + + ray.get(task_future) # task completes; now both refs are gone + _wait_for_counter(counter=counter, producer_id="op_task", expected=0) + + +class TestBlockRefCounterNonOwnedBlocks: + def test_non_owned_block_not_tracked(self, ray_start_regular_shared): + """on_block_produced gracefully skips blocks not owned by this worker. + + When SplitCoordinator runs a streaming executor inside an actor, the + input blocks are owned by the driver. The actor cannot register + out-of-scope callbacks on them, so on_block_produced should catch the + ValueError and leave the usage at zero. + + We wrap the ObjectRef in a list to prevent Ray from auto-resolving it, + mirroring how SplitCoordinator receives refs embedded in a serialized + Dataset object. + """ + + @ray.remote + class CounterActor: + def try_track_non_owned_block(self, block_ref_wrapped, size_bytes): + block_ref = block_ref_wrapped[0] + counter = BlockRefCounter() + counter.on_block_produced(block_ref, size_bytes, "op_split") + return counter.get_object_store_memory_usage("op_split") + + block_ref = ray.put( + np.zeros(1, dtype=np.uint8) # pyrefly: ignore[bad-argument-type] + ) + actor = CounterActor.remote() # pyrefly: ignore[missing-attribute] + usage = ray.get(actor.try_track_non_owned_block.remote([block_ref], 1)) + assert ( + usage == 0 + ), f"Non-owned block should not be tracked, but got {usage} bytes" + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index 3330661c65e7..6b2938e7e747 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -3,6 +3,7 @@ import ray from ray.data._internal.actor_autoscaler import ActorPoolScalingRequest from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces import ExecutionOptions, ExecutionResources from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.limit_operator import LimitOperator @@ -206,7 +207,7 @@ def test_task_pool_resource_reporting(ray_start_10_cpus_shared): name="TestMapper", compute_strategy=TaskPoolStrategy(), ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) assert op.current_logical_usage() == ExecutionResources(cpu=0, gpu=0, memory=0) assert op.metrics.obj_store_mem_internal_inqueue == 0 @@ -251,7 +252,7 @@ def test_task_pool_resource_reporting_with_dynamic_remote_args( ray_remote_args={"num_cpus": 1}, ray_remote_args_fn=lambda: {"memory": 500}, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) assert op.current_logical_usage() == ExecutionResources(cpu=0, gpu=0, memory=0) @@ -278,7 +279,7 @@ def test_task_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): compute_strategy=TaskPoolStrategy(), min_rows_per_bundle=3, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) assert op.current_logical_usage() == ExecutionResources(cpu=0, gpu=0, memory=0) assert op.metrics.obj_store_mem_internal_inqueue == 0 @@ -338,7 +339,7 @@ def test_actor_pool_scheduling(ray_start_10_cpus_shared, restore_data_context): ) # NOTE: This is blocking, until actors are fully started up - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources(cpu=2, gpu=0, object_store_memory=0) @@ -459,7 +460,7 @@ def test_actor_pool_resource_reporting_with_dynamic_remote_args( ) # Blocking until actors are fully started - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) run_op_tasks_sync(op, only_existing=True) # Should reflect dynamic resources: 2 actors * (1 cpu, 500 memory) @@ -489,7 +490,7 @@ def test_actor_pool_scheduling_with_bundling( ) # NOTE: This is blocking, until actor pool is fully started up - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources(cpu=2, gpu=0, object_store_memory=0) @@ -639,7 +640,7 @@ def test_limit_resource_reporting(ray_start_10_cpus_shared): make_ref_bundles([[SMALL_STR, SMALL_STR] for i in range(2)]), ) # Two two-row bundles op = LimitOperator(3, input_op, DataContext.get_current()) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) assert op.current_logical_usage() == ExecutionResources( cpu=0, gpu=0, object_store_memory=0, memory=0 @@ -672,7 +673,7 @@ def test_output_splitter_resource_reporting(ray_start_10_cpus_shared): data_context=DataContext.get_current(), locality_hints=["0", "1"], ) - op.start(ExecutionOptions(actor_locality_enabled=True)) + op.start(ExecutionOptions(actor_locality_enabled=True), BlockRefCounter()) assert op.current_logical_usage() == ExecutionResources( cpu=0, gpu=0, object_store_memory=0, memory=0 diff --git a/python/ray/data/tests/test_gpu_shuffle.py b/python/ray/data/tests/test_gpu_shuffle.py index 61cf0cc1f4a7..d02de008755d 100644 --- a/python/ray/data/tests/test_gpu_shuffle.py +++ b/python/ray/data/tests/test_gpu_shuffle.py @@ -15,6 +15,7 @@ import ray.data._internal.gpu_shuffle.hash_shuffle as hash_shuffle from ray.data._internal.execution.interfaces import ( BlockEntry, + ExecutionOptions, ExecutionResources, PhysicalOperator, RefBundle, @@ -31,6 +32,7 @@ from ray.data._internal.util import explain_plan from ray.data.block import BlockMetadata from ray.data.context import DataContext, ShuffleStrategy +from ray.data.tests.conftest import noop_counter # --------------------------------------------------------------------------- # Helpers @@ -433,6 +435,7 @@ def _make_op(self, nranks=2, num_partitions=4): ) op._rank_pool._actors = mock_actors op._rank_pool._nranks = nranks + op.start(ExecutionOptions(), noop_counter()) return op, mock_actors def test_finalization_not_started_until_inputs_complete(self): diff --git a/python/ray/data/tests/test_limit_operator.py b/python/ray/data/tests/test_limit_operator.py index 022c9252376e..1cec3e2d2725 100644 --- a/python/ray/data/tests/test_limit_operator.py +++ b/python/ray/data/tests/test_limit_operator.py @@ -5,12 +5,14 @@ import pytest import ray +from ray.data._internal.execution.interfaces import ExecutionOptions from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.limit_operator import LimitOperator from ray.data._internal.execution.streaming_executor import StreamingExecutor from ray.data._internal.execution.util import make_ref_bundles from ray.data._internal.logical.optimizers import get_execution_plan from ray.data.context import DataContext +from ray.data.tests.conftest import noop_counter from ray.data.tests.util import run_op_tasks_sync from ray.tests.conftest import * # noqa @@ -60,6 +62,9 @@ def test_limit_operator(ray_start_regular_shared): refs = make_ref_bundles([[i] * num_rows_per_block for i in range(num_refs)]) input_op = InputDataBuffer(DataContext.get_current(), refs) limit_op = LimitOperator(limit, input_op, DataContext.get_current()) + counter = noop_counter() + input_op.start(ExecutionOptions(), counter) + limit_op.start(ExecutionOptions(), counter) limit_op.mark_execution_finished = MagicMock( wraps=limit_op.mark_execution_finished ) diff --git a/python/ray/data/tests/test_map_operator.py b/python/ray/data/tests/test_map_operator.py index 4b3c4bee4f3a..81b6050b6b31 100644 --- a/python/ray/data/tests/test_map_operator.py +++ b/python/ray/data/tests/test_map_operator.py @@ -9,6 +9,7 @@ import ray from ray._common.test_utils import wait_for_condition from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces import ( ExecutionOptions, ) @@ -75,7 +76,7 @@ def _run_map_operator_test( ) # Feed data and block on exec. - op.start(ExecutionOptions(preserve_order=preserve_order)) + op.start(ExecutionOptions(preserve_order=preserve_order), BlockRefCounter()) if use_actors: # Wait for actors to be ready before adding inputs. run_op_tasks_sync(op, only_existing=True) @@ -116,7 +117,10 @@ def test_map_operator_streamed(ray_start_regular_shared, use_actors): output = [] # Use preserve_order so output order matches input order (required for # actor pool, which otherwise returns results in completion order). - op.start(ExecutionOptions(actor_locality_enabled=True, preserve_order=True)) + op.start( + ExecutionOptions(actor_locality_enabled=True, preserve_order=True), + BlockRefCounter(), + ) if use_actors: # Wait for actors to be ready before adding inputs. @@ -180,7 +184,7 @@ def test_map_operator_actor_locality_stats(ray_start_regular_shared): options = ExecutionOptions() options.preserve_order = True options.actor_locality_enabled = True - op.start(options) + op.start(options, BlockRefCounter()) # Wait for actors to be ready before adding inputs. run_op_tasks_sync(op, only_existing=True) @@ -242,7 +246,7 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]: ) # Feed data and block on exec. - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) if use_actors: # Wait for actors to be ready before adding inputs. run_op_tasks_sync(op, only_existing=True) @@ -395,7 +399,7 @@ def test_map_operator_ray_args(shutdown_only, use_actors): ) # Feed data and block on exec. - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) if use_actors: # Wait for the actor to start. run_op_tasks_sync(op) @@ -443,7 +447,7 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]: ) # Start one task and then cancel. - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) if use_actors: # Wait for the actor to start. run_op_tasks_sync(op) @@ -513,7 +517,7 @@ def map_fn(block_iter: Iterable[Block], ctx: TaskContext) -> Iterable[Block]: compute_strategy=compute_strategy, ) op.add_map_task_kwargs_fn(lambda: kwargs) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) if use_actors: # Wait for the actor to start. run_op_tasks_sync(op) @@ -576,7 +580,7 @@ def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]: min_rows_per_bundle=min_rows_per_bundle, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) if op.metrics.num_inputs_received % min_rows_per_bundle == 0: @@ -621,7 +625,7 @@ def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]: ) op.set_additional_split_factor(2) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) if op.metrics.num_inputs_received % min_rows_per_bundle == 0: @@ -659,7 +663,7 @@ def map_fn(block_iter: Iterable[Block], ctx) -> Iterable[Block]: min_rows_per_bundle=MIN_ROWS_PER_BUNDLE, ) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) num_outputs_taken = 0 bytes_outputs_taken = 0 for i in range(len(inputs)): diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index d7ed2636b816..b5b1083bbc38 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -5,6 +5,7 @@ import pytest import ray +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces import ( BlockEntry, ExecutionOptions, @@ -99,7 +100,7 @@ def dummy_all_transform(bundles: List[RefBundle], ctx): op.set_sub_progress_bar(name, pg) # Feed data. - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) op.all_inputs_done() @@ -145,7 +146,7 @@ def dummy_all_transform(bundles: List[RefBundle]): # Feed data and implement streaming exec. output = [] - op1.start(ExecutionOptions(actor_locality_enabled=True)) + op1.start(ExecutionOptions(actor_locality_enabled=True), BlockRefCounter()) while input_op.has_next(): op1.add_input(input_op.get_next(), 0) while not op1.has_next(): @@ -182,6 +183,8 @@ def all_transform(bundles: List[RefBundle], ctx): DataContext.get_current().target_max_block_size, ) + op1.start(ExecutionOptions(), BlockRefCounter()) + op2.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): op1.add_input(input_op.get_next(), 0) op1.all_inputs_done() diff --git a/python/ray/data/tests/test_output_splitter.py b/python/ray/data/tests/test_output_splitter.py index 8be226a25fa8..2ad05a8ed324 100644 --- a/python/ray/data/tests/test_output_splitter.py +++ b/python/ray/data/tests/test_output_splitter.py @@ -5,6 +5,7 @@ import pytest import ray +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces import ExecutionOptions from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.output_splitter import OutputSplitter @@ -35,7 +36,7 @@ def test_split_operator(ray_start_regular_shared, equal, chunk_size): # Feed data and implement streaming exec. output_splits = [[] for _ in range(num_splits)] - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): for _ in range(num_add_input_blocks): if not input_op.has_next(): @@ -79,7 +80,7 @@ def test_split_operator_random(ray_start_regular_shared, equal, random_seed): # Feed data and implement streaming exec. output_splits = collections.defaultdict(list) - op.start(ExecutionOptions()) + op.start(ExecutionOptions(), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) op.all_inputs_done() @@ -124,7 +125,7 @@ def get_bundle_loc(bundle): # Feed data and implement streaming exec. output_splits = collections.defaultdict(list) - op.start(ExecutionOptions(actor_locality_enabled=True)) + op.start(ExecutionOptions(actor_locality_enabled=True), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) op.all_inputs_done() @@ -192,7 +193,7 @@ def _get_fake_bundle_loc(bundle): output_splits = [[] for _ in range(3)] yielded_incrementally = 0 - op.start(ExecutionOptions(actor_locality_enabled=True)) + op.start(ExecutionOptions(actor_locality_enabled=True), BlockRefCounter()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) diff --git a/python/ray/data/tests/test_reservation_based_resource_allocator.py b/python/ray/data/tests/test_reservation_based_resource_allocator.py index 444ca0946aa3..f68a341e1ab1 100644 --- a/python/ray/data/tests/test_reservation_based_resource_allocator.py +++ b/python/ray/data/tests/test_reservation_based_resource_allocator.py @@ -3,6 +3,7 @@ import pytest import ray +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, @@ -20,6 +21,7 @@ from ray.data._internal.execution.util import make_ref_bundles from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa +from ray.data.tests.conftest import noop_counter from ray.data.tests.test_resource_manager import ( mock_join_op, mock_map_op, @@ -59,7 +61,7 @@ def test_basic(self, restore_data_context): op_internal_usage = dict.fromkeys([o1, o2, o3, o4], 0) op_outputs_usages = dict.fromkeys([o1, o2, o3, o4], 0) - topo = build_streaming_topology(o4, ExecutionOptions()) + topo = build_streaming_topology(o4, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources.zero() @@ -72,6 +74,7 @@ def mock_get_global_limits(): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = op_internal_usage @@ -230,13 +233,14 @@ def test_reserve_min_resource_requirements(self, restore_data_context): ) ) - topo = build_streaming_topology(o5, ExecutionOptions()) + topo = build_streaming_topology(o5, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock( return_value=ExecutionResources.zero() @@ -291,13 +295,14 @@ def test_reserve_min_resources_for_gpu_ops(self, restore_data_context): ) ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock( return_value=ExecutionResources.zero() @@ -327,12 +332,13 @@ def test_does_not_reserve_more_than_max_resource_usage(self): ExecutionResources(cpu=1, object_store_memory=1), ) ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock( return_value=ExecutionResources.zero() @@ -383,7 +389,7 @@ def test_budget_capped_by_max_resource_usage(self, restore_data_context): ) ) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(cpu=20, object_store_memory=400) @@ -398,6 +404,7 @@ def test_budget_capped_by_max_resource_usage(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = {o1: 0, o2: 40, o3: 40} @@ -460,7 +467,7 @@ def test_budget_capped_by_max_resource_usage_all_capped(self, restore_data_conte ) ) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(cpu=20, object_store_memory=400) @@ -475,6 +482,7 @@ def test_budget_capped_by_max_resource_usage_all_capped(self, restore_data_conte ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = {o1: 0, o2: 40, o3: 40} @@ -502,13 +510,14 @@ def test_only_handle_eligible_ops(self, restore_data_context): o1 = InputDataBuffer(DataContext.get_current(), input) o2 = mock_map_op(o1) o3 = LimitOperator(1, o2, DataContext.get_current()) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock( return_value=ExecutionResources.zero() @@ -558,7 +567,7 @@ def test_gpu_allocation(self, restore_data_context): return_value=(ExecutionResources(0, 1, 0), ExecutionResources.inf()) ) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(gpu=4) op_usages = { @@ -572,6 +581,7 @@ def test_gpu_allocation(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) @@ -606,7 +616,7 @@ def test_multiple_gpu_operators(self, restore_data_context): return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 1, 0)) ) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(gpu=4) op_usages = { @@ -620,6 +630,7 @@ def test_multiple_gpu_operators(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) @@ -647,7 +658,7 @@ def test_gpu_usage_exceeds_global_limits(self, restore_data_context): return_value=(ExecutionResources(0, 1, 0), ExecutionResources(0, 2, 0)) ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(gpu=1) op_usages = { @@ -663,6 +674,7 @@ def test_gpu_usage_exceeds_global_limits(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2], 0) @@ -696,7 +708,7 @@ def test_gpu_unbounded_operator_can_autoscale(self, restore_data_context): return_value=(ExecutionResources(0, 1, 0), ExecutionResources.inf()) ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(gpu=8) op_usages = { @@ -709,6 +721,7 @@ def test_gpu_unbounded_operator_can_autoscale(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2], 0) @@ -759,7 +772,7 @@ def test_actor_pool_gpu_operator_gets_gpu_budget_in_cpu_pipeline( ) o5 = mock_map_op(o4, ray_remote_args={"num_cpus": 1}, name="Write") - topo = build_streaming_topology(o5, ExecutionOptions()) + topo = build_streaming_topology(o5, ExecutionOptions(), noop_counter()) # Cluster with 2 GPUs available global_limits = ExecutionResources( @@ -780,6 +793,7 @@ def test_actor_pool_gpu_operator_gets_gpu_budget_in_cpu_pipeline( ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3, o4, o5], 0) @@ -822,7 +836,7 @@ def test_gpu_bounded_vs_unbounded_operators(self, restore_data_context): return_value=(ExecutionResources(0, 1, 0), ExecutionResources.inf()) ) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(gpu=8) op_usages = { @@ -836,6 +850,7 @@ def test_gpu_bounded_vs_unbounded_operators(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys([o1, o2, o3], 0) @@ -913,7 +928,7 @@ def test_gpu_not_reserved_for_non_gpu_operators( ) ) - topo = build_streaming_topology(write_op, ExecutionOptions()) + topo = build_streaming_topology(write_op, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(cpu=8, gpu=8, object_store_memory=10_000_000) ops = [o1, read_op, infer1_op, infer2_op, write_op] @@ -924,6 +939,7 @@ def test_gpu_not_reserved_for_non_gpu_operators( ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = dict.fromkeys(ops, 0) @@ -976,7 +992,7 @@ def test_reservation_accounts_for_completed_ops(self, restore_data_context): op_internal_usage = dict.fromkeys([o1, o2, o3, o4], 0) op_outputs_usages = dict.fromkeys([o1, o2, o3, o4], 0) - topo = build_streaming_topology(o4, ExecutionOptions()) + topo = build_streaming_topology(o4, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources(cpu=10, object_store_memory=250) @@ -985,6 +1001,7 @@ def test_reservation_accounts_for_completed_ops(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager._mem_op_internal = op_internal_usage @@ -1069,7 +1086,7 @@ def test_reservation_accounts_for_completed_ops_complex_graph( op_internal_usage = dict.fromkeys([o1, o2, o3, o4, o5, o6, o7, o8], 0) op_outputs_usages = dict.fromkeys([o1, o2, o3, o4, o5, o6, o7, o8], 0) - topo = build_streaming_topology(o8, ExecutionOptions()) + topo = build_streaming_topology(o8, ExecutionOptions(), noop_counter()) global_limits = ExecutionResources.zero() @@ -1082,6 +1099,7 @@ def mock_get_global_limits(): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) resource_manager.get_global_limits = MagicMock( diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index f06e82a74aa7..c43ca608f2e1 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -1,6 +1,5 @@ import math import time -from dataclasses import replace from datetime import timedelta from typing import Any, Dict, Optional from unittest.mock import MagicMock, patch @@ -8,17 +7,13 @@ import pytest from freezegun import freeze_time -import ray from ray.data._internal.compute import ComputeStrategy -from ray.data._internal.execution.interfaces import BlockEntry, PhysicalOperator +from ray.data._internal.execution.block_ref_counter import BlockRefCounter +from ray.data._internal.execution.interfaces import PhysicalOperator from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, ) -from ray.data._internal.execution.interfaces.physical_operator import ( - ObjectStoreUsage, - TaskExecDriverStats, -) from ray.data._internal.execution.operators.base_physical_operator import ( AllToAllOperator, ) @@ -36,10 +31,25 @@ OutputBackpressureGuard, build_streaming_topology, ) -from ray.data._internal.execution.util import make_ref_bundles -from ray.data.block import TaskExecWorkerStats from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa +from ray.data.tests.conftest import noop_counter + + +class StubBlockRefCounter: + """Test double for BlockRefCounter with directly settable per-operator usage.""" + + def __init__(self): + self._usage = {} + + def set_usage(self, producer_id: str, bytes: int) -> None: + self._usage[producer_id] = bytes + + def get_object_store_memory_usage(self, producer_id: str) -> int: + return self._usage.get(producer_id, 0) + + def clear(self) -> None: + self._usage.clear() def mock_map_op( @@ -56,7 +66,6 @@ def mock_map_op( compute_strategy=compute_strategy, name=name, ) - op.start(ExecutionOptions()) return op @@ -65,7 +74,7 @@ def mock_union_op(input_ops): DataContext.get_current(), *input_ops, ) - op.start = MagicMock(side_effect=lambda _: None) + op.start = MagicMock(side_effect=lambda *_: None) return op @@ -89,7 +98,7 @@ def mock_join_op(left_input_op, right_input_op): partition_size_hint=1, ) - op.start = MagicMock(side_effect=lambda _: None) + op.start = MagicMock(side_effect=lambda *_: None) return op @@ -101,7 +110,6 @@ def mock_all_to_all_op(input_op, name="MockShuffle"): data_context=DataContext.get_current(), name=name, ) - op.start(ExecutionOptions()) return op @@ -119,6 +127,7 @@ def _resource_manager_for_limits_only_test( options, get_total_resources, DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) @@ -220,9 +229,8 @@ def test_update_usage(self): o1 = InputDataBuffer(DataContext.get_current(), []) o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) - # Mock different metrics that contribute to the resource usage. mock_cpu = { o1: 0, o2: 5, @@ -233,26 +241,11 @@ def test_update_usage(self): o2: 100, o3: 200, } - mock_internal_outqueue = { + mock_counter_bytes = { o1: 0, o2: 300, o3: 400, } - mock_external_outqueue_sizes = { - o1: 100, - o2: 500, - o3: 600, - } - mock_internal_inqueue = { - o1: 0, - o2: 700, - o3: 800, - } - mock_pending_task_inputs = { - o1: 0, - o2: 900, - o3: 1000, - } for op in [o1, o2, o3]: op.current_logical_usage = MagicMock( @@ -265,59 +258,42 @@ def test_update_usage(self): op.extra_resource_usage = MagicMock(return_value=ExecutionResources.zero()) op._metrics = MagicMock( obj_store_mem_pending_task_outputs=mock_pending_task_outputs[op], - obj_store_mem_internal_outqueue=mock_internal_outqueue[op], - obj_store_mem_internal_inqueue=mock_internal_inqueue[op], - obj_store_mem_pending_task_inputs=mock_pending_task_inputs[op], ) - op._metrics.obj_store_mem_internal_inqueue_for_input = MagicMock( - return_value=mock_internal_inqueue[op], - ) - ref_bundle = MagicMock( - size_bytes=MagicMock(return_value=mock_external_outqueue_sizes[op]), - output_split_idx=None, - ) - topo[op].add_output(ref_bundle) + counter = StubBlockRefCounter() resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + counter, ) resource_manager._op_resource_allocator = None + + for op in [o2, o3]: + if mock_counter_bytes[op]: + counter.set_usage(op.id, mock_counter_bytes[op]) + resource_manager.update_usages() global_cpu = 0 global_mem = 0 for op in [o1, o2, o3]: if op == o1: - # Resource usage of InputDataBuffer doesn't count. + # InputDataBuffer memory is not counted. expected_mem = 0 else: - expected_mem = ( - mock_pending_task_outputs[op] - + mock_internal_outqueue[op] - + mock_external_outqueue_sizes[op] - ) - for next_op in op.output_dependencies: - expected_mem += ( - +mock_internal_inqueue[next_op] - + mock_pending_task_inputs[next_op] - ) + expected_mem = mock_pending_task_outputs[op] + mock_counter_bytes[op] op_usage = resource_manager.get_op_usage(op) assert op_usage.cpu == mock_cpu[op] assert op_usage.gpu == 0 assert op_usage.object_store_memory == expected_mem if op != o1: - # _mem_op_internal only includes pending_task_outputs assert ( resource_manager._mem_op_internal[op] == mock_pending_task_outputs[op] ) - assert ( - resource_manager._mem_op_outputs[op] - == expected_mem - resource_manager._mem_op_internal[op] - ) + assert resource_manager._mem_op_outputs[op] == mock_counter_bytes[op] global_cpu += mock_cpu[op] global_mem += expected_mem @@ -326,143 +302,83 @@ def test_update_usage(self): ) def test_object_store_usage(self, restore_data_context): - input = make_ref_bundles([[x] for x in range(1)])[0] - # Set block metadata size_bytes to 1 (rather than mocking the method on the - # instance, which doesn't survive dataclasses.replace in OpBufferQueue.pop). - entry = input.blocks[0] - input = replace( - input, - blocks=[BlockEntry(entry.ref, replace(entry.metadata, size_bytes=1))], - ) + """ResourceManager reads per-operator memory from BlockRefCounter.""" - o1 = InputDataBuffer(DataContext.get_current(), [input]) + o1 = InputDataBuffer(DataContext.get_current(), []) o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) + counter = StubBlockRefCounter() resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(return_value=ExecutionResources.zero()), DataContext.get_current(), + counter, ) - ray.data.DataContext.get_current()._max_num_blocks_in_streaming_gen_buffer = 1 - ray.data.DataContext.get_current().target_max_block_size = 2 resource_manager.update_usages() assert resource_manager.get_op_usage(o1).object_store_memory == 0 assert resource_manager.get_op_usage(o2).object_store_memory == 0 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Objects in an operator's internal inqueue typically count toward the previous - # operator's object store memory usage. However, data from an - # `InputDataBuffer` aren't counted because they were created outside of this - # execution. - o2.metrics.on_input_queued(input, input_index=0) + # Simulate o2 producing a 100-byte block. + counter.set_usage(o2.id, 100) resource_manager.update_usages() assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 0 + assert resource_manager.get_op_usage(o2).object_store_memory == 100 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # During no-sample phase, obj_store_mem_pending_task_outputs uses fallback - # estimate based on target_max_block_size. - o2.metrics.on_input_dequeued(input, input_index=0) - o2.metrics.on_task_submitted(0, input) + # Simulate o3 producing a 200-byte block. + counter.set_usage(o3.id, 200) resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - # No sample available yet, returns None - assert o2.metrics.obj_store_mem_pending_task_outputs is None - op2_usage = resource_manager.get_op_usage(o2).object_store_memory - # When pending task outputs is None, it's treated as 0 - assert op2_usage == 0 - assert resource_manager.get_op_usage(o3).object_store_memory == 0 - - # When the task finishes, we move the data from the streaming generator to the - # operator's internal outqueue. - o2.metrics.on_output_queued(input) - o2.metrics.on_task_finished( - 0, - None, - TaskExecWorkerStats(task_wall_time_s=0.0), - TaskExecDriverStats(task_output_backpressure_s=0), - ) - resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 1 - assert resource_manager.get_op_usage(o3).object_store_memory == 0 + assert resource_manager.get_op_usage(o2).object_store_memory == 100 + assert resource_manager.get_op_usage(o3).object_store_memory == 200 - o2.metrics.on_output_dequeued(input) - topo[o2].output_queue.append(input) + # Simulate o2's block being freed. + counter.set_usage(o2.id, 0) resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 1 - assert resource_manager.get_op_usage(o3).object_store_memory == 0 + assert resource_manager.get_op_usage(o2).object_store_memory == 0 + assert resource_manager.get_op_usage(o3).object_store_memory == 200 - # Objects in the current operator's internal inqueue count towards the previous - # operator's object store memory usage. - # NOTE: `pop()` returns a copy of the bundle (via `dataclasses.replace`), so we - # must use the returned reference for subsequent o3 metric calls. - o3_input = topo[o2].output_queue.pop() - o3.metrics.on_input_queued(o3_input, input_index=0) + # After clear(), all usage resets to 0. + counter.clear() resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 1 + assert resource_manager.get_op_usage(o2).object_store_memory == 0 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Task inputs count toward the previous operator's object store memory - # usage. During no-sample phase, pending task outputs uses fallback estimate. - o3.metrics.on_input_dequeued(o3_input, input_index=0) - o3.metrics.on_task_submitted(0, o3_input) - resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 1 - # No sample available yet, returns None - assert o3.metrics.obj_store_mem_pending_task_outputs is None - op3_usage = resource_manager.get_op_usage(o3).object_store_memory - # When pending task outputs is None, it's treated as 0 - assert op3_usage == 0 - - # Task inputs no longer count once the task is finished. - o3.metrics.on_output_queued(o3_input) - o3.metrics.on_task_finished( - 0, - None, - TaskExecWorkerStats(task_wall_time_s=0.0), - TaskExecDriverStats(task_output_backpressure_s=0), - ) - resource_manager.update_usages() - assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 0 - assert resource_manager.get_op_usage(o3).object_store_memory == 1 - - def test_object_store_accounting_delegates_to_op(self, restore_data_context): - """``ResourceManager`` must dispatch to ``op.estimate_object_store_usage`` so subclasses can override the accounting.""" - # Real upstream so the override op has a valid input dependency. - input = make_ref_bundles([[x] for x in range(1)])[0] - upstream = InputDataBuffer(DataContext.get_current(), [input]) - - # Subclass that overrides the accounting to return hard-coded - # values — bypasses the generic metrics+state computation. - override = mock_map_op(upstream) - override.estimate_object_store_usage = lambda state: ObjectStoreUsage( - internal=42, outputs=100 - ) + def test_union_no_double_counting(self, restore_data_context): + """UnionOperator passthrough does not inflate global memory usage.""" - topo = build_streaming_topology(override, ExecutionOptions()) + o1 = InputDataBuffer(DataContext.get_current(), []) + map_a = mock_map_op(o1, name="MapA") + o2 = InputDataBuffer(DataContext.get_current(), []) + map_b = mock_map_op(o2, name="MapB") + union_op = mock_union_op([map_a, map_b]) + downstream = mock_map_op(union_op, name="Downstream") + + topo = build_streaming_topology(downstream, ExecutionOptions(), noop_counter()) + counter = StubBlockRefCounter() resource_manager = ResourceManager( topo, ExecutionOptions(), - MagicMock(return_value=ExecutionResources.zero()), + MagicMock(return_value=ExecutionResources(object_store_memory=10_000)), DataContext.get_current(), + counter, ) + counter.set_usage(map_a.id, 100) + counter.set_usage(map_b.id, 200) + resource_manager.update_usages() - # The override's hard-coded values flow through unchanged into - # both the per-component dicts and the aggregated op usage. - assert resource_manager.get_mem_op_internal(override) == 42 - assert resource_manager.get_mem_op_outputs(override) == 100 - assert resource_manager.get_op_usage(override).object_store_memory == 42 + 100 + assert resource_manager.get_op_usage(map_a).object_store_memory == 100 + assert resource_manager.get_op_usage(map_b).object_store_memory == 200 + assert resource_manager.get_op_usage(union_op).object_store_memory == 0 + + total_obj_store = resource_manager.get_global_usage().object_store_memory + assert total_obj_store == 300 def test_get_completed_ops_usage(self, restore_data_context): """Test that _get_completed_ops_usage returns total usage of completed ops.""" @@ -475,7 +391,7 @@ def test_get_completed_ops_usage(self, restore_data_context): o1.mark_execution_finished() o2.mark_execution_finished() - topo = build_streaming_topology(o5, ExecutionOptions()) + topo = build_streaming_topology(o5, ExecutionOptions(), noop_counter()) op_usages = { o1: ExecutionResources.zero(), @@ -490,6 +406,7 @@ def test_get_completed_ops_usage(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) @@ -534,7 +451,7 @@ def test_get_completed_ops_usage_complex_graph(self, restore_data_context): o5.mark_execution_finished() o7.mark_execution_finished() - topo = build_streaming_topology(o8, ExecutionOptions()) + topo = build_streaming_topology(o8, ExecutionOptions(), noop_counter()) op_usages = { o1: ExecutionResources.zero(), @@ -552,6 +469,7 @@ def test_get_completed_ops_usage_complex_graph(self, restore_data_context): ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op]) @@ -576,12 +494,13 @@ def test_external_consumer_bytes_attributed_to_terminal_operator( o1.mark_execution_finished() o2.mark_execution_finished() - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), lambda: cluster_resources, DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) for op in [o1, o2, o3]: @@ -644,12 +563,13 @@ def test_external_consumer_bytes_input_data_buffer_sink(self, restore_data_conte attach to that terminal sink instead of being dropped by the InputDataBuffer early return.""" buf = InputDataBuffer(DataContext.get_current(), []) - topo = build_streaming_topology(buf, ExecutionOptions()) + topo = build_streaming_topology(buf, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), lambda: ExecutionResources(cpu=10, gpu=0, object_store_memory=1000), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) buf.current_logical_usage = MagicMock(return_value=ExecutionResources.zero()) buf.running_logical_usage = MagicMock(return_value=ExecutionResources.zero()) @@ -675,12 +595,13 @@ def test_external_consumer_bytes_surfaced_in_op_usage_str( o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), lambda: cluster_resources, DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) for op in [o1, o2, o3]: @@ -723,6 +644,7 @@ def test_topology_rejects_multiple_terminal_operators(self, restore_data_context ExecutionOptions(), MagicMock(return_value=ExecutionResources.zero()), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) def test_topology_rejects_empty_topology(self, restore_data_context): @@ -732,6 +654,7 @@ def test_topology_rejects_empty_topology(self, restore_data_context): ExecutionOptions(), MagicMock(return_value=ExecutionResources.zero()), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) def test_topology_rejects_no_terminal_operator(self, restore_data_context): @@ -749,6 +672,7 @@ def test_topology_rejects_no_terminal_operator(self, restore_data_context): ExecutionOptions(), MagicMock(return_value=ExecutionResources.zero()), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) def test_is_blocking_materializing_op(self, restore_data_context): @@ -770,13 +694,14 @@ def test_is_blocking_materializing_op(self, restore_data_context): o4 = mock_all_to_all_op(o3, name="Sort") o5 = mock_map_op(o4, name="Map2") - topo = build_streaming_topology(o5, ExecutionOptions()) + topo = build_streaming_topology(o5, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) # Case 1: Shuffle operator itself is blocking materializing @@ -801,12 +726,13 @@ def test_is_blocking_materializing_op(self, restore_data_context): o6 = LimitOperator(1, o5, DataContext.get_current()) o7 = mock_map_op(o6, name="Map3") - topo2 = build_streaming_topology(o7, ExecutionOptions()) + topo2 = build_streaming_topology(o7, ExecutionOptions(), noop_counter()) resource_manager2 = ResourceManager( topo2, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) # o5's downstream (o6, o7) has no blocking materializing ops @@ -826,7 +752,7 @@ def test_memory_limit_blocks_task_submission(self, restore_data_context): name="HighMemoryTask", ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) options = ExecutionOptions() resource_manager = ResourceManager( @@ -834,6 +760,9 @@ def test_memory_limit_blocks_task_submission(self, restore_data_context): options=options, get_total_resources=lambda: cluster_resources, data_context=DataContext.get_current(), + block_ref_counter=BlockRefCounter( + add_object_out_of_scope_callback=lambda *_: True + ), ) resource_manager.update_usages() @@ -859,13 +788,14 @@ def test_unblock_backpressure_terminal_operator(self, restore_data_context): o2 = mock_map_op(o1) o3 = LimitOperator(1, o2, DataContext.get_current()) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) guard = OutputBackpressureGuard(topo, resource_manager) @@ -876,13 +806,14 @@ def test_unblock_backpressure_terminal_operator(self, restore_data_context): # Add o4 operator - o2 is no longer terminal o4 = mock_map_op(o3) - topo = build_streaming_topology(o4, ExecutionOptions()) + topo = build_streaming_topology(o4, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) guard = OutputBackpressureGuard(topo, resource_manager) @@ -903,13 +834,14 @@ def test_no_unblock_backpressure_terminal_with_external_consumer( o2 = mock_map_op(o1) o3 = LimitOperator(1, o2, DataContext.get_current()) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) guard = OutputBackpressureGuard(topo, resource_manager) @@ -937,13 +869,14 @@ def test_unblock_backpressure_downstream_idle(self, restore_data_context): o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) guard = OutputBackpressureGuard(topo, resource_manager) o3.num_active_tasks = MagicMock(return_value=0) @@ -967,13 +900,14 @@ def test_unblock_backpressure_fallback_to_idle_detector(self, restore_data_conte o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) guard = OutputBackpressureGuard(topo, resource_manager) @@ -1008,13 +942,14 @@ def test_unblock_when_resource_allocator_disabled(self, restore_data_context): o2 = mock_map_op(o1) o3 = mock_map_op(o2) - topo = build_streaming_topology(o3, ExecutionOptions()) + topo = build_streaming_topology(o3, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(add_object_out_of_scope_callback=lambda *_: True), ) assert not resource_manager.op_resource_allocator_enabled() diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index d82ef7353088..199a4f74d42a 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -23,6 +23,7 @@ from ray.data._internal.execution.backpressure_policy.resource_budget_backpressure_policy import ( ResourceBudgetBackpressurePolicy, ) +from ray.data._internal.execution.block_ref_counter import BlockRefCounter from ray.data._internal.execution.execution_callback import ExecutionCallback from ray.data._internal.execution.interfaces import ( ExecutionOptions, @@ -69,6 +70,7 @@ from ray.data.block import BlockAccessor, BlockMetadataWithSchema, TaskExecWorkerStats from ray.data.context import EXECUTION_CALLBACKS_ENV_VAR, DataContext from ray.data.tests.conftest import * # noqa +from ray.data.tests.conftest import noop_counter def mock_resource_manager( @@ -124,7 +126,7 @@ def test_build_streaming_topology(verbose_progress, ray_start_regular_shared): DataContext.get_current(), ) topo = build_streaming_topology( - o3, ExecutionOptions(verbose_progress=verbose_progress) + o3, ExecutionOptions(verbose_progress=verbose_progress), noop_counter() ) assert len(topo) == 3, topo assert o1 in topo, topo @@ -154,7 +156,9 @@ def test_disallow_non_unique_operators(ray_start_regular_shared): DataContext.get_current(), ) with pytest.raises(ValueError): - build_streaming_topology(o4, ExecutionOptions(verbose_progress=True)) + build_streaming_topology( + o4, ExecutionOptions(verbose_progress=True), noop_counter() + ) def _make_disabled_guard() -> MagicMock: @@ -179,7 +183,9 @@ def test_process_completed_tasks(sleep_task_ref, ray_start_regular_shared): o1, DataContext.get_current(), ) - topo = build_streaming_topology(o2, ExecutionOptions(verbose_progress=True)) + topo = build_streaming_topology( + o2, ExecutionOptions(verbose_progress=True), noop_counter() + ) # Test processing output bundles. assert len(topo[o1].output_queue) == 0, topo @@ -228,7 +234,9 @@ def test_process_completed_tasks(sleep_task_ref, ray_start_regular_shared): o2, DataContext.get_current(), ) - topo = build_streaming_topology(o3, ExecutionOptions(verbose_progress=True)) + topo = build_streaming_topology( + o3, ExecutionOptions(verbose_progress=True), noop_counter() + ) o3.mark_execution_finished() o2.mark_execution_finished = MagicMock() @@ -253,7 +261,9 @@ def test_update_operator_states_drains_upstream(ray_start_regular_shared): o2, DataContext.get_current(), ) - topo = build_streaming_topology(o3, ExecutionOptions(verbose_progress=True)) + topo = build_streaming_topology( + o3, ExecutionOptions(verbose_progress=True), noop_counter() + ) # First, populate the upstream output queues by processing some tasks process_completed_tasks(topo, [], 0, _make_disabled_guard()) @@ -298,7 +308,7 @@ def test_get_eligible_operators_to_run(ray_start_regular_shared): DataContext.get_current(), name="O3", ) - topo = build_streaming_topology(o3, opts) + topo = build_streaming_topology(o3, opts, noop_counter()) resource_manager = mock_resource_manager( global_limits=ExecutionResources.for_limits(1, 1, 1), @@ -373,7 +383,7 @@ def test_backpressure_policy_tracking(ray_start_regular_shared): DataContext.get_current(), name="O2", ) - topo = build_streaming_topology(o2, opts) + topo = build_streaming_topology(o2, opts, noop_counter()) # Add input to o2's input queue so it becomes eligible topo[o1].output_queue.append(make_ref_bundle("dummy1")) @@ -451,7 +461,7 @@ def test_output_backpressure_policy_tracking(ray_start_regular_shared): DataContext.get_current(), name="O2", ) - topo = build_streaming_topology(o2, opts) + topo = build_streaming_topology(o2, opts, noop_counter()) # Create mock backpressure policies for output limiting with name property class LimitingPolicy: @@ -526,13 +536,14 @@ def test_process_completed_tasks_unblocks_when_non_resource_budget_policy_zeros_ DataContext.get_current(), name="O2", ) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(), DataContext.get_current(), + BlockRefCounter(), ) guard = OutputBackpressureGuard(topo, resource_manager) @@ -574,7 +585,7 @@ def test_summary_str_backpressure_policies(ray_start_regular_shared): DataContext.get_current(), name="O2", ) - topo = build_streaming_topology(o2, opts) + topo = build_streaming_topology(o2, opts, noop_counter()) resource_manager = mock_resource_manager() @@ -679,7 +690,7 @@ def _get_op_usage_mocked(op): # Case 1: Should pick the `o4` since it has throttling disabled _mock.return_value = [o1, o2, o3, o4] - topo = build_streaming_topology(o4, opts) + topo = build_streaming_topology(o4, opts, noop_counter()) selected = select_operator_to_run( topo, @@ -694,7 +705,7 @@ def _get_op_usage_mocked(op): # Case 2: Should pick the `o1` since it has lowest object store usage _mock.return_value = [o1, o2, o3] - topo = build_streaming_topology(o3, opts) + topo = build_streaming_topology(o3, opts, noop_counter()) selected = select_operator_to_run( topo, @@ -747,12 +758,13 @@ def test_debug_dump_topology(ray_start_regular_shared): o2, DataContext.get_current(), ) - topo = build_streaming_topology(o3, opt) + topo = build_streaming_topology(o3, opt, noop_counter()) resource_manager = ResourceManager( topo, ExecutionOptions(), MagicMock(return_value=ExecutionResources.zero()), DataContext.get_current(), + BlockRefCounter(), ) resource_manager.update_usages() # Just a sanity check to ensure it doesn't crash. @@ -866,7 +878,7 @@ def test_num_waiting_consumers_tracking(self): """num_waiting_consumers is incremented/decremented by get_output_blocking.""" o1 = InputDataBuffer(ray.data.DataContext.get_current(), []) o2 = LimitOperator(1, o1, ray.data.DataContext.get_current()) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) state = topo[o2] assert state.num_waiting_consumers == 0 @@ -900,7 +912,7 @@ def test_num_waiting_consumers_concurrent(self): For example, this happens for multiple streaming_split iterators.""" o1 = InputDataBuffer(ray.data.DataContext.get_current(), []) o2 = LimitOperator(1, o1, ray.data.DataContext.get_current()) - topo = build_streaming_topology(o2, ExecutionOptions()) + topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter()) state = topo[o2] def blocking_consumer(): @@ -1227,7 +1239,9 @@ def test_create_topology_metadata(): executor = StreamingExecutor(DataContext.get_current()) # Initialize the topology on the executor - executor._topology = build_streaming_topology(o3, ExecutionOptions()) + executor._topology = build_streaming_topology( + o3, ExecutionOptions(), noop_counter() + ) # Call the _dump_dag_structure method op_to_id = { @@ -1290,7 +1304,9 @@ def test_create_topology_metadata_with_sub_stages(): # Create the executor and set up topology executor = StreamingExecutor(DataContext.get_current()) - executor._topology = build_streaming_topology(o2, ExecutionOptions()) + executor._topology = build_streaming_topology( + o2, ExecutionOptions(), noop_counter() + ) # Get the DAG structure op_to_id = { @@ -1398,7 +1414,13 @@ def test_on_data_ready_single_output(self, ray_start_regular_shared): def verify_output(bundle): assert bundle.size_bytes() == pytest.approx(128 * MiB), bundle.size_bytes() - data_op_task = DataOpTask(0, streaming_gen, output_ready_callback=verify_output) + data_op_task = DataOpTask( + 0, + streaming_gen, + BlockRefCounter(), + "test_op", + output_ready_callback=verify_output, + ) bytes_read = 0 while not data_op_task.has_finished: @@ -1414,7 +1436,13 @@ def test_on_data_ready_multiple_outputs(self, ray_start_regular_shared): def verify_output(bundle): assert bundle.size_bytes() == pytest.approx(128 * MiB), bundle.size_bytes() - data_op_task = DataOpTask(0, streaming_gen, output_ready_callback=verify_output) + data_op_task = DataOpTask( + 0, + streaming_gen, + BlockRefCounter(), + "test_op", + output_ready_callback=verify_output, + ) bytes_read = 0 while not data_op_task.has_finished: @@ -1438,6 +1466,8 @@ def verify_exception(exc, task_exec_stats, task_exec_driver_stats): data_op_task = DataOpTask( 0, streaming_gen, + BlockRefCounter(), + "test_op", task_done_callback=verify_exception, ) @@ -1448,11 +1478,17 @@ def verify_exception(exc, task_exec_stats, task_exec_driver_stats): def test_operator_name_parameter(self, ray_start_regular_shared): streaming_gen = create_stub_streaming_gen(block_nbytes=[1]) - task = DataOpTask(0, streaming_gen, operator_name="MapBatches(fn)") + task = DataOpTask( + 0, + streaming_gen, + BlockRefCounter(), + "test_op", + operator_name="MapBatches(fn)", + ) assert task._operator_name == "MapBatches(fn)" streaming_gen2 = create_stub_streaming_gen(block_nbytes=[1]) - task_default = DataOpTask(1, streaming_gen2) + task_default = DataOpTask(1, streaming_gen2, BlockRefCounter(), "test_op") assert task_default._operator_name == "Unknown" @pytest.mark.parametrize( @@ -1490,7 +1526,11 @@ def remove_and_add_back_worker_node(_): cluster.wait_for_nodes() data_op_task = DataOpTask( - 0, streaming_gen, **{preempt_on: remove_and_add_back_worker_node} + 0, + streaming_gen, + BlockRefCounter(), + "test_op", + **{preempt_on: remove_and_add_back_worker_node}, ) # Run the task to completion. @@ -1520,7 +1560,7 @@ def test_on_data_ready_with_preemption_after_wait( # Create a streaming generator that produces a single 128 MiB output block. streaming_gen = create_stub_streaming_gen(block_nbytes=[128 * MiB]) - data_op_task = DataOpTask(0, streaming_gen) + data_op_task = DataOpTask(0, streaming_gen, BlockRefCounter(), "test_op") # Wait for the block to be ready, then remove the worker node. ray.wait([streaming_gen], fetch_local=False) @@ -1563,6 +1603,8 @@ def capture_done(exc, task_exec_stats, task_exec_driver_stats): data_op_task = DataOpTask( 0, streaming_gen, + BlockRefCounter(), + "test_op", task_done_callback=capture_done, ) diff --git a/python/ray/data/tests/unit/test_block_ref_counter.py b/python/ray/data/tests/unit/test_block_ref_counter.py new file mode 100644 index 000000000000..f1b01d575269 --- /dev/null +++ b/python/ray/data/tests/unit/test_block_ref_counter.py @@ -0,0 +1,156 @@ +import threading +from typing import Callable, Dict + +import pytest + +import ray +from ray.data._internal.execution.block_ref_counter import BlockRefCounter + + +def _ref(uid: int) -> "ray.ObjectRef": + """Real ObjectRef with a deterministic distinct 28-byte ID, no Ray cluster needed.""" + return ray.ObjectRef(uid.to_bytes(28, "big")) + + +class FakeAddObjectOutOfScopeCallback: + """Test double for CoreWorker.add_object_out_of_scope_callback. + + Records each registered callback keyed by the block's object-ID bytes so a + test can fire it explicitly. Set should_registration_fail=True to simulate an object + that is already out of scope at registration time. + """ + + def __init__(self, should_registration_fail: bool = False): + self._should_fail = should_registration_fail + self._callbacks: Dict[bytes, Callable[[bytes], None]] = {} + + def __call__( + self, object_ref: "ray.ObjectRef", callback: Callable[[bytes], None] + ) -> bool: + if not self._should_fail: + self._callbacks[object_ref.binary()] = callback + return not self._should_fail + + def fire(self, object_ref: "ray.ObjectRef") -> None: + id_binary = object_ref.binary() + self._callbacks[id_binary](id_binary) + + +class TestBlockRefCounterAccounting: + def test_single_block_produced_and_released(self): + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + ref = _ref(1) + + counter.on_block_produced(ref, 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 1 + + add_cb.fire(ref) + assert counter.get_object_store_memory_usage("op_a") == 0 + + def test_multiple_blocks_same_producer(self): + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + ref1, ref2 = _ref(1), _ref(2) + + counter.on_block_produced(ref1, 1, "op_a") + counter.on_block_produced(ref2, 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 2 + + add_cb.fire(ref1) + assert counter.get_object_store_memory_usage("op_a") == 1 + add_cb.fire(ref2) + assert counter.get_object_store_memory_usage("op_a") == 0 + + def test_multiple_producers_isolated(self): + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + ref1, ref2 = _ref(1), _ref(2) + + counter.on_block_produced(ref1, 1, "op_a") + counter.on_block_produced(ref2, 1, "op_b") + assert counter.get_object_store_memory_usage("op_a") == 1 + assert counter.get_object_store_memory_usage("op_b") == 1 + + add_cb.fire(ref1) + assert counter.get_object_store_memory_usage("op_a") == 0 + assert counter.get_object_store_memory_usage("op_b") == 1 + + def test_duplicate_registration_is_noop(self): + """on_block_produced is idempotent: a duplicate ref is silently ignored. + + This matters when an AllToAllOperator forwards an input ref unchanged; + the ref was already registered by the upstream producer.""" + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + ref = _ref(1) + + counter.on_block_produced(ref, 1, "op_a") + counter.on_block_produced(ref, 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 1 + + add_cb.fire(ref) + assert counter.get_object_store_memory_usage("op_a") == 0 + + def test_register_when_object_already_out_of_scope(self): + """If registration reports the object is already gone, the increment is + undone immediately so the producer nets to zero.""" + add_cb = FakeAddObjectOutOfScopeCallback(should_registration_fail=True) + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + + counter.on_block_produced(_ref(1), 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 0 + + +class TestBlockRefCounterClear: + def test_clear_resets_usage(self): + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + counter.on_block_produced(_ref(1), 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 1 + + counter.clear() + assert counter.get_object_store_memory_usage("op_a") == 0 + + def test_stale_callback_after_clear_is_noop(self): + """A stale callback firing after clear() must not touch accounting + recorded after the reset.""" + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + stale_ref = _ref(1) + counter.on_block_produced(stale_ref, 1, "op_a") + + counter.clear() + + counter.on_block_produced(_ref(2), 1, "op_a") + assert counter.get_object_store_memory_usage("op_a") == 1 + + add_cb.fire(stale_ref) + assert counter.get_object_store_memory_usage("op_a") == 1 + + +class TestBlockRefCounterThreadSafety: + def test_concurrent_callbacks_dont_corrupt_state(self): + """Many threads firing callbacks at once must not corrupt the count.""" + add_cb = FakeAddObjectOutOfScopeCallback() + counter = BlockRefCounter(add_object_out_of_scope_callback=add_cb) + producer_id = "op_concurrent" + n = 50 + refs = [_ref(i) for i in range(n)] + for ref in refs: + counter.on_block_produced(ref, 1, producer_id) + assert counter.get_object_store_memory_usage(producer_id) == n + + threads = [threading.Thread(target=add_cb.fire, args=(ref,)) for ref in refs] + for t in threads: + t.start() + for t in threads: + t.join() + + assert counter.get_object_store_memory_usage(producer_id) == 0 + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/unit/test_resource_manager.py b/python/ray/data/tests/unit/test_resource_manager.py index de9aa94400cc..ed7239309d3b 100644 --- a/python/ray/data/tests/unit/test_resource_manager.py +++ b/python/ray/data/tests/unit/test_resource_manager.py @@ -20,6 +20,8 @@ from ray.data.block import BlockMetadata from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa +from ray.data.tests.conftest import noop_counter +from ray.data.tests.test_resource_manager import StubBlockRefCounter def test_physical_operator_tracks_output_dependencies(): @@ -124,15 +126,17 @@ def test_does_not_double_count_usage_from_union(): input1 = PhysicalOperator("op1", [], DataContext.get_current()) input2 = PhysicalOperator("op2", [], DataContext.get_current()) union_op = UnionOperator(DataContext.get_current(), input1, input2) - topology = build_streaming_topology(union_op, ExecutionOptions()) + topology = build_streaming_topology(union_op, ExecutionOptions(), noop_counter()) # Create a resource manager. total_resources = ExecutionResources(cpu=0, object_store_memory=2) + counter = StubBlockRefCounter() resource_manager = ResourceManager( topology, ExecutionOptions(), lambda: total_resources, DataContext.get_current(), + counter, ) # Create two 1-byte `RefBundle`s. @@ -151,6 +155,9 @@ def test_does_not_double_count_usage_from_union(): # Add two 1-byte `RefBundle` to the union operator. topology[union_op].add_output(bundle1) topology[union_op].add_output(bundle2) + # Blocks are attributed to their original producer, not union_op. + counter.set_usage(input1.id, 1) + counter.set_usage(input2.id, 1) resource_manager.update_usages() # The total object store memory usage should be 2. If the resource manager double- @@ -186,15 +193,17 @@ def test_per_input_inqueue_attribution_for_union(): options = ExecutionOptions() options.preserve_order = True - topology = build_streaming_topology(union_op, options) + topology = build_streaming_topology(union_op, options, noop_counter()) # Create a resource manager. total_resources = ExecutionResources(cpu=0, object_store_memory=200) + counter = StubBlockRefCounter() resource_manager = ResourceManager( topology, options, lambda: total_resources, DataContext.get_current(), + counter, ) # Create two 10-byte RefBundles with distinct block refs (simulates real execution @@ -215,6 +224,8 @@ def test_per_input_inqueue_attribution_for_union(): # With preserve_order=True, _add_input_inner routes to _input_buffers[input_index]. union_op.add_input(bundle1, input_index=1) union_op.add_input(bundle2, input_index=1) + # Blocks in union's input buffer are attributed to their producer (input2). + counter.set_usage(input2.id, 20) resource_manager.update_usages() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 40171a59c1fc..a5e8793119ff 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -248,6 +248,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_bool all_namespaces) void AddLocalReference(const CObjectID &object_id) void RemoveLocalReference(const CObjectID &object_id) + c_bool AddObjectOutOfScopeOrFreedCallback( + const CObjectID &object_id, + void (*callback)(const CObjectID &, void *) nogil, + void *callback_context) + CRayStatus CheckObjectOwnedByUs(const CObjectID &object_id) const void PutObjectIntoPlasma(const CRayObject &object, const CObjectID &object_id) const CAddress &GetRpcAddress() const diff --git a/python/ray/tests/BUILD.bazel b/python/ray/tests/BUILD.bazel index 46bacf8f54fb..87ba35536ada 100644 --- a/python/ray/tests/BUILD.bazel +++ b/python/ray/tests/BUILD.bazel @@ -560,6 +560,7 @@ py_test_module_list( "test_multinode_failures_2.py", "test_node_death.py", "test_numba.py", + "test_object_out_of_scope_callback.py", "test_object_spilling_no_asan.py", "test_open_telemetry_metric_recorder.py", "test_placement_group_metrics.py", diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index f9480d6897f8..ff3f2e260bfa 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -268,7 +268,7 @@ def get_thread_count(self): ray.get(actor.get_thread_count.remote()) # Lowering these numbers in this assert should be celebrated, # increasing these numbers should be scrutinized - assert ray.get(actor.get_thread_count.remote()) in {20, 21, 22, 23} + assert ray.get(actor.get_thread_count.remote()) in {21, 22, 23, 24} # https://github.com/ray-project/ray/issues/7287 diff --git a/python/ray/tests/test_object_out_of_scope_callback.py b/python/ray/tests/test_object_out_of_scope_callback.py new file mode 100644 index 000000000000..081087b76c88 --- /dev/null +++ b/python/ray/tests/test_object_out_of_scope_callback.py @@ -0,0 +1,144 @@ +import threading +import time + +import pytest + +import ray +import ray._private.worker as worker_module + + +@pytest.fixture(scope="module") +def ray_instance(): + ray.init(num_cpus=1) + yield + ray.shutdown() + + +def _core_worker(): + return worker_module.global_worker.core_worker + + +class TestAddObjectOutOfScopeCallback: + """Integration tests against a live Ray cluster.""" + + def test_callback_fires_when_ref_dropped(self, ray_instance): + """Callback fires with the correct object ID bytes once the last reference is + dropped.""" + received_id = [] + done = threading.Event() + + ref = ray.put(42) + expected_binary = ref.binary() + registered = _core_worker().add_object_out_of_scope_callback( + ref, lambda id_bytes: (received_id.append(id_bytes), done.set()) + ) + assert registered, "Expected registration to succeed" + + del ref # drops the last Python reference — callback must now fire + assert done.wait(timeout=5), "Callback did not fire within 5 s" + assert received_id[0] == expected_binary + + def test_returns_false_for_already_out_of_scope(self, ray_instance): + """Returns False when the object is already out of scope; callback never fires.""" + ref = ray.put(99) + # Force the RC entry gone while keeping the Python ObjectRef alive. + ray.internal.free([ref]) + + # Poll with a no-op until the RC entry is gone (ray.internal.free is async). + deadline = time.monotonic() + 5 + registered = True + while time.monotonic() < deadline: + registered = _core_worker().add_object_out_of_scope_callback( + ref, lambda _: None + ) + if not registered: + break + time.sleep(0.01) + assert not registered, "Object never became out of scope within 5 s" + + # Now that we know the object is OOS, attempt registration with a real + # callback — must return False — then use a sentinel to drain the callback + # thread before asserting fired is still unset. + fired = threading.Event() + registered2 = _core_worker().add_object_out_of_scope_callback( + ref, lambda _: fired.set() + ) + assert not registered2, "Second registration on OOS object must return False" + + sentinel_done = threading.Event() + sentinel = ray.put("sentinel") + _core_worker().add_object_out_of_scope_callback( + sentinel, lambda _: sentinel_done.set() + ) + del sentinel + assert sentinel_done.wait(timeout=5) + assert not fired.is_set(), "Callback must not fire for an already-freed object" + + def test_ray_internal_free_triggers_callback(self, ray_instance): + """`ray.internal.free` should trigger the callback.""" + fired = threading.Event() + + ref = ray.put("free_me") + registered = _core_worker().add_object_out_of_scope_callback( + ref, lambda _: fired.set() + ) + assert registered + + ray.internal.free([ref]) + assert fired.wait(timeout=5), "Callback did not fire after ray.internal.free" + + def test_many_callbacks_all_fire(self, ray_instance): + """At scale, every registered callback fires exactly once after its ref is + dropped. Guards the dedicated callback thread against dropped or + duplicated notifications under load.""" + n = 2000 + remaining = [n] + lock = threading.Lock() + all_fired = threading.Event() + fire_counts = {} + + def on_freed(id_bytes): + with lock: + fire_counts[id_bytes] = fire_counts.get(id_bytes, 0) + 1 + remaining[0] -= 1 + if remaining[0] == 0: + all_fired.set() + + refs = [ray.put(i) for i in range(n)] + expected = {r.binary() for r in refs} + assert all( + _core_worker().add_object_out_of_scope_callback(ref, on_freed) + for ref in refs + ) + del refs # drop every last reference at once + assert all_fired.wait( + timeout=30 + ), f"Only {n - remaining[0]}/{n} callbacks fired within 30s" + with lock: + assert set(fire_counts) == expected, "Unexpected or missing object IDs" + assert all(c == 1 for c in fire_counts.values()), "A callback fired twice" + + def test_callback_exception_does_not_crash(self, ray_instance): + """A Python exception inside the callback must not propagate to C++.""" + second_fired = threading.Event() + + ref = ray.put("exc_test") + + def bad_cb(_): + raise RuntimeError("intentional error in callback") + + _core_worker().add_object_out_of_scope_callback(ref, bad_cb) + _core_worker().add_object_out_of_scope_callback( + ref, lambda _: second_fired.set() + ) + + del ref + assert second_fired.wait( + timeout=5 + ), "Exception in one callback must not prevent subsequent callbacks" + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main([__file__, "-v"])) diff --git a/release/benchmarks/object_store/callback_throughput.yaml b/release/benchmarks/object_store/callback_throughput.yaml new file mode 100644 index 000000000000..f7073a6bffe0 --- /dev/null +++ b/release/benchmarks/object_store/callback_throughput.yaml @@ -0,0 +1,14 @@ +cloud: {{env["ANYSCALE_CLOUD_NAME"]}} + +head_node: + instance_type: m6i.2xlarge + resources: + node: 1 + +worker_nodes: + - instance_type: m6i.2xlarge + min_nodes: 10 + max_nodes: 10 + market_type: ON_DEMAND + resources: + node: 1 diff --git a/release/benchmarks/object_store/test_callback_throughput.py b/release/benchmarks/object_store/test_callback_throughput.py new file mode 100644 index 000000000000..5a5d62ec48b2 --- /dev/null +++ b/release/benchmarks/object_store/test_callback_throughput.py @@ -0,0 +1,107 @@ +import gc +import json +import os +import threading +import time + +import numpy as np + +import ray +import ray._private.worker + +NUM_WORKERS = 10 +OBJECT_SIZE = 1024 * 1024 # 1 MiB, above the 100 KB inlining threshold + + +@ray.remote(num_cpus=1) +def produce_block(): + return np.zeros(OBJECT_SIZE, dtype=np.uint8) + + +@ray.remote(num_cpus=1) +def consume_block(block_ref): + return len(block_ref) + + +def test_callback_pipeline(num_blocks, timeout_s=60): + core_worker = ray._private.worker.global_worker.core_worker + + remaining = [num_blocks] + lock = threading.Lock() + done = threading.Event() + + def on_freed(_id_bytes): + with lock: + remaining[0] -= 1 + if remaining[0] == 0: + done.set() + + start = time.perf_counter() + + # Produce blocks (models MapOperator submitting tasks). + refs = [ + produce_block.options(scheduling_strategy="SPREAD").remote() + for _ in range(num_blocks) + ] + ray.wait(refs, num_returns=len(refs)) + + # Register callbacks (models BlockRefCounter.on_block_produced). + assert all( + core_worker.add_object_out_of_scope_callback(ref, on_freed) for ref in refs + ) + + # Pass to consumers (models downstream operator receiving blocks). + consumer_futures = [consume_block.remote(ref) for ref in refs] + ray.get(consumer_futures) + + # Drop all references (models blocks going out of scope after consumption). + del refs + gc.collect() + + if not done.wait(timeout=timeout_s): + fired = num_blocks - remaining[0] + raise TimeoutError( + f"Only {fired}/{num_blocks} callbacks fired within {timeout_s}s" + ) + + total_s = time.perf_counter() - start + print(f" {num_blocks} blocks: {total_s:.3f}s") + return total_s + + +ray.init(address="auto") + +# Warm up gRPC connections and worker pools. +ray.get( + [ + produce_block.options(scheduling_strategy="SPREAD").remote() + for _ in range(NUM_WORKERS) + ] +) + +time_100 = test_callback_pipeline(100) +time_1k = test_callback_pipeline(1000) + +print("\nSummary:") +print(f" 100 blocks: {time_100:.3f}s") +print(f" 1k blocks: {time_1k:.3f}s") + +if "TEST_OUTPUT_JSON" in os.environ: + with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file: + results = { + "time_100": time_100, + "time_1k": time_1k, + "perf_metrics": [ + { + "perf_metric_name": "callback_pipeline_100_blocks_s", + "perf_metric_value": time_100, + "perf_metric_type": "LATENCY", + }, + { + "perf_metric_name": "callback_pipeline_1k_blocks_s", + "perf_metric_value": time_1k, + "perf_metric_type": "LATENCY", + }, + ], + } + json.dump(results, out_file) diff --git a/release/nightly_tests/dataset/callback_latency_benchmark.py b/release/nightly_tests/dataset/callback_latency_benchmark.py new file mode 100644 index 000000000000..109bf58586c9 --- /dev/null +++ b/release/nightly_tests/dataset/callback_latency_benchmark.py @@ -0,0 +1,101 @@ +import argparse +import gc +import time + +import ray + +from benchmark import Benchmark +from ray.data._internal.execution.block_ref_counter import BlockRefCounter + +_SETTLE_TIMEOUT_S = 30.0 + + +@ray.remote(num_cpus=1) +def _hold_block(block_ref, sleep_s: float) -> None: + """Hold block_ref as a task argument for sleep_s seconds, then return.""" + time.sleep(sleep_s) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="BlockRefCounter convergence latency benchmark" + ) + parser.add_argument( + "--num-tasks", + type=int, + default=100, + help="Number of concurrent tasks / blocks in the burst. " + "Latency scales with this value (ReferenceCounter mutex serializes callbacks); " + "increase together with cluster size for stress testing.", + ) + parser.add_argument( + "--task-sleep-s", + type=float, + default=0.1, + help="How long each task holds its block before completing.", + ) + return parser.parse_args() + + +def run_burst_object_free(args: argparse.Namespace) -> dict: + n = args.num_tasks + producer_id = "burst_producer" + counter = BlockRefCounter() + + # Object size is irrelevant: we measure callback timing, not memory accuracy. + refs = [ray.put(b"block") for _ in range(n)] + for ref in refs: + counter.on_block_produced(ref, 1, producer_id) + + # Submit tasks that each hold one block, then drop Python refs. + # SPREAD distributes tasks across nodes to simulate a multi-node burst. + task_futures = [ + _hold_block.options(scheduling_strategy="SPREAD").remote(ref, args.task_sleep_s) + for ref in refs + ] + # Drop Python refs so each block is kept alive solely by its task argument. + # The callback can only fire once the task completes AND this ref is gone. + del refs + gc.collect() + + # T0 is stamped after all tasks finish so settle_time_s measures only the + # callback firing lag, not task execution time. + ray.get(task_futures) + t0 = time.perf_counter() + gc.collect() + + negative_events = 0 + converged = False + while time.perf_counter() - t0 < _SETTLE_TIMEOUT_S: + gc.collect() + usage = counter.get_object_store_memory_usage(producer_id) + if usage < 0: + negative_events += 1 + if usage <= 0: + converged = True + break + time.sleep(0.05) + + settle_time_s = time.perf_counter() - t0 + + assert negative_events == 0, f"Counter went negative {negative_events} times" + assert converged, ( + f"Counter did not reach 0 within {_SETTLE_TIMEOUT_S}s; " + f"remaining: {counter.get_object_store_memory_usage(producer_id)} bytes" + ) + + return { + "callback_settle_time_s": round(settle_time_s, 4), + "num_tasks": n, + "counter_negative_events": negative_events, + } + + +def main(args: argparse.Namespace): + benchmark = Benchmark() + benchmark.run_fn("burst-object-free", run_burst_object_free, args) + benchmark.write_result() + + +if __name__ == "__main__": + main(parse_args()) diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index 88d84781f895..19cbbb6dbcbf 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -750,6 +750,16 @@ run: script: python backpressure_benchmark.py --case training-prefetch --num-trainers 1 +- name: block_ref_counter_convergence + python: "3.10" + cluster: + anyscale_sdk_2026: true + cluster_compute: fixed_size_100_cpu_compute.yaml + + run: + timeout: 300 + script: python callback_latency_benchmark.py + ######################## # Sort and shuffle tests diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 21e6b22a2696..849bea7023a4 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3219,6 +3219,30 @@ cluster: cluster_compute: object_store_gce.yaml +- name: callback_throughput + python: "3.10" + group: core-scalability-test + working_dir: benchmarks + + frequency: nightly + team: core + env: aws_perf + cluster: + anyscale_sdk_2026: true + byod: + runtime_env: + - LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so + cluster_compute: object_store/callback_throughput.yaml + + run: + timeout: 600 + script: python object_store/test_callback_throughput.py + wait_for_nodes: + num_nodes: 11 + + variations: + - __suffix__: aws + - name: small_objects python: "3.10" group: core-scalability-test diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0b2a713f52ef..e20ae7c1da56 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -291,6 +291,7 @@ CoreWorker::CoreWorker( CoreWorkerOptions options, std::unique_ptr worker_context, instrumented_io_context &io_service, + instrumented_io_context &object_freed_callback_service, std::shared_ptr core_worker_client_pool, std::shared_ptr raylet_client_pool, std::shared_ptr periodical_runner, @@ -300,6 +301,7 @@ CoreWorker::CoreWorker( std::shared_ptr raylet_ipc_client, std::shared_ptr local_raylet_rpc_client, boost::thread &io_thread, + boost::thread &object_freed_callback_thread, std::shared_ptr reference_counter, std::shared_ptr memory_store, std::shared_ptr plasma_store_provider, @@ -327,6 +329,7 @@ CoreWorker::CoreWorker( : nullptr), worker_context_(std::move(worker_context)), io_service_(io_service), + object_freed_callback_service_(object_freed_callback_service), core_worker_client_pool_(std::move(core_worker_client_pool)), raylet_client_pool_(std::move(raylet_client_pool)), periodical_runner_(std::move(periodical_runner)), @@ -336,6 +339,7 @@ CoreWorker::CoreWorker( raylet_ipc_client_(std::move(raylet_ipc_client)), local_raylet_rpc_client_(std::move(local_raylet_rpc_client)), io_thread_(io_thread), + object_freed_callback_thread_(object_freed_callback_thread), reference_counter_(std::move(reference_counter)), memory_store_(std::move(memory_store)), plasma_store_provider_(std::move(plasma_store_provider)), @@ -2530,6 +2534,38 @@ bool CoreWorker::IsTaskCanceled(const TaskID &task_id) const { return canceled_tasks_.find(task_id) != canceled_tasks_.end(); } +bool CoreWorker::AddObjectOutOfScopeOrFreedCallback( + const ObjectID &object_id, const std::function &callback) { + auto wrapped = [&object_freed_callback_service = object_freed_callback_service_, + callback](const ObjectID &id) { + object_freed_callback_service.post([callback, id]() { callback(id); }, + "CoreWorker.ObjFreedCb"); + }; + return reference_counter_->AddObjectOutOfScopeOrFreedCallback(object_id, wrapped); +} + +bool CoreWorker::AddObjectOutOfScopeOrFreedCallback(const ObjectID &object_id, + void (*callback)(const ObjectID &, + void *), + void *callback_context) { + RAY_CHECK(callback != nullptr) << "callback must not be null"; + return AddObjectOutOfScopeOrFreedCallback( + object_id, [callback, callback_context](const ObjectID &id) { + callback(id, callback_context); + }); +} + +Status CoreWorker::CheckObjectOwnedByUs(const ObjectID &object_id) const { + if (reference_counter_->OwnedByUs(object_id)) { + return Status::OK(); + } + return Status::InvalidArgument(absl::StrFormat( + "Cannot register an out-of-scope/freed callback for object %s: it is not " + "owned by this worker (it may be owned by another worker, or have no " + "ownership record). These callbacks can only be registered by the owner.", + object_id.Hex())); +} + bool CoreWorker::ShouldInterruptTaskForCancellation() const { if (worker_context_->GetCurrentJobID().IsNil()) { return false; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index cc664e2500ae..5b63b17b505f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -174,6 +174,7 @@ class CoreWorker : public std::enable_shared_from_this { CoreWorker(CoreWorkerOptions options, std::unique_ptr worker_context, instrumented_io_context &io_service, + instrumented_io_context &object_freed_callback_service, std::shared_ptr core_worker_client_pool, std::shared_ptr raylet_client_pool, std::shared_ptr periodical_runner, @@ -183,6 +184,7 @@ class CoreWorker : public std::enable_shared_from_this { std::shared_ptr raylet_ipc_client, std::shared_ptr local_raylet_rpc_client, boost::thread &io_thread, + boost::thread &object_freed_callback_thread, std::shared_ptr reference_counter, std::shared_ptr memory_store, std::shared_ptr plasma_store_provider, @@ -401,6 +403,45 @@ class CoreWorker : public std::enable_shared_from_this { memory_store_->Delete(deleted); } + /// Register a callback to fire when an object goes out of scope or is freed. + /// Can only be called for objects owned by this worker. The callback is posted + /// to the dedicated object_freed_callback_service_ thread so it never blocks + /// the main IO thread. + /// + /// \param[in] object_id The owned object to watch. + /// \param[in] callback Invoked with the object_id when the object goes out of scope. + /// \return true if registered; false if the object is already out of scope or freed + /// (callback will never fire). + bool AddObjectOutOfScopeOrFreedCallback( + const ObjectID &object_id, const std::function &callback); + + /// C function-pointer overload of AddObjectOutOfScopeOrFreedCallback for use + /// from Cython. Can only be called for objects owned by this worker. + /// + /// \param[in] object_id The owned object to watch. + /// \param[in] callback Function to invoke when the object goes out of scope. Called + /// with (object_id, callback_context). + /// \param[in] callback_context Opaque pointer forwarded unchanged to `callback`. + /// In the Cython overload, this is a pointer to a Python `bytes` object + /// containing the object ID binary, used as the key into the callback + /// registry. + /// \return true if registered; false if the object is already out of scope or freed + /// (callback will never fire). + bool AddObjectOutOfScopeOrFreedCallback(const ObjectID &object_id, + void (*callback)(const ObjectID &, void *), + void *callback_context); + + /// Validate that the given object is owned by this worker. Used to gate + /// owner-only operations (e.g. registering an out-of-scope/freed callback) + /// so the error is constructed in C++ and propagated through the standard + /// Status path rather than re-implemented at each binding. + /// + /// \param[in] object_id The object to check. + /// \return Status::OK if this worker is the owner of the object; + /// Status::InvalidArgument otherwise (the rejected case in practice is + /// a borrowed object owned by another worker). + Status CheckObjectOwnedByUs(const ObjectID &object_id) const; + int GetMemoryStoreSize() { return memory_store_->Size(); } /// Returns a map of all ObjectIDs currently in scope with a pair of their @@ -1798,6 +1839,9 @@ class CoreWorker : public std::enable_shared_from_this { /// Event loop where the IO events are handled. e.g. async GCS operations. instrumented_io_context &io_service_; + /// Dedicated event loop for object-freed callbacks, keeping them off io_service_. + instrumented_io_context &object_freed_callback_service_; + /// Shared core worker client pool. std::shared_ptr core_worker_client_pool_; @@ -1824,6 +1868,9 @@ class CoreWorker : public std::enable_shared_from_this { // Thread that runs a boost::asio service to process IO events. boost::thread &io_thread_; + /// Dedicated thread for user-registered object-freed callbacks. + boost::thread &object_freed_callback_thread_; + // Keeps track of object ID reference counts. std::shared_ptr reference_counter_; diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index a662191c6da0..31e8aeeb3426 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -190,6 +190,26 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( RAY_LOG(INFO) << "Core worker main io service stopped."; }); + // Start the dedicated callback thread for user-registered object-freed callbacks. + // Python code invoked from callbacks may call into numpy or other libraries that + // need a large stack; give it the same 16 MB as the IO thread on Mac. + boost::thread::attributes obj_freed_cb_thread_attrs; +#if defined(__APPLE__) + obj_freed_cb_thread_attrs.set_stack_size(16777216); +#endif + object_freed_callback_thread_ = boost::thread(obj_freed_cb_thread_attrs, [this]() { +#ifndef _WIN32 + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + pthread_sigmask(SIG_BLOCK, &mask, nullptr); +#endif + SetThreadName("worker.user_obj_freed_callback"); + object_freed_callback_service_.run(); + RAY_LOG(INFO) << "Object-freed callback service stopped."; + }); + if (options.worker_type == WorkerType::DRIVER && !options.serialized_job_config.empty()) { // Driver populates the job config via initialization. @@ -690,6 +710,7 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( std::make_shared(std::move(options), std::move(worker_context), io_service_, + object_freed_callback_service_, std::move(core_worker_client_pool), std::move(raylet_client_pool), std::move(periodical_runner), @@ -699,6 +720,7 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( std::move(raylet_ipc_client), std::move(local_raylet_rpc_client), io_thread_, + object_freed_callback_thread_, std::move(reference_counter), std::move(memory_store), std::move(plasma_store_provider), @@ -731,6 +753,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) ? ComputeDriverIdFromJob(options_.job_id) : options_.worker_id), io_work_(io_service_.get_executor()), + object_freed_callback_service_work_(object_freed_callback_service_.get_executor()), client_call_manager_(std::make_unique( io_service_, /*record_stats=*/false, options.node_ip_address)), task_execution_service_work_(task_execution_service_.get_executor()), diff --git a/src/ray/core_worker/core_worker_process.h b/src/ray/core_worker/core_worker_process.h index 6412129397e2..df4834032ddf 100644 --- a/src/ray/core_worker/core_worker_process.h +++ b/src/ray/core_worker/core_worker_process.h @@ -166,6 +166,15 @@ class CoreWorkerProcessImpl { /// Keeps the io_service_ alive. boost::asio::executor_work_guard io_work_; + /// Dedicated io_context for out-of-scope callbacks registered by users (e.g. Python). + instrumented_io_context object_freed_callback_service_{ + /*enable_lag_probe=*/false, + /*running_on_single_thread=*/true}; + + /// Keeps object_freed_callback_service_ alive until explicitly stopped. + boost::asio::executor_work_guard + object_freed_callback_service_work_; + /// Shared client call manager across all gRPC clients in the core worker process. /// This is used by the CoreWorker and the MetricsAgentClient. std::unique_ptr client_call_manager_; @@ -183,6 +192,9 @@ class CoreWorkerProcessImpl { // Thread that runs a boost::asio service to process IO events. boost::thread io_thread_; + /// Thread that drains object_freed_callback_service_. + boost::thread object_freed_callback_thread_; + /// The core worker instance of this worker process. MutexProtected> core_worker_; diff --git a/src/ray/core_worker/core_worker_shutdown_executor.cc b/src/ray/core_worker/core_worker_shutdown_executor.cc index 2e52d851ffdb..5e6e28ee37d9 100644 --- a/src/ray/core_worker/core_worker_shutdown_executor.cc +++ b/src/ray/core_worker/core_worker_shutdown_executor.cc @@ -94,6 +94,16 @@ void CoreWorkerShutdownExecutor::ExecuteGracefulShutdown( } } + // Post stop() as a handler so it runs after all pending Py_DECREF callbacks + // have executed — avoids leaking Python refcounts if callbacks are in flight. + core_worker->object_freed_callback_service_.post( + [&svc = core_worker->object_freed_callback_service_]() { svc.stop(); }, + "CoreWorker.StopCallbackService"); + RAY_LOG(INFO) << "Waiting for joining the object-freed callback thread."; + if (core_worker->object_freed_callback_thread_.joinable()) { + core_worker->object_freed_callback_thread_.join(); + } + core_worker->core_worker_server_->Shutdown(); // GCS client is safe to disconnect now that io_service has stopped. diff --git a/src/ray/core_worker/tests/core_worker_test.cc b/src/ray/core_worker/tests/core_worker_test.cc index 71bd59f1445e..66529a31c39d 100644 --- a/src/ray/core_worker/tests/core_worker_test.cc +++ b/src/ray/core_worker/tests/core_worker_test.cc @@ -65,7 +65,9 @@ class CoreWorkerTest : public ::testing::Test { public: CoreWorkerTest() : io_work_(io_service_.get_executor()), - task_execution_service_work_(task_execution_service_.get_executor()) { + task_execution_service_work_(task_execution_service_.get_executor()), + object_freed_callback_service_work_( + object_freed_callback_service_.get_executor()) { CoreWorkerOptions options; options.worker_type = WorkerType::WORKER; options.language = Language::PYTHON; @@ -263,6 +265,7 @@ class CoreWorkerTest : public ::testing::Test { core_worker_ = std::make_shared(std::move(options), std::move(worker_context), io_service_, + object_freed_callback_service_, std::move(core_worker_client_pool), std::move(raylet_client_pool), std::move(periodical_runner), @@ -272,6 +275,7 @@ class CoreWorkerTest : public ::testing::Test { std::move(fake_raylet_ipc_client), std::move(fake_local_raylet_rpc_client), io_thread_, + object_freed_callback_thread_, reference_counter_, memory_store_, nullptr, // plasma_store_provider_ @@ -298,11 +302,19 @@ class CoreWorkerTest : public ::testing::Test { FakeClock clock_; instrumented_io_context io_service_; instrumented_io_context task_execution_service_; + instrumented_io_context object_freed_callback_service_; boost::asio::executor_work_guard io_work_; boost::asio::executor_work_guard task_execution_service_work_; + boost::asio::executor_work_guard + object_freed_callback_service_work_; boost::thread io_thread_; + boost::thread object_freed_callback_thread_; + + /// Flush all pending object-freed callbacks. Call this in tests after an action + /// that should trigger a user-registered out-of-scope callback. + void FlushObjectFreedCallbacks() { object_freed_callback_service_.poll(); } rpc::Address rpc_address_; std::unique_ptr client_call_manager_; @@ -1318,5 +1330,185 @@ INSTANTIATE_TEST_SUITE_P(ActorRefDeletedForRegisteringActor, HandleWaitForActorRefDeletedWhileRegisteringRetriesTest, ::testing::Values(true, false)); +// Callback fires after the last local reference is dropped, and +// FlushObjectFreedCallbacks drains the pending work. +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_FiresAfterRefDrop) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + + bool fired = false; + ObjectID received_id; + bool registered = core_worker_->AddObjectOutOfScopeOrFreedCallback( + object_id, [&fired, &received_id](const ObjectID &id) { + fired = true; + received_id = id; + }); + ASSERT_TRUE(registered); + ASSERT_FALSE(fired); + + reference_counter_->RemoveLocalReference(object_id, nullptr); + // Callback is posted to the dedicated service; flush it synchronously. + FlushObjectFreedCallbacks(); + ASSERT_TRUE(fired); + EXPECT_EQ(received_id, object_id); +} + +// Returns false when the object is already out of scope; callback never fires. +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_ReturnsFalseWhenAlreadyOutOfScope) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + // Add and immediately remove the reference so it goes out of scope. + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + reference_counter_->RemoveLocalReference(object_id, nullptr); + + bool fired = false; + bool registered = core_worker_->AddObjectOutOfScopeOrFreedCallback( + object_id, [&fired](const ObjectID &) { fired = true; }); + ASSERT_FALSE(registered); + FlushObjectFreedCallbacks(); + ASSERT_FALSE(fired); +} + +// The callback must run on the dedicated object_freed_callback_service_ thread, +// not on the IO thread or the test thread. +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_RunsOnDedicatedThread) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + + std::promise thread_id_promise; + bool registered = core_worker_->AddObjectOutOfScopeOrFreedCallback( + object_id, [&thread_id_promise](const ObjectID &) { + thread_id_promise.set_value(boost::this_thread::get_id()); + }); + ASSERT_TRUE(registered); + + // RemoveLocalReference fires OnObjectOutOfScopeOrFreed inline (test thread), which + // calls the wrapped lambda that posts the real callback to + // object_freed_callback_service_. + reference_counter_->RemoveLocalReference(object_id, nullptr); + + // Start the dedicated thread so the posted work can run. + object_freed_callback_thread_ = + boost::thread([this]() { object_freed_callback_service_.run(); }); + + auto tid = thread_id_promise.get_future().get(); + auto expected_tid = object_freed_callback_thread_.get_id(); + + object_freed_callback_service_.stop(); + if (object_freed_callback_thread_.joinable()) { + object_freed_callback_thread_.join(); + } + + EXPECT_EQ(tid, expected_tid) << "Callback must run on object_freed_callback_thread_"; + EXPECT_NE(tid, boost::this_thread::get_id()) + << "Callback must not run on the test thread"; +} + +// The C function-pointer overload (used by Cython) routes through the same +// dedicated thread and delivers the correct object_id + user_data. +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_CFunctionPointerOverload) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + + struct Result { + ObjectID id; + bool fired = false; + } result; + + auto c_callback = [](const ObjectID &id, void *data) { + auto *r = static_cast(data); + r->id = id; + r->fired = true; + }; + + bool registered = + core_worker_->AddObjectOutOfScopeOrFreedCallback(object_id, c_callback, &result); + ASSERT_TRUE(registered); + + reference_counter_->RemoveLocalReference(object_id, nullptr); + FlushObjectFreedCallbacks(); + + ASSERT_TRUE(result.fired); + EXPECT_EQ(result.id, object_id); +} + +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_MultipleCallbacksAllFire) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + + int fire_count = 0; + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(core_worker_->AddObjectOutOfScopeOrFreedCallback( + object_id, [&fire_count](const ObjectID &) { ++fire_count; })); + } + + reference_counter_->RemoveLocalReference(object_id, nullptr); + FlushObjectFreedCallbacks(); + + EXPECT_EQ(fire_count, 3); +} + +TEST_F(CoreWorkerTest, AddObjectOutOfScopeCallback_FiresExactlyOnce) { + auto object_id = ObjectID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(core_worker_->GetWorkerID().Binary()); + reference_counter_->AddOwnedObject(object_id, + {}, + owner_address, + "", + 0, + LineageReconstructionEligibility::INELIGIBLE_PUT, + /*add_local_ref=*/true); + + int fire_count = 0; + ASSERT_TRUE(core_worker_->AddObjectOutOfScopeOrFreedCallback( + object_id, [&fire_count](const ObjectID &) { ++fire_count; })); + + reference_counter_->RemoveLocalReference(object_id, nullptr); + FlushObjectFreedCallbacks(); + FlushObjectFreedCallbacks(); // second flush must not re-fire + + EXPECT_EQ(fire_count, 1); +} + } // namespace core } // namespace ray