From 22cde066474d8b2e51c2899ce1070205023306ee Mon Sep 17 00:00:00 2001 From: pinjie Date: Fri, 12 Jun 2026 01:10:43 -0700 Subject: [PATCH] Replace flaky GIL/performance test with a deterministic heartbeat-based GIL probe Signed-off-by: pinjie --- .../tests/test_stream_decoder.py | 245 ++++++++++-------- 1 file changed, 140 insertions(+), 105 deletions(-) diff --git a/packages/on_demand_video_decoder/tests/test_stream_decoder.py b/packages/on_demand_video_decoder/tests/test_stream_decoder.py index ee2c413..0f22234 100644 --- a/packages/on_demand_video_decoder/tests/test_stream_decoder.py +++ b/packages/on_demand_video_decoder/tests/test_stream_decoder.py @@ -15,6 +15,7 @@ import pytest import sys +import ctypes import random import threading import time @@ -48,115 +49,149 @@ def test_stream_access_single(): assert gop_decoded is not None, f"gop_decoded is None for DecodeN12ToRGB, frames: {frames}" -def test_gil_release_parallel_decode_performance(): +def _heartbeats_during(call, margin=0.02): """ - Test that verifies GIL release by measuring overlap of decode operations. - - Strategy: - - Run decode operations in parallel threads - - Record precise start/end times of each decode call - - If GIL is released: decode calls can OVERLAP in time - - If GIL is NOT released: decode calls will be SERIALIZED (no overlap) - - Key metric: We calculate the OVERLAP RATIO - - overlap_ratio = actual_parallel_time / sum_of_individual_times - - With GIL released: overlap_ratio < 1 (calls overlap) - - Without GIL: overlap_ratio ≈ 1 (calls serialized) + Run `call` while a background thread records timestamps in a tight pure-Python loop. + + Returns (interior_beats, duration): + - interior_beats: heartbeats observed strictly inside (start + margin, end - margin), + i.e. while `call` was executing. + - duration: wall-clock duration of `call` in seconds. + + This is a deterministic GIL-release probe. If `call` holds the GIL for its entire + duration, the heartbeat thread cannot execute any Python bytecode until `call` + returns, so interior_beats is exactly 0. If `call` releases the GIL, the heartbeat + loop runs throughout and interior_beats is in the thousands. The margin excludes + beats from thread-switch slices at the call boundaries (default switch interval + is 5ms, so 20ms margin is ample). """ - path_base = utils.get_data_dir() - num_threads = 4 - num_decode_calls = 5 - - # Create separate decoders for each thread - decoders = [nvc.CreateSampleReader(num_of_set=2, num_of_file=1, iGpu=0) for _ in range(num_threads)] - - all_files = utils.select_random_clip(path_base) - assert all_files is not None and len(all_files) > 0, "No test files found" - files = [all_files[0]] - - # Record individual decode call durations per thread - thread_call_times = [[] for _ in range(num_threads)] - - def decode_work(thread_id, decoder): - """Decode and record precise timing for each call""" - # Warm up - decoder.DecodeN12ToRGB(files, [10]) - - for i in range(num_decode_calls): - start = time.perf_counter() - frames = decoder.DecodeN12ToRGB(files, [50 + i * 10]) - end = time.perf_counter() - thread_call_times[thread_id].append((start, end)) - assert frames is not None - - # Run all threads in parallel - threads = [] - overall_start = time.perf_counter() - for i, decoder in enumerate(decoders): - t = threading.Thread(target=decode_work, args=(i, decoder)) - threads.append(t) - t.start() - - for t in threads: - t.join() - overall_end = time.perf_counter() - - # Calculate metrics - actual_parallel_time = overall_end - overall_start - - # Sum of all individual decode call durations - total_individual_time = 0 - all_call_times = [] - for thread_times in thread_call_times: - for start, end in thread_times: - total_individual_time += end - start - all_call_times.append((start, end)) - - # Calculate overlap: how many calls were running simultaneously? - # Sort all events and count concurrent calls at each point - events = [] - for start, end in all_call_times: - events.append((start, 'start')) - events.append((end, 'end')) - events.sort(key=lambda x: x[0]) - - max_concurrent = 0 - current_concurrent = 0 - for _, event_type in events: - if event_type == 'start': - current_concurrent += 1 - max_concurrent = max(max_concurrent, current_concurrent) - else: - current_concurrent -= 1 - - overlap_ratio = actual_parallel_time / total_individual_time if total_individual_time > 0 else 1 - - print(f"\n[GIL Parallel Test] Actual parallel time: {actual_parallel_time*1000:.1f}ms") - print(f"[GIL Parallel Test] Sum of individual call times: {total_individual_time*1000:.1f}ms") - print(f"[GIL Parallel Test] Overlap ratio: {overlap_ratio:.2f} (lower = more overlap)") - print(f"[GIL Parallel Test] Max concurrent decode calls: {max_concurrent}") - - # If GIL is released, we expect: - # 1. Multiple decode calls running concurrently (max_concurrent > 1) - # 2. Overlap ratio significantly less than 1 - # - # If GIL is NOT released: - # 1. max_concurrent = 1 (calls are serialized) - # 2. overlap_ratio ≈ 1 (no overlap, serial execution) - - # Threshold: with 4 threads, if GIL is released, we should see at least 2 concurrent calls - MIN_EXPECTED_CONCURRENT = 2 - MAX_OVERLAP_RATIO = 0.9 # Allow some overhead, but should be < 1 - - assert max_concurrent >= MIN_EXPECTED_CONCURRENT, ( - f"GIL does not appear to be released! " - f"Max concurrent decode calls: {max_concurrent} (expected >= {MIN_EXPECTED_CONCURRENT}). " - "If GIL was released, multiple decode calls should run simultaneously." + beats = [] + stop = threading.Event() + started = threading.Event() + + def heartbeat(): + append = beats.append + clock = time.perf_counter + started.set() + while not stop.is_set(): + append(clock()) + + t = threading.Thread(target=heartbeat, daemon=True) + t.start() + started.wait() + + start = time.perf_counter() + call() + end = time.perf_counter() + + stop.set() + t.join() + + lo, hi = start + margin, end - margin + interior_beats = sum(1 for ts in beats if lo < ts < hi) + return interior_beats, end - start + + +# With the GIL released, the heartbeat loop runs at millions of iterations per second; +# even on a heavily loaded machine it gets scheduled often enough to record thousands +# of beats per second. With the GIL held, interior beats are exactly 0. A threshold of +# 50 sits orders of magnitude away from both outcomes. +MIN_INTERIOR_HEARTBEATS = 50 + + +def test_gil_probe_negative_control(): + """ + Validate the probe itself: a C call that HOLDS the GIL must be detected as such. + + ctypes.PyDLL calls foreign functions WITHOUT releasing the GIL, so a blocking + usleep(0.5s) through PyDLL freezes all other Python threads for its duration. + The probe must report (near) zero interior heartbeats. + """ + libc_hold_gil = ctypes.PyDLL("libc.so.6") + interior_beats, duration = _heartbeats_during(lambda: libc_hold_gil.usleep(500_000)) + + print(f"\n[GIL Probe Negative Control] duration: {duration*1000:.1f}ms, interior beats: {interior_beats}") + assert duration >= 0.4, "usleep did not block as expected; control is inconclusive" + assert interior_beats < MIN_INTERIOR_HEARTBEATS, ( + f"Probe failed to detect a held GIL: {interior_beats} interior heartbeats " + f"observed during a GIL-holding C call (expected < {MIN_INTERIOR_HEARTBEATS})." + ) + + +def test_gil_probe_positive_control(): + """ + Validate the probe itself: a C call that RELEASES the GIL must be detected as such. + + ctypes.CDLL releases the GIL around foreign calls, so the same usleep(0.5s) + through CDLL lets the heartbeat thread run freely throughout. + """ + libc_release_gil = ctypes.CDLL("libc.so.6") + interior_beats, duration = _heartbeats_during(lambda: libc_release_gil.usleep(500_000)) + + print(f"\n[GIL Probe Positive Control] duration: {duration*1000:.1f}ms, interior beats: {interior_beats}") + assert duration >= 0.4, "usleep did not block as expected; control is inconclusive" + assert interior_beats >= MIN_INTERIOR_HEARTBEATS, ( + f"Probe failed to detect a released GIL: only {interior_beats} interior " + f"heartbeats observed during a GIL-releasing C call " + f"(expected >= {MIN_INTERIOR_HEARTBEATS})." ) - print( - f"[GIL Parallel Test] PASSED - GIL was released " - f"(max {max_concurrent} concurrent calls, overlap ratio {overlap_ratio:.2f})" + +def test_gil_release_during_decode(): + """ + Verify DecodeN12ToRGB releases the GIL while decoding. + + A heartbeat thread increments in a pure-Python loop while a single long decode + call runs. If the extension holds the GIL, the heartbeat records exactly 0 beats + inside the call window; if it releases the GIL, thousands. The two outcomes are + orders of magnitude apart, so the assertion is insensitive to machine load + (validated by the negative/positive control tests above). + """ + path_base = utils.get_data_dir() + base_files = utils.select_random_clip(path_base) + assert base_files is not None and len(base_files) > 0, "No test files found" + + # The probe needs the decode call to be long enough that margin trimming leaves + # a meaningful interior window. Grow the per-call workload until it is. + MIN_PROBE_DURATION = 0.08 + reps = 4 + interior_beats = duration = None + for _ in range(5): + call_files = base_files * reps + # Scattered frame ids across different GOPs so the small cache (num_of_set=2) + # cannot shortcut the decode work. + frame_ids = [30 + (i * 13) % 170 for i in range(len(call_files))] + + decoder = nvc.CreateSampleReader(num_of_set=2, num_of_file=len(call_files), iGpu=0) + # Warm up: CUDA context / NVDEC session creation must not pollute the measurement. + decoder.DecodeN12ToRGB(call_files, [10] * len(call_files)) + + result = {} + + def decode_call(): + result["frames"] = decoder.DecodeN12ToRGB(call_files, frame_ids) + + interior_beats, duration = _heartbeats_during(decode_call) + assert result["frames"] is not None + + print( + f"\n[GIL Decode Test] reps: {reps}, decode duration: {duration*1000:.1f}ms, interior beats: {interior_beats}" + ) + del decoder + if duration >= MIN_PROBE_DURATION: + break + reps *= 2 + else: + pytest.fail( + f"Could not build a decode call longer than {MIN_PROBE_DURATION*1000:.0f}ms " + f"(last attempt: {duration*1000:.1f}ms with reps={reps}); probe inconclusive." + ) + + assert interior_beats >= MIN_INTERIOR_HEARTBEATS, ( + f"GIL does not appear to be released during DecodeN12ToRGB: only " + f"{interior_beats} heartbeats observed inside a {duration*1000:.1f}ms decode " + f"call (expected >= {MIN_INTERIOR_HEARTBEATS}). A heartbeat thread should run " + "freely while the decoder works if the GIL is released." )