[Data][1/2] - Dynamic work queue for traversals #64388

Open
goutamvenkat-anyscale wants to merge 1 commit into ray-project:master from goutamvenkat-anyscale:goutam/dynamic_work_queue

@goutamvenkat-anyscale goutamvenkat-anyscale commented Jun 27, 2026

Description

Adds parallel_process_work_stealing, a thread-pool-backed generator for parallelizing workloads where the full set of work items is discovered at runtime (e.g., recursive directory listing). Unlike make_async_gen, which maps over a static input iterator, this utility lets workers dynamically enqueue new work items back into a shared queue, enabling work-stealing-style load balancing across threads.

Introduces dynamic_work_queue.py, which contains:

  1. parallel_process_work_stealing (the thread-pool-backed generator)
  2. _WorkerPool (thread lifecycle management)
  3. _raise_if_error (cross-thread error propagation)

Supports optional deterministic ordering via preserve_order + order_key, grouping results by their originating seed item.
Adds comprehensive tests covering flat processing, recursive tree traversal, deep chains, error propagation, early stopping, empty inputs, and ordering guarantees.

Diagram:

seed_items ──────▶ [ work_queue ] ◀──────────┐
                        │                     │
            ┌───────────┼───────────┐         │
            ▼           ▼           ▼         │
        worker_0    worker_1    worker_N       │
            │           │           │         │
            │   process_fn(item, add_work, add_result)
            │         /            \          │
            │   add_work(new)   add_result(r) │
            │       │                │        │
            │       └────────────────┼────────┘
            │                        ▼
            └──────────────▶ [ output_queue ]
                                     │
                          completion signaler
                         (join work_queue, then
                          send SENTINEL)
                                     │
                                     ▼
                            main thread yields
                            results to caller
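
The flow in the diagram can be approximated with the standard library alone. The following is a minimal sketch, not the PR's implementation: the name sketch_dynamic_work_queue and the private _SENTINEL are hypothetical, and error propagation, early stopping, and ordering are deliberately left out. Only the process_fn(item, add_work, add_result) callback shape is taken from the diagram.

```python
import queue
import threading
from typing import Any, Callable, Iterable, Iterator

_SENTINEL = object()  # hypothetical end marker; only ever compared by identity


def sketch_dynamic_work_queue(
    seed_items: Iterable[Any],
    process_fn: Callable[[Any, Callable[[Any], None], Callable[[Any], None]], None],
    num_workers: int = 4,
) -> Iterator[Any]:
    """Yield results while workers discover and enqueue new work at runtime."""
    work_queue: queue.Queue = queue.Queue()
    output_queue: queue.Queue = queue.Queue()

    def worker() -> None:
        while True:
            item = work_queue.get()
            try:
                if item is _SENTINEL:
                    return
                # process_fn may call add_work (work_queue.put) to enqueue
                # children and add_result (output_queue.put) to emit results.
                process_fn(item, work_queue.put, output_queue.put)
            finally:
                # Children are enqueued before the parent is marked done, so
                # the unfinished-task count cannot reach zero prematurely.
                work_queue.task_done()

    def signal_completion() -> None:
        work_queue.join()  # returns once every enqueued item is marked done
        output_queue.put(_SENTINEL)  # stop the consumer
        for _ in range(num_workers):
            work_queue.put(_SENTINEL)  # stop each worker

    for item in seed_items:
        work_queue.put(item)
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(num_workers)]
    threads.append(threading.Thread(target=signal_completion, daemon=True))
    for t in threads:
        t.start()

    while True:
        result = output_queue.get()
        if result is _SENTINEL:
            break
        yield result
```

A process_fn that raises would kill its worker silently in this sketch, which is the gap the review comments below discuss.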

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner June 27, 2026 00:28

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a new dynamic work queue implementation (parallel_process_work_stealing) and corresponding unit tests to support parallel processing with dynamic work generation and load balancing. The review feedback highlights several critical robustness improvements: catching BaseException instead of Exception in worker threads to prevent silent failures, updating the type annotation of _WorkerError.exception to Optional[BaseException] to satisfy type checkers, implementing a global timeout pattern when joining threads during shutdown, and wrapping the completion signaler in a try-except block to avoid indefinite hangs on unexpected errors.

Comment on lines +138 to +139
except Exception as e:
    pool.output_queue.put(_WorkerError(e))

high

In _worker, catching only Exception means that any BaseException (such as KeyboardInterrupt or SystemExit) raised inside process_fn will not be caught. This causes the worker thread to die silently while the task is still marked as done in the finally block, leading to silent data loss and incomplete processing without the main thread knowing. Catching BaseException instead ensures all critical errors are propagated to the main thread.

            except BaseException as e:
                pool.output_queue.put(_WorkerError(e))
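
The distinction the comment relies on comes from Python's built-in exception hierarchy: KeyboardInterrupt and SystemExit derive from BaseException but not from Exception. A small sketch that checks this directly (the helper name catches_with is hypothetical and not part of the PR):

```python
def catches_with(exc_type: type, to_raise: BaseException) -> bool:
    """Return True if an `except exc_type` clause intercepts `to_raise`."""
    try:
        try:
            raise to_raise
        except exc_type:
            return True
    except BaseException:
        # The inner clause did not match, so the exception escaped it.
        return False


# Ordinary errors are caught by `except Exception`.
print(catches_with(Exception, ValueError("bad item")))
# SystemExit and KeyboardInterrupt slip past `except Exception`.
print(catches_with(Exception, SystemExit(1)))
print(catches_with(Exception, KeyboardInterrupt()))
# `except BaseException` intercepts all of them.
print(catches_with(BaseException, SystemExit(1)))
```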

output queue are unambiguously distinguishable from legitimate result
values."""

exception: BaseException

medium

The exception field in _WorkerError is typed as BaseException, but in _raise_if_error it is set to None to avoid reference cycles. This will cause type checkers like mypy to raise an error. The type should be updated to Optional[BaseException].

Suggested change
- exception: BaseException
+ exception: Optional[BaseException]
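
As an illustration of why the annotation matters, here is a minimal sketch of a wrapper whose field is cleared before the exception is re-raised. The names WorkerErrorSketch and raise_if_error_sketch are hypothetical stand-ins; the PR's actual _WorkerError and _raise_if_error may differ in detail.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WorkerErrorSketch:
    # Optional because the field is reset to None once the error is consumed.
    exception: Optional[BaseException]


def raise_if_error_sketch(item: Any) -> None:
    """Re-raise a wrapped worker exception; pass other values through."""
    if isinstance(item, WorkerErrorSketch):
        exc = item.exception
        item.exception = None  # drop the wrapper's reference to the exception
        if exc is not None:
            raise exc
```

With the field typed as plain BaseException, the assignment to None is what a type checker would flag.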

Comment on lines +82 to +84
if join_timeout is not None:
    for t in self._threads:
        t.join(timeout=join_timeout)

medium

In _WorkerPool.shutdown, joining threads sequentially with a fixed timeout can block the main thread for up to num_workers * join_timeout if multiple threads hang. It is highly recommended to use a global timeout pattern to ensure the total shutdown time does not exceed join_timeout.

        if join_timeout is not None:
            import time
            end_time = time.monotonic() + join_timeout
            for t in self._threads:
                remaining = end_time - time.monotonic()
                if remaining <= 0:
                    break
                t.join(timeout=remaining)
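
The shared-deadline idea can be tried in isolation. The sketch below is a standalone version of the pattern; the function name join_all_with_deadline is hypothetical, and returning the still-alive threads is an addition made here for illustration.

```python
import threading
import time
from typing import List


def join_all_with_deadline(
    threads: List[threading.Thread], total_timeout: float
) -> List[threading.Thread]:
    """Join threads under one shared time budget; return those still alive."""
    deadline = time.monotonic() + total_timeout
    for t in threads:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # budget exhausted; do not wait on the remaining threads
        t.join(timeout=remaining)
    return [t for t in threads if t.is_alive()]
```

With three hung threads and a 0.2 s budget, this waits roughly 0.2 s in total, whereas joining each thread with its own 0.2 s timeout would wait roughly 0.6 s.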

Comment on lines +173 to +179
if not _interruptible_join(pool.work_queue, pool.interrupted):
    return
# Stop consumers
pool.output_queue.put(SENTINEL)
for _ in range(num_workers):
    # Stop producers
    pool.work_queue.put(SENTINEL)

medium

In _signal_completion, if any unexpected exception occurs, the signaler thread will die silently, causing the main thread to block indefinitely on pool.output_queue.get(). Wrapping the signaler logic in a try-except block to propagate unexpected exceptions to the output_queue improves robustness and prevents hangs.

    try:
        if not _interruptible_join(pool.work_queue, pool.interrupted):
            return
        # Stop consumers
        pool.output_queue.put(SENTINEL)
        for _ in range(num_workers):
            # Stop producers
            pool.work_queue.put(SENTINEL)
    except InterruptedError:
        pass
    except Exception as e:
        pool.output_queue.put(_WorkerError(e))
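
The general pattern behind this suggestion, forwarding a background thread's failure to the queue its consumer is blocked on, can be sketched independently of the PR. The helper run_forwarding_errors is hypothetical, and for brevity it enqueues the raw exception rather than a _WorkerError wrapper.

```python
import queue
import threading
from typing import Callable


def run_forwarding_errors(
    target: Callable[[], None], output_queue: "queue.Queue"
) -> threading.Thread:
    """Run target in a thread; on failure, enqueue the exception for the consumer."""

    def wrapper() -> None:
        try:
            target()
        except Exception as e:
            # Without this, the thread would die silently and a consumer
            # blocked on output_queue.get() would never wake up.
            output_queue.put(e)

    t = threading.Thread(target=wrapper, daemon=True)
    t.start()
    return t
```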

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Reviewed by Cursor Bugbot for commit c843a27. Configure here.

else:
    for item in iter(pool.output_queue.get, SENTINEL):
        _raise_if_error(item)
        yield item

Sentinel equality stops output early

Medium Severity

The main thread drains output_queue with iter(pool.output_queue.get, SENTINEL), which ends when a dequeued value compares equal to the sentinel via ==, not identity. A legitimate add_result value whose __eq__ returns true for the sentinel can terminate draining early, yielding incomplete results without error while workers may still be running.
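
The behavior described here can be reproduced with a plain queue.Queue. In the sketch below, EqualsEverything is a contrived, hypothetical result type whose __eq__ always returns True, and the two drain helpers are illustrative names rather than code from the PR.

```python
import queue
from typing import Any, List

SENTINEL = object()


class EqualsEverything:
    """Contrived result value that compares equal to anything."""

    def __eq__(self, other: object) -> bool:
        return True


def drain_by_equality(q: "queue.Queue") -> List[Any]:
    # Two-argument iter() stops when a value compares equal (==) to the sentinel.
    return list(iter(q.get, SENTINEL))


def drain_by_identity(q: "queue.Queue") -> List[Any]:
    # An explicit `is` check stops only on the sentinel object itself.
    items = []
    while True:
        item = q.get()
        if item is SENTINEL:
            return items
        items.append(item)


def make_queue() -> "queue.Queue":
    q: queue.Queue = queue.Queue()
    for value in (1, EqualsEverything(), 2, SENTINEL):
        q.put(value)
    return q


print(len(drain_by_equality(make_queue())))  # stops at the EqualsEverything value
print(len(drain_by_identity(make_queue())))  # drains up to the real sentinel
```

An identity-based loop is one way to avoid the early stop; whether it suits the PR depends on how the rest of the generator is structured.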

@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Jun 27, 2026

Labels

data Ray Data-related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

1 participant