Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 140 additions & 105 deletions packages/on_demand_video_decoder/tests/test_stream_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pytest
import sys

import ctypes
import random
import threading
import time
Expand Down Expand Up @@ -48,115 +49,149 @@ def test_stream_access_single():
assert gop_decoded is not None, f"gop_decoded is None for DecodeN12ToRGB, frames: {frames}"


def test_gil_release_parallel_decode_performance():
def _heartbeats_during(call, margin=0.02):
"""
Test that verifies GIL release by measuring overlap of decode operations.

Strategy:
- Run decode operations in parallel threads
- Record precise start/end times of each decode call
- If GIL is released: decode calls can OVERLAP in time
- If GIL is NOT released: decode calls will be SERIALIZED (no overlap)

Key metric: We calculate the OVERLAP RATIO
- overlap_ratio = actual_parallel_time / sum_of_individual_times
- With GIL released: overlap_ratio < 1 (calls overlap)
- Without GIL: overlap_ratio ≈ 1 (calls serialized)
Run `call` while a background thread records timestamps in a tight pure-Python loop.

Returns (interior_beats, duration):
- interior_beats: heartbeats observed strictly inside (start + margin, end - margin),
i.e. while `call` was executing.
- duration: wall-clock duration of `call` in seconds.

This is a deterministic GIL-release probe. If `call` holds the GIL for its entire
duration, the heartbeat thread cannot execute any Python bytecode until `call`
returns, so interior_beats is exactly 0. If `call` releases the GIL, the heartbeat
loop runs throughout and interior_beats is in the thousands. The margin excludes
beats from thread-switch slices at the call boundaries (default switch interval
is 5ms, so 20ms margin is ample).
"""
path_base = utils.get_data_dir()
num_threads = 4
num_decode_calls = 5

# Create separate decoders for each thread
decoders = [nvc.CreateSampleReader(num_of_set=2, num_of_file=1, iGpu=0) for _ in range(num_threads)]

all_files = utils.select_random_clip(path_base)
assert all_files is not None and len(all_files) > 0, "No test files found"
files = [all_files[0]]

# Record individual decode call durations per thread
thread_call_times = [[] for _ in range(num_threads)]

def decode_work(thread_id, decoder):
"""Decode and record precise timing for each call"""
# Warm up
decoder.DecodeN12ToRGB(files, [10])

for i in range(num_decode_calls):
start = time.perf_counter()
frames = decoder.DecodeN12ToRGB(files, [50 + i * 10])
end = time.perf_counter()
thread_call_times[thread_id].append((start, end))
assert frames is not None

# Run all threads in parallel
threads = []
overall_start = time.perf_counter()
for i, decoder in enumerate(decoders):
t = threading.Thread(target=decode_work, args=(i, decoder))
threads.append(t)
t.start()

for t in threads:
t.join()
overall_end = time.perf_counter()

# Calculate metrics
actual_parallel_time = overall_end - overall_start

# Sum of all individual decode call durations
total_individual_time = 0
all_call_times = []
for thread_times in thread_call_times:
for start, end in thread_times:
total_individual_time += end - start
all_call_times.append((start, end))

# Calculate overlap: how many calls were running simultaneously?
# Sort all events and count concurrent calls at each point
events = []
for start, end in all_call_times:
events.append((start, 'start'))
events.append((end, 'end'))
events.sort(key=lambda x: x[0])

max_concurrent = 0
current_concurrent = 0
for _, event_type in events:
if event_type == 'start':
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
else:
current_concurrent -= 1

overlap_ratio = actual_parallel_time / total_individual_time if total_individual_time > 0 else 1

print(f"\n[GIL Parallel Test] Actual parallel time: {actual_parallel_time*1000:.1f}ms")
print(f"[GIL Parallel Test] Sum of individual call times: {total_individual_time*1000:.1f}ms")
print(f"[GIL Parallel Test] Overlap ratio: {overlap_ratio:.2f} (lower = more overlap)")
print(f"[GIL Parallel Test] Max concurrent decode calls: {max_concurrent}")

# If GIL is released, we expect:
# 1. Multiple decode calls running concurrently (max_concurrent > 1)
# 2. Overlap ratio significantly less than 1
#
# If GIL is NOT released:
# 1. max_concurrent = 1 (calls are serialized)
# 2. overlap_ratio ≈ 1 (no overlap, serial execution)

# Threshold: with 4 threads, if GIL is released, we should see at least 2 concurrent calls
MIN_EXPECTED_CONCURRENT = 2
MAX_OVERLAP_RATIO = 0.9 # Allow some overhead, but should be < 1

