[Data] Measure local block-size hit rate vs ray.get(meta) size in on_data_ready #63904

Draft
xinyuangui2 wants to merge 6 commits into ray-project:master from xinyuangui2:probe-local-block-size

@xinyuangui2

Contributor

Draft / measurement-only — not for merge as-is. Pure instrumentation; does not change scheduling behavior.

Why

The per-pair ray.get(meta_ref) in DataOpTask.on_data_ready is a top cost in process_completed_tasks at scale. #63701 proposes replacing it (where possible) with a local get_local_object_locations(block_ref).object_size lookup. This PR gathers the data to justify that: how often is the size known locally, and does it match the metadata size?

What

In on_data_ready, alongside the existing ray.get(meta_ref) (kept as the source of truth), look up the block's object_size via ray.experimental.get_local_object_locations (local, no RPC) and record, process-wide:

  • hit rate — fraction of pairs where the local lookup returns a known object_size;
  • accuracy — of those hits, how many match meta.size_bytes (from ray.get(meta_ref)).

A cumulative snapshot is logged every 5000 probed pairs:

[local-size-probe] 4550/5000 (91.0%) pairs had a local object_size; of those 4550/4550 (100.0%) matched meta.size_bytes (0 mismatched).

Note: the local lookup is RPC-free but still adds per-pair overhead, so this is for measurement runs (e.g. the worker_scaling release test), not production.
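
As a rough illustration of the two quantities above, here is a minimal, Ray-free sketch of the bookkeeping. `ProbeCounters` and its methods are hypothetical names for this example only; the actual probe lives on `DataOpTask` and gets `local_size` from `get_local_object_locations`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProbeCounters:
    """Hypothetical container for the probe's cumulative counters."""

    total: int = 0
    local_hits: int = 0
    matched: int = 0

    def record(self, local_size: Optional[int], meta_size_bytes: int) -> None:
        # One call per (block, metadata) pair; local_size is None on a miss.
        self.total += 1
        if local_size is None:
            return
        self.local_hits += 1
        if local_size == meta_size_bytes:
            self.matched += 1

    def snapshot(self) -> str:
        # Same layout as the log line quoted in the description.
        hits = self.local_hits
        hit_pct = 100.0 * hits / self.total if self.total else 0.0
        match_pct = 100.0 * self.matched / hits if hits else 0.0
        return (
            "[local-size-probe] %d/%d (%.1f%%) pairs had a local object_size; "
            "of those %d/%d (%.1f%%) matched meta.size_bytes (%d mismatched)."
            % (hits, self.total, hit_pct, self.matched, hits, match_pct,
               hits - self.matched)
        )
```

Hit rate is computed over all probed pairs, while accuracy is computed only over the hits, so a low hit rate does not drag down the match percentage.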

…data_ready

Instrumentation only — does not change behavior. In `DataOpTask.on_data_ready`,
alongside the existing `ray.get(meta_ref)` (source of truth), look up the
block's `object_size` via `ray.experimental.get_local_object_locations`
(local, no RPC) and record:

- hit rate: how often the local lookup returns a known `object_size`, and
- accuracy: whether that local size matches `meta.size_bytes`.

A cumulative snapshot is logged every 5000 probed pairs. This quantifies
whether the per-pair `ray.get(meta_ref)` (a top cost in `process_completed_tasks`
at scale) can be replaced by the local size lookup.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces process-wide telemetry in DataOpTask to measure how often a block's size is known locally and whether it matches the metadata size. The reviewer identified critical issues regarding thread safety and exception safety in the newly added _record_local_size_probe method. Specifically, updating class-level counters concurrently without synchronization can cause race conditions, and any failure in the telemetry lookup could crash the main execution path. The reviewer recommended wrapping the logic in a try-except block and using a threading.Lock to synchronize counter updates.

Comment on lines +310 to +354
    def _record_local_size_probe(self, meta_size_bytes: int) -> None:
        """Measurement-only: how often is the block's size known locally, and
        does it match the metadata size?

        Looks up ``self._pending_block_ref``'s ``object_size`` via
        ``get_local_object_locations`` (local, no RPC) and compares it to
        ``meta_size_bytes`` (``meta.size_bytes`` from ``ray.get(meta_ref)``,
        the source of truth). Accumulates process-wide counters and logs a
        cumulative snapshot every ``_SIZE_PROBE_LOG_EVERY`` probed pairs.

        Note: this is pure instrumentation — it does NOT change behavior. The
        existing ``ray.get(meta_ref)`` path still drives emits and budgets.
        """
        cls = DataOpTask
        cls._size_probe_total += 1

        info = get_local_object_locations([self._pending_block_ref]).get(
            self._pending_block_ref
        )
        local_size = info.get("object_size") if info is not None else None

        if local_size is not None:
            cls._size_probe_local_hits += 1
            if local_size == meta_size_bytes:
                cls._size_probe_match += 1
            else:
                cls._size_probe_mismatch += 1

        if cls._size_probe_total % cls._SIZE_PROBE_LOG_EVERY == 0:
            total = cls._size_probe_total
            hits = cls._size_probe_local_hits
            hit_pct = 100.0 * hits / total
            match_pct = 100.0 * cls._size_probe_match / hits if hits else 0.0
            logger.info(
                "[local-size-probe] %d/%d (%.1f%%) pairs had a local object_size; "
                "of those %d/%d (%.1f%%) matched meta.size_bytes "
                "(%d mismatched).",
                hits,
                total,
                hit_pct,
                cls._size_probe_match,
                hits,
                match_pct,
                cls._size_probe_mismatch,
            )

high

The telemetry probe method _record_local_size_probe has two major issues:

  1. Race Condition / Thread Safety: The class-level counters (_size_probe_total, _size_probe_local_hits, etc.) are shared across all DataOpTask instances. Since multiple dataset execution threads can run concurrently, updating and reading these counters without synchronization can lead to race conditions and inconsistent log messages.
  2. Exception Safety: Telemetry/measurement code should never crash the main execution path. If get_local_object_locations raises an unexpected exception, it will crash the entire dataset execution.

We should wrap the telemetry logic in a try...except block and synchronize the shared counter updates using a lock.

    def _record_local_size_probe(self, meta_size_bytes: int) -> None:
        """Measurement-only: how often is the block's size known locally, and
        does it match the metadata size?

        Looks up ``self._pending_block_ref``'s ``object_size`` via
        ``get_local_object_locations`` (local, no RPC) and compares it to
        ``meta_size_bytes`` (``meta.size_bytes`` from ``ray.get(meta_ref)``,
        the source of truth). Accumulates process-wide counters and logs a
        cumulative snapshot every ``_SIZE_PROBE_LOG_EVERY`` probed pairs.

        Note: this is pure instrumentation — it does NOT change behavior. The
        existing ``ray.get(meta_ref)`` path still drives emits and budgets.
        """
        cls = DataOpTask
        block_ref = self._pending_block_ref
        try:
            info = get_local_object_locations([block_ref]).get(block_ref)
            local_size = info.get("object_size") if info is not None else None

            with cls._size_probe_lock:
                cls._size_probe_total += 1
                if local_size is not None:
                    cls._size_probe_local_hits += 1
                    if local_size == meta_size_bytes:
                        cls._size_probe_match += 1
                    else:
                        cls._size_probe_mismatch += 1

                if cls._size_probe_total % cls._SIZE_PROBE_LOG_EVERY == 0:
                    total = cls._size_probe_total
                    hits = cls._size_probe_local_hits
                    match_count = cls._size_probe_match
                    mismatch_count = cls._size_probe_mismatch
                    hit_pct = 100.0 * hits / total
                    match_pct = 100.0 * match_count / hits if hits else 0.0
                    logger.info(
                        "[local-size-probe] %d/%d (%.1f%%) pairs had a local object_size; "
                        "of those %d/%d (%.1f%%) matched meta.size_bytes "
                        "(%d mismatched).",
                        hits,
                        total,
                        hit_pct,
                        match_count,
                        hits,
                        match_pct,
                        mismatch_count,
                    )
        except Exception:
            logger.warning("Failed to record local size probe telemetry.", exc_info=True)
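
To see the two recommendations in isolation, the following Ray-free sketch applies the same pattern to a stand-in class. `SafeProbe`, its attributes, and the `lookup` callable are hypothetical; in the PR the lookup would be the `get_local_object_locations` call.

```python
import logging
import threading
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class SafeProbe:
    """Hypothetical stand-in for class-level probe counters."""

    _lock = threading.Lock()
    total = 0
    hits = 0
    failures = 0

    @classmethod
    def reset(cls) -> None:
        with cls._lock:
            cls.total = cls.hits = cls.failures = 0

    @classmethod
    def record(cls, lookup: Callable[[], Optional[int]]) -> None:
        # Do the (possibly failing) lookup outside the lock so a slow or
        # raising lookup never blocks other threads' counter updates.
        try:
            size = lookup()
        except Exception:
            with cls._lock:
                cls.failures += 1
            logger.warning("Failed to record probe telemetry.", exc_info=True)
            return
        # Update all counters under one lock so readers see a consistent set.
        with cls._lock:
            cls.total += 1
            if size is not None:
                cls.hits += 1
```

Keeping the lookup outside the critical section is a design choice of this sketch, not something the review requires; the suggestion above also performs the lookup before taking the lock.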

from ray.data._internal.stats import StatsDict, Timer
from ray.data.block import Block, BlockMetadata, TaskExecWorkerStats
from ray.data.context import DataContext
from ray.experimental.locations import get_local_object_locations

medium

Import threading to support thread-safe telemetry counters.

Suggested change
- from ray.experimental.locations import get_local_object_locations
+ from ray.experimental.locations import get_local_object_locations
+ import threading

Comment on lines +123 to +128
_size_probe_total = 0
_size_probe_local_hits = 0
_size_probe_match = 0
_size_probe_mismatch = 0
# Emit a cumulative snapshot every this many probed pairs.
_SIZE_PROBE_LOG_EVERY = 5000

medium

Add a class-level lock _size_probe_lock to synchronize updates to the shared telemetry counters across multiple concurrent dataset executions.

Suggested change
-     _size_probe_total = 0
-     _size_probe_local_hits = 0
-     _size_probe_match = 0
-     _size_probe_mismatch = 0
-     # Emit a cumulative snapshot every this many probed pairs.
-     _SIZE_PROBE_LOG_EVERY = 5000
+     _size_probe_total = 0
+     _size_probe_local_hits = 0
+     _size_probe_match = 0
+     _size_probe_mismatch = 0
+     _size_probe_lock = threading.Lock()
+     # Emit a cumulative snapshot every this many probed pairs.
+     _SIZE_PROBE_LOG_EVERY = 5000

Add DataOpTask.local_size_probe_stats() (and reset_local_size_probe()) so
benchmarks/release tests can read the cumulative local-size hit rate and
meta.size_bytes match % and surface them in their result metrics.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
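
For readers without the diff at hand, a stats accessor and reset pair of this kind could look roughly like the sketch below. The class `ProbeStatsDemo` and the dictionary keys are assumptions made for illustration; only the method names `local_size_probe_stats` and `reset_local_size_probe` come from the commit message.

```python
import threading
from typing import Dict, Optional


class ProbeStatsDemo:
    """Hypothetical stand-in for the class that owns the probe counters."""

    _lock = threading.Lock()
    _total = 0
    _hits = 0
    _match = 0

    @classmethod
    def record(cls, local_size: Optional[int], meta_size_bytes: int) -> None:
        with cls._lock:
            cls._total += 1
            if local_size is not None:
                cls._hits += 1
                if local_size == meta_size_bytes:
                    cls._match += 1

    @classmethod
    def local_size_probe_stats(cls) -> Dict[str, float]:
        # Copy the counters under the lock, then derive percentages outside it.
        with cls._lock:
            total, hits, match = cls._total, cls._hits, cls._match
        return {
            "total": total,
            "hits": hits,
            "hit_pct": 100.0 * hits / total if total else 0.0,
            "match_pct": 100.0 * match / hits if hits else 0.0,
        }

    @classmethod
    def reset_local_size_probe(cls) -> None:
        with cls._lock:
            cls._total = cls._hits = cls._match = 0
```

A benchmark would typically call the reset once before the measured run and read the stats once after it.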
xinyuangui2 added a commit to xinyuangui2/ray that referenced this pull request Jun 6, 2026
…n metrics

If the Ray build includes the on_data_ready local-size probe (PR ray-project#63904),
surface its cumulative hit rate and meta.size_bytes match % in the result
metrics (local_size_probe_hit_pct / _match_pct / counts). Reset at the
start of the run; no-op on builds without the probe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
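
The "no-op on builds without the probe" behavior can be achieved with plain attribute detection. The sketch below is one way to do it, not necessarily what the benchmark does; the helper names `reset_probe_if_present` and `collect_probe_metrics` are hypothetical, and the metric prefix follows the `local_size_probe_*` naming mentioned above.

```python
from typing import Any, Dict


def reset_probe_if_present(task_cls: Any) -> bool:
    """Reset the probe at the start of a run; return False if the build lacks it."""
    reset_fn = getattr(task_cls, "reset_local_size_probe", None)
    if not callable(reset_fn):
        return False
    reset_fn()
    return True


def collect_probe_metrics(task_cls: Any) -> Dict[str, Any]:
    """Return prefixed probe metrics, or an empty dict on builds without the probe."""
    stats_fn = getattr(task_cls, "local_size_probe_stats", None)
    if not callable(stats_fn):
        return {}
    return {f"local_size_probe_{key}": value for key, value in stats_fn().items()}
```

In a benchmark, the returned dictionary could be merged into the result metrics with `metrics.update(collect_probe_metrics(DataOpTask))`, which adds nothing when the probe is absent.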
xinyuangui2 and others added 3 commits June 7, 2026 04:15
object_size (object-store serialized data+metadata, with Arrow framing/
padding) is consistently a bit larger than the logical meta.size_bytes, so
exact equality is ~never true (0%) and uninformative. Replace the
match/mismatch counters with closeness: aggregate object_size/meta ratio,
within-1% rate, and worst-case relative difference. This answers whether
the local size is a good-enough proxy for the budget size_bytes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
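
The three closeness measures named in this commit message can be sketched as follows. `ClosenessStats` and its field names are hypothetical, and skipping pairs with a non-positive `meta_size_bytes` is an assumption of this example.

```python
from dataclasses import dataclass


@dataclass
class ClosenessStats:
    """Hypothetical accumulator for object_size vs meta.size_bytes closeness."""

    hits: int = 0
    sum_object_size: int = 0
    sum_meta_size: int = 0
    within_1pct: int = 0
    max_rel_diff: float = 0.0

    def record(self, object_size: int, meta_size_bytes: int) -> None:
        if meta_size_bytes <= 0:
            return  # relative difference is undefined; skipped in this sketch
        rel_diff = abs(object_size - meta_size_bytes) / meta_size_bytes
        self.hits += 1
        self.sum_object_size += object_size
        self.sum_meta_size += meta_size_bytes
        if rel_diff <= 0.01:
            self.within_1pct += 1
        self.max_rel_diff = max(self.max_rel_diff, rel_diff)

    @property
    def aggregate_ratio(self) -> float:
        # Ratio of summed sizes, so large blocks weigh more than small ones.
        return self.sum_object_size / self.sum_meta_size if self.sum_meta_size else 0.0

    @property
    def within_1pct_rate(self) -> float:
        return self.within_1pct / self.hits if self.hits else 0.0
```

The aggregate ratio is byte-weighted, which fits a budget-oriented question; an unweighted mean of per-pair ratios would answer a slightly different one.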
So the probe hit rate + object_size/meta.size_bytes closeness land in the
worker_scaling result JSON (not just the debug log) when running this PR
as the release test. Defensive: no-op on builds without the probe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Track a fixed-bucket histogram (0.1% resolution, bounded memory, no deps)
of the per-hit |object_size - meta.size_bytes| / meta.size_bytes and report
local_size_probe_rel_diff_p50_pct / _p90_pct alongside the aggregate ratio
and max. Gives the distribution of the object_size-vs-meta gap, not just
the mean/extremes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
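
A fixed-bucket histogram with 0.1% resolution can be kept in a plain list. The sketch below is one possible shape, assuming a cap at 100% with a single overflow bucket and reporting the upper edge of the bucket that contains the requested percentile; `RelDiffHistogram` and those choices are assumptions, not taken from the PR.

```python
class RelDiffHistogram:
    """Hypothetical bounded histogram of |object_size - meta| / meta."""

    BUCKETS_PER_PCT = 10   # 0.1% per bucket
    NUM_BUCKETS = 1000     # covers [0%, 100%); index 1000 is the overflow bucket

    def __init__(self) -> None:
        self._counts = [0] * (self.NUM_BUCKETS + 1)
        self._n = 0

    def record(self, object_size: int, meta_size_bytes: int) -> None:
        if meta_size_bytes <= 0:
            return  # relative difference undefined; skipped in this sketch
        # Integer arithmetic: per-mille of relative difference == bucket index.
        idx = abs(object_size - meta_size_bytes) * 1000 // meta_size_bytes
        self._counts[min(idx, self.NUM_BUCKETS)] += 1
        self._n += 1

    def percentile_pct(self, q_pct: int) -> float:
        """Upper edge, in percent, of the bucket holding the q_pct-th percentile."""
        if self._n == 0:
            return 0.0
        # Smallest rank r such that r / n >= q_pct / 100, and at least 1.
        target = max(1, (q_pct * self._n + 99) // 100)
        seen = 0
        for idx, count in enumerate(self._counts):
            seen += count
            if seen >= target:
                return (idx + 1) / self.BUCKETS_PER_PCT
        return (self.NUM_BUCKETS + 1) / self.BUCKETS_PER_PCT
```

Memory stays constant at 1001 integers regardless of how many pairs are recorded, at the cost of percentiles that are accurate only to the bucket width.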
xinyuangui2 added a commit to xinyuangui2/ray that referenced this pull request Jun 7, 2026
…obe stats

Update 1 on the deferred-emit pipeline:

- On a local-size miss, estimate the output budget from the operator's
  running average block size (recorded from exact meta.size_bytes in
  replay) instead of a per-ref ray.get. All pairs now defer
  (meta_bytes=None) so metadata flows through ONE batched ray.get — a
  single metadata-fetch path. Misses are rare (object_size is locally
  known for ~all pairs), so the estimate is a small, transient budget
  approximation; the exact metadata still arrives via the batched fetch.

- Replace the old size-known counter (which keyed off meta_bytes, now
  always None) with the ray-project#63904-style probe recorded in replay_deferred_emits
  where both the local object_size and exact meta.size_bytes are known:
  hit rate, object_size/meta ratio, within-1%, and p50/p90/max relative
  diff. Logged periodically and surfaced in the worker_scaling result
  metrics (local_size_probe_*).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
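
The fallback described in the first bullet, using the operator's running average block size when the local size is unknown, could be sketched like this. `RunningAverageBlockSize` and `budget_bytes` are hypothetical names, and returning a caller-supplied default before any block has been observed is an assumption of this example.

```python
from typing import Optional


class RunningAverageBlockSize:
    """Hypothetical per-operator running average of exact meta.size_bytes."""

    def __init__(self) -> None:
        self._count = 0
        self._total_bytes = 0

    def observe(self, meta_size_bytes: int) -> None:
        # Called once exact metadata is available (e.g. after the batched fetch).
        self._count += 1
        self._total_bytes += meta_size_bytes

    def estimate(self, default: int = 0) -> int:
        if self._count == 0:
            return default
        return self._total_bytes // self._count


def budget_bytes(local_size: Optional[int], avg: RunningAverageBlockSize) -> int:
    """Prefer the locally known size; fall back to the running average on a miss."""
    return local_size if local_size is not None else avg.estimate()
```

Because the exact metadata still arrives later, any error from the estimate only affects the budget for a short window.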
Add per-pair latency timing (count/avg/p50/p90/max µs, via a bounded
log-scale histogram) around both size-lookup paths in on_data_ready: the
current ray.get(meta_ref) fetch and the candidate get_local_object_locations
local lookup. Surfaced in local_size_probe_stats() (so the worker_scaling
benchmark reports it) and the periodic probe log line.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
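
A bounded log-scale latency histogram can be built from power-of-two buckets. The sketch below is an assumption about the general shape rather than the PR's implementation: `LatencyHistogram`, `timed`, the 32-bucket cap, and reporting bucket upper edges for percentiles are all choices made for this example.

```python
import time
from contextlib import contextmanager
from typing import Iterator


class LatencyHistogram:
    """Hypothetical log2-bucketed latency histogram, in microseconds."""

    NUM_BUCKETS = 32  # bucket i holds [2**(i-1), 2**i) µs; bucket 0 holds [0, 1)

    def __init__(self) -> None:
        self._counts = [0] * self.NUM_BUCKETS
        self.count = 0
        self.total_us = 0.0
        self.max_us = 0.0

    def record_us(self, us: float) -> None:
        us = max(0.0, us)
        idx = min(int(us).bit_length(), self.NUM_BUCKETS - 1)
        self._counts[idx] += 1
        self.count += 1
        self.total_us += us
        self.max_us = max(self.max_us, us)

    @property
    def avg_us(self) -> float:
        return self.total_us / self.count if self.count else 0.0

    def percentile_us(self, q_pct: int) -> float:
        """Upper edge (µs) of the bucket holding the q_pct-th percentile."""
        if self.count == 0:
            return 0.0
        target = max(1, (q_pct * self.count + 99) // 100)
        seen = 0
        for idx, bucket_count in enumerate(self._counts):
            seen += bucket_count
            if seen >= target:
                return float(2 ** idx)
        return float(2 ** (self.NUM_BUCKETS - 1))


@contextmanager
def timed(hist: LatencyHistogram) -> Iterator[None]:
    """Time the enclosed block and record its duration into hist."""
    start = time.perf_counter_ns()
    try:
        yield
    finally:
        hist.record_us((time.perf_counter_ns() - start) / 1000.0)
```

One histogram per path, for example one around `ray.get(meta_ref)` and one around the local lookup, would allow a side-by-side comparison: `with timed(meta_hist): ...` and `with timed(local_hist): ...`.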
@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public Slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale label ("The issue is stale. It will be closed within 7 days unless there is further conversation.") Jun 23, 2026