diff --git a/packages/opal-common/opal_common/fetcher/engine/fetch_worker.py b/packages/opal-common/opal_common/fetcher/engine/fetch_worker.py
index 6db97b338..b2740a63f 100644
--- a/packages/opal-common/opal_common/fetcher/engine/fetch_worker.py
+++ b/packages/opal-common/opal_common/fetcher/engine/fetch_worker.py
@@ -25,6 +25,7 @@ async def fetch_worker(queue: asyncio.Queue, engine):
         # get a event from the queue
         event, callback = await queue.get()
         # take care of it
+        fetcher = res = data = None
         try:
             # get fetcher for the event
             fetcher = register.get_fetcher_for_event(event)
@@ -44,3 +45,10 @@ async def fetch_worker(queue: asyncio.Queue, engine):
         finally:
             # Notify the queue that the "work item" has been processed.
             queue.task_done()
+            # Drop references to the processed event and its (potentially large)
+            # fetched payload. Otherwise these locals stay bound in the worker's
+            # frame while it blocks on the next `queue.get()`, so every idle
+            # worker pins its last dataset + response in memory indefinitely
+            # (one retained copy per worker -> a memory high-water mark that only
+            # resets on process restart). See issues #770 / #844.
+            event = callback = fetcher = res = data = None
diff --git a/packages/opal-common/opal_common/fetcher/tests/fetch_worker_retention_test.py b/packages/opal-common/opal_common/fetcher/tests/fetch_worker_retention_test.py
new file mode 100644
index 000000000..e2212e840
--- /dev/null
+++ b/packages/opal-common/opal_common/fetcher/tests/fetch_worker_retention_test.py
@@ -0,0 +1,56 @@
+"""Regression test for fetch-worker frame retention (issues #770 / #844).
+
+A fetch worker loops `while True: event, callback = await queue.get(); ...`. Python
+keeps a frame's locals bound until they are reassigned or the frame exits, so an
+idle worker blocked on the next `queue.get()` used to keep the PREVIOUS fetch's
+locals (`event`, `fetcher`, `res`, `data`) alive — pinning the last fetched
+dataset (and its HTTP response) in memory, one retained copy per worker, until
+process restart. This test fails if that cleanup is removed.
+"""
+
+import asyncio
+import gc
+import weakref
+
+import pytest
+from opal_common.fetcher import FetchingEngine
+from opal_common.fetcher.fetch_provider import BaseFetchProvider
+
+
+class _Payload:
+    """A weakref-able stand-in for a large fetched dataset."""
+
+
+class _SentinelProvider(BaseFetchProvider):
+    async def _fetch_(self):
+        return _Payload()
+
+    async def _process_(self, data):
+        return data
+
+
+@pytest.mark.asyncio
+async def test_idle_worker_does_not_retain_last_payload():
+    async with FetchingEngine(worker_count=1) as engine:
+        engine.register.register_fetcher("SentinelProvider", _SentinelProvider)
+
+        result = await engine.handle_url("sentinel://x", fetcher="SentinelProvider")
+        assert isinstance(result, _Payload)
+
+        ref = weakref.ref(result)
+        del result  # drop the caller's reference
+
+        # The single worker is now idle, blocked on the next queue.get(). Its frame
+        # must not still bind the payload from the fetch it just completed.
+        collected = False
+        for _ in range(20):
+            await asyncio.sleep(0.02)
+            gc.collect()
+            if ref() is None:
+                collected = True
+                break
+
+        assert collected, (
+            "fetch worker retained the last fetched payload in its frame while idle "
+            "-> per-worker memory high-water mark (regression of #770/#844 fix)"
+        )