assert max_concurrent >= MIN_EXPECTED_CONCURRENT, (
f"GIL does not appear to be released! "
f"Max concurrent decode calls: {max_concurrent} (expected >= {MIN_EXPECTED_CONCURRENT}). "
"If GIL was released, multiple decode calls should run simultaneously."
beats = []
stop = threading.Event()
started = threading.Event()

def heartbeat():
append = beats.append
clock = time.perf_counter
started.set()
while not stop.is_set():
append(clock())

t = threading.Thread(target=heartbeat, daemon=True)
t.start()
started.wait()

start = time.perf_counter()
call()
end = time.perf_counter()

stop.set()
t.join()

lo, hi = start + margin, end - margin
interior_beats = sum(1 for ts in beats if lo < ts < hi)
return interior_beats, end - start


# With the GIL released, the heartbeat loop runs at millions of iterations per second;
# even on a heavily loaded machine it gets scheduled often enough to record thousands
# of beats per second. With the GIL held, interior beats are exactly 0. A threshold of
# 50 sits orders of magnitude away from both outcomes.
MIN_INTERIOR_HEARTBEATS = 50


def test_gil_probe_negative_control():
"""
Validate the probe itself: a C call that HOLDS the GIL must be detected as such.

ctypes.PyDLL calls foreign functions WITHOUT releasing the GIL, so a blocking
usleep(0.5s) through PyDLL freezes all other Python threads for its duration.
The probe must report (near) zero interior heartbeats.
"""
libc_hold_gil = ctypes.PyDLL("libc.so.6")
interior_beats, duration = _heartbeats_during(lambda: libc_hold_gil.usleep(500_000))

print(f"\n[GIL Probe Negative Control] duration: {duration*1000:.1f}ms, interior beats: {interior_beats}")
assert duration >= 0.4, "usleep did not block as expected; control is inconclusive"
assert interior_beats < MIN_INTERIOR_HEARTBEATS, (
f"Probe failed to detect a held GIL: {interior_beats} interior heartbeats "
f"observed during a GIL-holding C call (expected < {MIN_INTERIOR_HEARTBEATS})."
)


def test_gil_probe_positive_control():
"""
Validate the probe itself: a C call that RELEASES the GIL must be detected as such.

ctypes.CDLL releases the GIL around foreign calls, so the same usleep(0.5s)
through CDLL lets the heartbeat thread run freely throughout.
"""
libc_release_gil = ctypes.CDLL("libc.so.6")
interior_beats, duration = _heartbeats_during(lambda: libc_release_gil.usleep(500_000))

print(f"\n[GIL Probe Positive Control] duration: {duration*1000:.1f}ms, interior beats: {interior_beats}")
assert duration >= 0.4, "usleep did not block as expected; control is inconclusive"
assert interior_beats >= MIN_INTERIOR_HEARTBEATS, (
f"Probe failed to detect a released GIL: only {interior_beats} interior "
f"heartbeats observed during a GIL-releasing C call "
f"(expected >= {MIN_INTERIOR_HEARTBEATS})."
)

print(
f"[GIL Parallel Test] PASSED - GIL was released "
f"(max {max_concurrent} concurrent calls, overlap ratio {overlap_ratio:.2f})"

def test_gil_release_during_decode():
"""
Verify DecodeN12ToRGB releases the GIL while decoding.

A heartbeat thread increments in a pure-Python loop while a single long decode
call runs. If the extension holds the GIL, the heartbeat records exactly 0 beats
inside the call window; if it releases the GIL, thousands. The two outcomes are
orders of magnitude apart, so the assertion is insensitive to machine load
(validated by the negative/positive control tests above).
"""
path_base = utils.get_data_dir()
base_files = utils.select_random_clip(path_base)
assert base_files is not None and len(base_files) > 0, "No test files found"

# The probe needs the decode call to be long enough that margin trimming leaves
# a meaningful interior window. Grow the per-call workload until it is.
MIN_PROBE_DURATION = 0.08
reps = 4
interior_beats = duration = None
for _ in range(5):
call_files = base_files * reps
# Scattered frame ids across different GOPs so the small cache (num_of_set=2)
# cannot shortcut the decode work.
frame_ids = [30 + (i * 13) % 170 for i in range(len(call_files))]

decoder = nvc.CreateSampleReader(num_of_set=2, num_of_file=len(call_files), iGpu=0)
# Warm up: CUDA context / NVDEC session creation must not pollute the measurement.
decoder.DecodeN12ToRGB(call_files, [10] * len(call_files))

result = {}

def decode_call():
result["frames"] = decoder.DecodeN12ToRGB(call_files, frame_ids)

interior_beats, duration = _heartbeats_during(decode_call)
assert result["frames"] is not None

print(
f"\n[GIL Decode Test] reps: {reps}, decode duration: {duration*1000:.1f}ms, interior beats: {interior_beats}"
)
del decoder
if duration >= MIN_PROBE_DURATION:
break
reps *= 2
else:
pytest.fail(
f"Could not build a decode call longer than {MIN_PROBE_DURATION*1000:.0f}ms "
f"(last attempt: {duration*1000:.1f}ms with reps={reps}); probe inconclusive."
)

assert interior_beats >= MIN_INTERIOR_HEARTBEATS, (
f"GIL does not appear to be released during DecodeN12ToRGB: only "
f"{interior_beats} heartbeats observed inside a {duration*1000:.1f}ms decode "
f"call (expected >= {MIN_INTERIOR_HEARTBEATS}). A heartbeat thread should run "
"freely while the decoder works if the GIL is released."
)


Expand Down