Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def fetch_worker(queue: asyncio.Queue, engine):
# get a event from the queue
event, callback = await queue.get()
# take care of it
fetcher = res = data = None
try:
# get fetcher for the event
fetcher = register.get_fetcher_for_event(event)
Expand All @@ -44,3 +45,10 @@ async def fetch_worker(queue: asyncio.Queue, engine):
finally:
# Notify the queue that the "work item" has been processed.
queue.task_done()
# Drop references to the processed event and its (potentially large)
# fetched payload. Otherwise these locals stay bound in the worker's
# frame while it blocks on the next `queue.get()`, so every idle
# worker pins its last dataset + response in memory indefinitely
# (one retained copy per worker -> a memory high-water mark that only
# resets on process restart). See issues #770 / #844.
event = callback = fetcher = res = data = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Regression test for fetch-worker frame retention (issues #770 / #844).

A fetch worker loops `while True: event, callback = await queue.get(); ...`. Python
keeps a frame's locals bound until they are reassigned or the frame exits, so an
idle worker blocked on the next `queue.get()` used to keep the PREVIOUS fetch's
locals (`event`, `fetcher`, `res`, `data`) alive — pinning the last fetched
dataset (and its HTTP response) in memory, one retained copy per worker, until
process restart. This test fails if that cleanup is removed.
"""

import asyncio
import gc
import weakref

import pytest
from opal_common.fetcher import FetchingEngine
from opal_common.fetcher.fetch_provider import BaseFetchProvider


class _Payload:
"""A weakref-able stand-in for a large fetched dataset."""


class _SentinelProvider(BaseFetchProvider):
    """Fetch provider whose fetch step yields a fresh ``_Payload`` instance."""

    async def _fetch_(self):
        """Return a newly created ``_Payload`` as the fetched result."""
        payload = _Payload()
        return payload

    async def _process_(self, data):
        """Return ``data`` unchanged, so the same object is passed along."""
        return data


@pytest.mark.asyncio
async def test_idle_worker_does_not_retain_last_payload():
    """Check that the fetched payload becomes collectable after a fetch.

    Runs one fetch through an engine created with ``worker_count=1``, keeps
    only a weak reference to the returned payload, and then polls (up to 20
    rounds of a short sleep plus ``gc.collect()``) until the weak reference
    is dead. The test fails if the payload is still alive after all rounds.
    """
    async with FetchingEngine(worker_count=1) as engine:
        engine.register.register_fetcher("SentinelProvider", _SentinelProvider)

        payload = await engine.handle_url("sentinel://x", fetcher="SentinelProvider")
        assert isinstance(payload, _Payload)

        payload_ref = weakref.ref(payload)
        del payload  # drop the caller's reference

        # The single worker is now idle, blocked on the next queue.get(). Its
        # frame must not still bind the payload from the fetch it just
        # completed, so the weak reference is expected to die within the
        # polling budget below.
        released = False
        rounds_left = 20
        while rounds_left > 0 and not released:
            rounds_left -= 1
            # Yield to the event loop so the worker can finish its iteration.
            await asyncio.sleep(0.02)
            gc.collect()
            released = payload_ref() is None

        assert released, (
            "fetch worker retained the last fetched payload in its frame while idle "
            "-> per-worker memory high-water mark (regression of #770/#844 fix)"
        )