[Data] [Core] [4/n] Switch ResourceManager OSM Estimation to use BlockRefCounter#64192
[Data] [Core] [4/n] Switch ResourceManager OSM Estimation to use BlockRefCounter#64192rayhhome wants to merge 62 commits into
Conversation
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…allback-core-wiring
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…allback-core-wiring
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…e-wiring Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a centralized BlockRefCounter to track object-store memory usage per operator using Ray Core out-of-scope callbacks, replacing the previous queue-size-based estimation. This counter is integrated across physical operators, tasks, the resource manager, and the streaming executor, with corresponding updates to the test suite. The review feedback suggests several improvements to make the implementation more robust: preventing double-counting and memory leaks by ignoring duplicate block registrations, defensively ensuring memory usage values do not drop below zero, initializing the operator's block reference counter to a default instance in __init__ to avoid AttributeErrors during testing, and adding a unit test to verify duplicate registration handling.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
There was a problem hiding this comment.
Pull request overview
This PR switches Ray Data streaming execution’s object-store memory estimation from per-operator queue/metric-based estimates to centralized, callback-driven accounting via a new BlockRefCounter. This affects scheduler backpressure and spill/OOM decisions by changing how live block bytes are attributed and summed across the pipeline.
Changes:
- Introduce
BlockRefCounter(Ray Core out-of-scope callbacks) and wireResourceManager._estimate_object_store_memory_usage()to read per-producer bytes from it. - Thread an executor-wide
BlockRefCounterthroughPhysicalOperator.start()and intoDataOpTask, registering produced blocks as they are yielded. - Update and extend tests to validate callback-based accounting, including a new
test_block_ref_counter.pytarget and updated ResourceManager/streaming-executor tests.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| python/ray/data/_internal/execution/block_ref_counter.py | Adds the centralized ref-counting byte tracker built on Ray Core callbacks. |
| python/ray/data/_internal/execution/resource_manager.py | Switches object store memory estimation to use BlockRefCounter. |
| python/ray/data/_internal/execution/streaming_executor.py | Starts all operators with the shared counter and clears it on shutdown. |
| python/ray/data/_internal/execution/streaming_executor_state.py | Removes operator start() from state setup (moved to executor). |
| python/ray/data/_internal/execution/interfaces/physical_operator.py | Extends PhysicalOperator.start() signature; extends DataOpTask to register produced blocks. |
| python/ray/data/_internal/execution/operators/map_operator.py | Passes counter/producer into DataOpTask; updates start() signature. |
| python/ray/data/_internal/execution/operators/hash_shuffle.py | Updates start() signature; passes counter/producer into finalizing tasks. |
| python/ray/data/_internal/execution/operators/base_physical_operator.py | Avoids double-counting forwarded refs by only registering genuinely new output refs. |
| python/ray/data/_internal/execution/operators/input_data_buffer.py | Updates start() signature to accept the shared counter. |
| python/ray/data/_internal/execution/operators/union_operator.py | Updates start() signature to accept the shared counter. |
| python/ray/data/_internal/execution/operators/output_splitter.py | Updates start() signature to accept the shared counter. |
| python/ray/data/_internal/execution/operators/actor_pool_map_operator.py | Updates start() signature to accept the shared counter. |
| python/ray/data/tests/test_block_ref_counter.py | Adds unit + lifecycle/integration coverage for callback behavior and races. |
| python/ray/data/BUILD.bazel | Adds Bazel py_test target for test_block_ref_counter. |
| python/ray/data/tests/test_resource_manager.py | Updates ResourceManager tests to validate counter-based accounting and union attribution. |
| python/ray/data/tests/unit/test_resource_manager.py | Adjusts unit tests to seed counter attribution directly for union cases. |
| python/ray/data/tests/test_streaming_executor.py | Updates DataOpTask construction in tests with a helper that supplies counter/producer. |
| python/ray/data/tests/test_operators.py | Ensures operators are started in tests where required by new lifecycle. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
45e0747 to
eb16efe
Compare
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
00514ec to
1936a69
Compare
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
1936a69 to
e21d710
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit e21d710. Configure here.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
… docstring Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…ounter Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
e21d710 to
2edfb95
Compare

Description
Switches
ResourceManager._estimate_object_store_memory_usageto read fromBlockRefCounter, replacing the manualop.estimate_object_store_usage(state)path. This is the only change visible to the scheduler; over- or under-counts here cascade into backpressure and OOM/spill decisions.Implementation
resource_manager.py._estimate_object_store_memory_usagereads_block_ref_counter.get_object_store_memory_usage(op.id)for output bytes instead of callingop.estimate_object_store_usage(state).Elimination of explicit external-consumer-bytes tracking
Previously, handing a block to a Train worker required an explicit
set_external_consumer_bytescall from the Train-side iterator to keep the memory budget accurate. With callback-based tracking this is no longer needed: the Train worker holds anObjectRef, so the bytes remain attributed until the worker drops the ref and the callback fires.Note:
DownstreamCapacityBackpressurePolicystill reads_external_consumer_bytesfor its own flow-control logic. Removing that dependency is a follow-up.Tests
test_resource_manager.pyandunit/test_resource_manager.py: updated to validate_estimate_object_store_memory_usageagainst callback-based accounting.Related issues
Depends on #64191 (operator wiring).
Related to #63601 (prototype), #63074 (previous manual
BlockRefCounter).