[Data] [Core] [1/n] Add dedicated callback thread and Cython API for out-of-scope objects #64011
rayhhome wants to merge 40 commits into
Conversation
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Code Review
This pull request introduces a mechanism to register user-defined callbacks that fire when a Ray object goes out of scope or is freed, utilizing a dedicated background thread to avoid blocking the main IO thread. The feedback highlights critical safety improvements, including wrapping the Cython callback instantiation in a try-except block that catches BaseException to prevent process crashes, validating that the Python callback is callable, avoiding a potential use-after-free by capturing the service pointer instead of this in the asynchronous lambda, and adding a defensive nullptr check for the C function pointer.
Pull request overview
This PR adds a dedicated “object-freed/out-of-scope callback” execution path in Ray Core (C++ + Cython) so Python can register callbacks that fire when an ObjectRef becomes out-of-scope or is explicitly freed, without running Python code while the reference-counter mutex is held.
Changes:
- Introduces a dedicated instrumented_io_context + thread (object_freed_callback_service_ / object_freed_callback_thread_) for dispatching object-freed callbacks off the main IO thread.
- Adds CoreWorker::AddObjectOutOfScopeOrFreedCallback overloads (std::function + C function-pointer) and exposes CoreWorker.add_object_out_of_scope_callback() in Python via Cython.
- Adds C++ and Python test coverage for callback behavior, registration semantics, and thread identity.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/ray/core_worker/tests/core_worker_test.cc | Adds unit tests and test plumbing for dedicated callback service/thread behavior. |
| src/ray/core_worker/core_worker.h | Extends CoreWorker constructor + API surface to accept dedicated callback service/thread and expose registration methods. |
| src/ray/core_worker/core_worker.cc | Implements callback registration by posting user callbacks onto the dedicated callback service. |
| src/ray/core_worker/core_worker_shutdown_executor.cc | Stops and joins the dedicated callback thread during graceful shutdown. |
| src/ray/core_worker/core_worker_process.h | Adds process-level dedicated callback io_context, work guard, and thread members. |
| src/ray/core_worker/core_worker_process.cc | Starts the dedicated callback thread and wires the service/thread into CoreWorker construction. |
| python/ray/tests/test_object_out_of_scope_callback.py | Adds live-cluster integration tests for the Python callback API. |
| python/ray/includes/libcoreworker.pxd | Exposes the new C function-pointer overload to Cython. |
| python/ray/_raylet.pyx | Adds the Cython trampoline and Python-facing add_object_out_of_scope_callback method. |
rayhhome left a comment
Addressed the comments! Below are the answers to your confirmation questions:
- If a callback hangs, the dedicated thread pulling from boost::asio::io_service blocks. All subsequent callbacks queue up on the C++ side and never dequeue. However, if we factor in the Ray Data side changes, _on_object_freed is always O(1) dict operations under a lock and can theoretically never block. But it's worth documenting as a contract for future callers.
- When a callback errors, _invoke_object_out_of_scope_callback wraps the call in except BaseException (line 2867), logs with logger.exception, and the finally block still runs Py_DECREF. Errors are surfaced in logs without crashing, so the thread continues processing subsequent callbacks.
- Callbacks always get triggered on the owner. add_object_out_of_scope_callback raises ValueError if called on a non-owner, and OnObjectOutOfScopeOrFreed is invoked inside DeleteReferenceInternal, which only runs on the owning worker. Callbacks from different objects don't run concurrently, but they do run concurrently with the main Python thread. Not too sure about the move semantics aspect though.
- service_.post() is FIFO. Multiple callbacks registered for the same object would fire in the order they're registered. Across objects, I think the order depends on the network.
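The hang, error, and ordering behavior described above can be illustrated with a small pure-Python model. This is only a sketch: `CallbackDispatcher`, `record`, and `broken` are hypothetical names, and a `queue.Queue` drained by one thread stands in for the dedicated callback service; none of this is Ray's actual implementation.

```python
import logging
import queue
import threading

logger = logging.getLogger("callback_sketch")


class CallbackDispatcher:
    """Hypothetical stand-in for a single dedicated callback thread."""

    _STOP = object()

    def __init__(self):
        # One FIFO queue drained by one thread: callbacks run in post order.
        self._queue = queue.Queue()
        self._thread = threading.Thread(
            target=self._run, name="object-freed-callbacks", daemon=True
        )
        self._thread.start()

    def post(self, callback, object_id):
        self._queue.put((callback, object_id))

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            callback, object_id = item
            try:
                # A callback that blocks here would stall every later callback.
                callback(object_id)
            except BaseException:
                # Log and keep going so one bad callback does not kill the thread.
                logger.exception("Callback for object %s raised", object_id.hex())

    def stop(self):
        self._queue.put(self._STOP)
        self._thread.join()


fired = []


def record(object_id):
    fired.append((object_id, threading.current_thread().name))


def broken(object_id):
    raise RuntimeError("boom")


dispatcher = CallbackDispatcher()
dispatcher.post(record, b"\x01")
dispatcher.post(broken, b"\x02")  # raises, but is caught and logged
dispatcher.post(record, b"\x03")  # still runs afterwards
dispatcher.stop()
```

In this model the failing callback is logged and skipped, while the callbacks before and after it still fire in the order they were posted.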
The scale testing on callback latency will be covered by a new release test on the Ray Data side PR. Or do you think we should cover it here directly on Ray Core side?
@justinvyu We do need to worry about how ownership transfer is handled during move and how it interacts with the callback. Specifically, if the owner gets moved off this node, we will need to worry about sending a notification back to where the callback is triggered (or maybe have the callback raise an error when the node it's registered on is no longer the owner). However, as far as I'm aware, this will happen way further down the line.
Let's prevent future developers from messing this up by calling it out explicitly that we should NOT have hanging/failing operations in this callback. For example, just avoid doing any kind of I/O.
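As a rough illustration of a callback that honors this contract, the sketch below does only O(1) dictionary work under a lock. `FreedBlockTracker` and its methods are hypothetical names chosen for this example; it is not Ray Data's actual consumer.

```python
import threading


class FreedBlockTracker:
    """Hypothetical consumer of object-freed notifications."""

    def __init__(self):
        self._lock = threading.Lock()
        self._bytes_by_object = {}
        self.total_bytes = 0

    def track(self, object_id: bytes, size_bytes: int) -> None:
        with self._lock:
            self._bytes_by_object[object_id] = size_bytes
            self.total_bytes += size_bytes

    def on_object_freed(self, object_id: bytes) -> None:
        # O(1) dict work only: no I/O, no RPCs, no waiting on other threads.
        with self._lock:
            self.total_bytes -= self._bytes_by_object.pop(object_id, 0)


tracker = FreedBlockTracker()
tracker.track(b"a", 100)
tracker.track(b"b", 50)
tracker.on_object_freed(b"a")
tracker.on_object_freed(b"a")  # unknown or repeated IDs are a no-op
```

Keeping the callback body this small means the callback thread is held only for the duration of a dictionary update.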
Feel like it makes sense to test on core side.
Makes sense. Let's just track this with a ticket and make sure to consider it if/when we implement move semantics.
@Kunchd I'll leave the approval/merge to you.
Kunchd left a comment
Thanks for addressing all the previous comments! I left a few more nits, but we should be close to done.
object_ref_id.in_core_worker = False
(<object>user_data)(object_ref_id)
callback = <object>user_callback
id_binary = c_object_id.Binary()
Just curious, is binary preferred over hex?
Using binary here just because it matches the Ray Data receiving side; hex does provide better readability though. Will use hex for the logger exception message below!
id_binary = c_object_id.Binary()
callback(id_binary)
except BaseException:
logger.exception("Error in object out-of-scope callback")
nit: could we be a little more descriptive (i.e. what might've gone wrong, how should the person investigating the exception approach the issue)? It'll also be nice to actually log the error that was raised as well.
Refined log message!
cdef:
CObjectID c_object_id = object_ref.native()
CAddress c_owner_address
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
This is a super personal preference thing so feel free to ignore:
I like to minimize the logic in cython as much as possible. So ideally, either we check for ownership in python layer or do it in cpp and have the error be propagated through cython.
Added a CheckObjectOwnedByUs function to move the ownership check into C++; now the Cython layer just invokes the method and calls check_status on the result.
registered = CCoreWorkerProcess.GetCoreWorker() \
.AddObjectOutOfScopeOrFreedCallback(
c_object_id,
_invoke_object_out_of_scope_callback,
nit: Is it possible for us to wrap the user callback with the logic in _invoke_object_out_of_scope_callback here by defining a callback here and incrementing the pyref of the new callback? This way we can directly pass in a single callback to the AddObjectOutOfScopeOrFreedCallback function.
I'm just musing an idea here, might not be feasible.
I think it would be quite hard for us to collapse this into a single callback. AddObjectOutOfScopeOrFreedCallback takes a void (*)(const ObjectID &, void *), i.e. a C function pointer (a bare code address), as the second argument. The Python callback isn't a C function and cannot be passed directly here, hence the third void *callback_context argument. Meanwhile, C++ cannot invoke a Python callable directly, so it needs to jump to _invoke_object_out_of_scope_callback (a cdef C-level function) and let Cython handle acquiring the GIL and calling the function there.
The only way to pass a single stateful callable that captures all of the above is to use std::function. However, as std::function is a C++ entity, using it would require introducing an intermediate glue class to hold the Python callback, which complicates GIL handling and makes Python refcounting fragile. Overall, I wasn't able to find a way to pass in a single callback without making the code egregiously complex.
rayhhome left a comment
Addressed comments! Also added a unit test in test_object_out_of_scope_callback.py plus a performance microbenchmark in ray_perf.py specifically for tracking callback latency.
timeout: 600
script: python object_store/test_callback_throughput.py
wait_for_nodes:
num_nodes: 101
Do we need that many nodes for this? Seems expensive.
Cut node count down to 10.
return np.zeros(OBJECT_SIZE, dtype=np.uint8)
def test_callback_throughput(num_refs, timeout_s=60):
Our old release test benchmarks tend to measure repeated throughput of the same operations as an attempt to find regressions. However, we actually found these tests to be uninformative of actual use cases.
Instead, could you write the test closer to how Ray Data uses these (e.g. invoke a function that creates the object, pass it to the consumer, then check that it's out of scope)?
Makes sense! This benchmark was originally added to measure the callback latency in the edge case of object bursts, but considering the benchmark being mainly used for revealing regressions, it does make more sense for the benchmark to reflect how the system works realistically. Adjusted the benchmark to a more standard producer-consumer pipeline.
},
],
}
json.dump(results, out_file)
Finally, could you make the test's output visible on core's preset dashboard? This way we can monitor it.
Will add it once the PR gets merged!
results += timeit(
"object out-of-scope callback fire 1k batch", oos_callback_fire_batch, 1000
)
Like the comment for the benchmark test below, we want to move away from testing a feature repeatedly.
Removed this repetitive microbenchmark completely in the new commit!
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit f462377.
Kunchd left a comment
Thanks for all the work on this! The final note is that we might want to free the object refs as the consumer task completes instead of all in one go to better reflect data's workload. We can chat more about this offline.
Description
This PR integrates the C++ infrastructure and Cython bridge from the prototype (#63601). Specifically, it lands the dedicated callback thread (object_freed_callback_service_) and the CoreWorker::AddObjectOutOfScopeOrFreedCallback API along with its Python-facing add_object_out_of_scope_callback method. The Ray Data BlockRefCounter integration (the primary consumer of this API) is left for a follow-up PR.
Scope. add_object_out_of_scope_callback is a private internal API with a single intended caller: Ray Data's BlockRefCounter. It is not part of the public Ray API and is not intended for general use.
Motivation. Ray Data needs to know when an ObjectRef is no longer held by any task or Python variable so it can release per-operator memory accounting. The right signal is Ray Core's own reference counter: when it drops an object's refcount to zero, it has ground-truth knowledge that the bytes are gone. This PR exposes that signal to Python as add_object_out_of_scope_callback. The callback is dispatched on a dedicated thread rather than inline on the IO thread because OnObjectOutOfScopeOrFreed fires while holding the RC mutex; calling into Python there would stall the mutex waiting for the GIL, risking deadlock.
C++. CoreWorkerProcessImpl gains a dedicated object_freed_callback_service_ (instrumented_io_context) and object_freed_callback_thread_, both passed to CoreWorker. CoreWorker exposes two new overloads of AddObjectOutOfScopeOrFreedCallback (a std::function overload and a C function-pointer overload for Cython) that each wrap the user callback to post to the dedicated thread rather than run inline.
Cython. add_object_out_of_scope_callback(object_ref, callback) is exposed on the Python CoreWorker. A cdef void noexcept nogil function handles GIL acquisition; Py_INCREF at registration keeps the callback alive across the async gap, and Py_DECREF runs in the finally block when the callback fires (or immediately if registration fails).
Related issues
Related to #63601 (initial prototype PR).
Additional information
Call flow.
Tests.
- core_worker_test.cc: 6 new TEST_F tests. Checks the callback fires on ref drop, false return for already-freed objects, thread identity, the C function-pointer overload, multiple callbacks all fire, and fires exactly once.
- python/ray/tests/test_object_out_of_scope_callback.py: new test suite with 7 live-cluster tests. Checks fire on del ref, false return for already-freed objects, thread identity, multiple callbacks, ray.internal.free, exception safety, and fires-exactly-once.
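The motivation for dispatching off the mutex-holding thread can be modeled in a few lines of Python. This is a simplified sketch, not the PR's C++ code: `rc_mutex`, `pending`, `drop_last_reference`, and `callback_loop` are hypothetical stand-ins for the reference-counter mutex, the dedicated callback service, and the code paths around them.

```python
import queue
import threading

rc_mutex = threading.Lock()  # stand-in for the reference-counter mutex
pending = queue.Queue()      # stand-in for the dedicated callback service
observations = []


def user_callback(object_id):
    # Record which thread ran the callback and whether the mutex was obtainable.
    got_lock = rc_mutex.acquire(timeout=1)
    if got_lock:
        rc_mutex.release()
    observations.append((object_id, threading.current_thread().name, got_lock))


def callback_loop():
    while True:
        item = pending.get()
        if item is None:
            return
        callback, object_id = item
        callback(object_id)


def drop_last_reference(object_id, callback):
    with rc_mutex:
        # Only enqueue while the mutex is held. Calling `callback` inline here
        # would run user code under the mutex.
        pending.put((callback, object_id))


worker = threading.Thread(target=callback_loop, name="callback-thread")
worker.start()
drop_last_reference(b"obj", user_callback)
pending.put(None)  # sentinel: stop after draining earlier items
worker.join()
```

Because the thread that drops the reference only enqueues work, the user callback runs on the separate thread and can take the mutex itself without deadlocking.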