From ee51886e78b39eff017fbce495e4de4a8913a14e Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 29 May 2026 18:00:46 +0200 Subject: [PATCH 1/4] test: generalize integration polling helpers Make call_with_exp_backoff and poll_until_condition generic and consistent so the same two helpers can be shared across apify-sdk-python, crawlee-python and apify-client-python: - call_with_exp_backoff no longer takes rq_access_mode. It now takes an optional condition (default: truthy) and retries fn until the condition holds, returning the last result. The Apify RQ single/shared distinction moves to the call sites via max_retries (0 = call once / single mode, 5 = retry / shared mode). - poll_until_condition gains the same condition default and now accepts both sync and async callables (so it can poll plain values too). - Both helpers accept sync or async fn via a small _maybe_await helper, typed with overloads so the return type is preserved. All ~36 call sites in test_request_queue.py are refactored accordingly. --- tests/integration/_utils.py | 113 ++++++++++++++++-------- tests/integration/test_request_queue.py | 97 ++++++++++++-------- 2 files changed, 137 insertions(+), 73 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 3fb26bbd..dada6684 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -1,57 +1,98 @@ from __future__ import annotations import asyncio +import inspect +import logging import time -from typing import TYPE_CHECKING, Literal, TypeVar +from typing import TYPE_CHECKING, TypeVar, cast, overload from crawlee._utils.crypto import crypto_random_object_id -from apify import Actor - if TYPE_CHECKING: from collections.abc import Awaitable, Callable -T = TypeVar('T') +logger = logging.getLogger(__name__) +T = TypeVar('T') -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - *, - rq_access_mode: Literal['single', 'shared'], - max_retries: int = 5, -) -> T | None: - """Call an async callable with exponential backoff retries until it returns a truthy value. - In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests - become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with - exponential backoff to handle that delay in integration tests. +async def _maybe_await(value: Awaitable[T] | T) -> T: + """Await `value` if it is awaitable, otherwise return it unchanged. - When `rq_access_mode` is `'single'`, the function is called once without retries. + Lets `call_with_exp_backoff` and `poll_until_condition` accept both sync and async callables. """ - if rq_access_mode == 'single': - return await fn() - - if rq_access_mode == 'shared': - result = None + if inspect.isawaitable(value): + return await cast('Awaitable[T]', value) + return cast('T', value) - for attempt in range(max_retries): - result = await fn() - if result: - return result +@overload +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +@overload +async def call_with_exp_backoff( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + max_retries: int = 5, + base_delay: float = 1.0, +) -> T: + """Call `fn`, retrying with exponential backoff until `condition(result)` is True. - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) + Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to + `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is + returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion. - return result + This is useful for eventually-consistent APIs where a freshly added, reclaimed, or handled item may take a + moment to become visible (see https://github.com/apify/apify-sdk-python/issues/808). The default condition + checks for a truthy result. Pass `max_retries=0` to call `fn` exactly once without any retries. - raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') + Unlike `poll_until_condition`, the delay between attempts grows exponentially rather than staying constant. + """ + result = await _maybe_await(fn()) + for attempt in range(max_retries): + if condition(result): + return result + delay = base_delay * 2**attempt + logger.info( + 'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries + ) + await asyncio.sleep(delay) + result = await _maybe_await(fn()) + return result +@overload async def poll_until_condition( fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +@overload +async def poll_until_condition( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +async def poll_until_condition( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, *, timeout: float = 60, poll_interval: float = 5, @@ -59,19 +100,21 @@ async def poll_until_condition( """Poll `fn` until `condition(result)` is True or the timeout expires. Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. - Returns the last polled result regardless of whether the condition was met. + Returns the last polled result regardless of whether the condition was met, so the caller can run its own + assertion. The default condition checks for a truthy result. - Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent API state (e.g. request queue - stats) that may take a variable amount of time to propagate. + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. request queue + stats) that may take a variable amount of time to propagate. Unlike `call_with_exp_backoff`, the interval + between polls stays constant. """ deadline = time.monotonic() + timeout - result = await fn() + result = await _maybe_await(fn()) while not condition(result): remaining = deadline - time.monotonic() if remaining <= 0: break await asyncio.sleep(min(poll_interval, remaining)) - result = await fn() + result = await _maybe_await(fn()) return result diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 87ae7e0b..d1e77cdd 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -25,8 +25,9 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata -# In shared mode, there is a propagation delay between operations so we use test helper -# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. +# In shared mode, there is a propagation delay between operations, so we retry reads with the test helper +# `call_with_exp_backoff` (`max_retries=5`). In single mode reads are immediately consistent, so we call once +# (`max_retries=0`). See https://github.com/apify/apify-sdk-python/issues/808. async def test_add_and_fetch_requests( @@ -35,6 +36,7 @@ async def test_add_and_fetch_requests( ) -> None: """Test basic functionality of adding and fetching requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -46,7 +48,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -60,7 +62,7 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -70,6 +72,7 @@ async def test_add_requests_in_batches( ) -> None: """Test adding multiple requests in a single batch operation.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 rq = request_queue_apify @@ -81,7 +84,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -95,7 +98,7 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -105,6 +108,7 @@ async def test_add_non_unique_requests_in_batch( ) -> None: """Test adding requests with duplicate unique keys in batch.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 rq = request_queue_apify @@ -120,7 +124,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -135,7 +139,7 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' @@ -146,6 +150,7 @@ async def test_forefront_requests_ordering( ) -> None: """Test that forefront requests are processed before regular requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -164,7 +169,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -189,6 +194,7 @@ async def test_request_unique_key_behavior( ) -> None: """Test behavior of custom unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -218,7 +224,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -240,6 +246,7 @@ async def test_request_reclaim_functionality( ) -> None: """Test request reclaiming for failed processing.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -249,7 +256,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -271,7 +278,7 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True @@ -282,6 +289,7 @@ async def test_request_reclaim_with_forefront( """Test reclaiming requests to the front of the queue.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -293,7 +301,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + first_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -327,6 +335,7 @@ async def test_complex_request_objects( ) -> None: """Test handling complex Request objects with various properties.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -343,7 +352,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -374,6 +383,7 @@ async def test_get_request_by_unique_key( ) -> None: """Test retrieving specific requests by their unique_key.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -386,7 +396,7 @@ async def test_get_request_by_unique_key( retrieved_request = await call_with_exp_backoff( lambda: rq.get_request(request_unique_key), - rq_access_mode=rq_access_mode, + max_retries=max_retries, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}' @@ -405,6 +415,7 @@ async def test_metadata_tracking( ) -> None: """Test request queue metadata and counts.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -429,7 +440,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: await rq.mark_request_as_handled(next_request) @@ -449,6 +460,7 @@ async def test_batch_operations_performance( ) -> None: """Test batch operations vs individual operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -470,7 +482,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -479,7 +491,7 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -490,6 +502,7 @@ async def test_state_consistency( ) -> None: """Test queue state consistency during concurrent operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -507,7 +520,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -534,31 +547,32 @@ async def test_state_consistency( # Process remaining requests. remaining_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: """Test behavior with empty queues.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_empty = await call_with_exp_backoff(rq.is_empty, max_retries=max_retries) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -581,6 +595,7 @@ async def test_large_batch_operations( ) -> None: """Test handling large batches of requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -600,14 +615,14 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -617,6 +632,7 @@ async def test_mixed_string_and_request_objects( ) -> None: """Test adding both string URLs and Request objects.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -645,7 +661,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -669,6 +685,7 @@ async def test_persistence_across_operations( ) -> None: """Test that queue state persists across different operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 # Open queue and add some requests rq = request_queue_apify @@ -685,7 +702,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -710,12 +727,12 @@ async def test_persistence_across_operations( # Process remaining. remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -785,6 +802,7 @@ async def test_request_ordering_with_mixed_operations( ) -> None: """Test request ordering with mixed add/reclaim operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -795,7 +813,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request1 = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -809,7 +827,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -1044,6 +1062,7 @@ async def test_rq_long_url( ) -> None: """Test handling of requests with long URLs and extended unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1057,12 +1076,12 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished @@ -1075,6 +1094,7 @@ async def test_pre_existing_request_with_user_data( list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 custom_data = {'key': 'value'} rq = request_queue_apify @@ -1088,7 +1108,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1118,18 +1138,19 @@ async def test_request_queue_is_finished( request: pytest.FixtureRequest, ) -> None: rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) + fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, max_retries=max_retries) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) + assert await call_with_exp_backoff(request_queue_apify.is_finished, max_retries=max_retries) async def test_request_queue_deduplication_unprocessed_requests( @@ -1415,7 +1436,7 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + while request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=5): remaining_count += 1 await rq.mark_request_as_handled(request) From fffc7eb21b34f96be68fe5462b5b673b4a97b4e7 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 13:01:38 +0200 Subject: [PATCH 2/4] test: consolidate polling into a single shared helper and deflake tests Replace the two integration polling helpers with a single poll_until_condition in a shared tests/_utils.py, matching the helper in apify-client-python (with backoff_factor covering the exponential-backoff use case). Use it across integration, e2e, and unit tests instead of hand-rolled retry loops and fixed sleeps, and fix the flaky webhook e2e test by keeping the client run alive briefly after add_webhook so the run-succeeded event cannot outrun the webhook registration. --- tests/__init__.py | 0 tests/{integration => }/_utils.py | 73 ++------ tests/e2e/_utils.py | 17 -- tests/e2e/conftest.py | 2 +- tests/e2e/test_actor_api_helpers.py | 23 ++- tests/e2e/test_actor_charge.py | 141 +++++++-------- tests/integration/test_dataset.py | 2 +- tests/integration/test_key_value_store.py | 2 +- tests/integration/test_request_queue.py | 165 +++++++++++------- .../unit/actor/test_actor_key_value_store.py | 21 ++- tests/unit/actor/test_actor_lifecycle.py | 4 +- tests/unit/events/test_apify_event_manager.py | 21 ++- 12 files changed, 236 insertions(+), 235 deletions(-) create mode 100644 tests/__init__.py rename tests/{integration => }/_utils.py (50%) delete mode 100644 tests/e2e/_utils.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/_utils.py b/tests/_utils.py similarity index 50% rename from tests/integration/_utils.py rename to tests/_utils.py index dada6684..b7a6f462 100644 --- a/tests/integration/_utils.py +++ b/tests/_utils.py @@ -2,7 +2,6 @@ import asyncio import inspect -import logging import time from typing import TYPE_CHECKING, TypeVar, cast, overload @@ -11,69 +10,19 @@ if TYPE_CHECKING: from collections.abc import Awaitable, Callable -logger = logging.getLogger(__name__) - T = TypeVar('T') async def _maybe_await(value: Awaitable[T] | T) -> T: """Await `value` if it is awaitable, otherwise return it unchanged. - Lets `call_with_exp_backoff` and `poll_until_condition` accept both sync and async callables. + Lets `poll_until_condition` accept both sync and async callables. """ if inspect.isawaitable(value): return await cast('Awaitable[T]', value) return cast('T', value) -@overload -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool] = ..., - *, - max_retries: int = ..., - base_delay: float = ..., -) -> T: ... -@overload -async def call_with_exp_backoff( - fn: Callable[[], T], - condition: Callable[[T], bool] = ..., - *, - max_retries: int = ..., - base_delay: float = ..., -) -> T: ... -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T] | T], - condition: Callable[[T], bool] = bool, - *, - max_retries: int = 5, - base_delay: float = 1.0, -) -> T: - """Call `fn`, retrying with exponential backoff until `condition(result)` is True. - - Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to - `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is - returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion. - - This is useful for eventually-consistent APIs where a freshly added, reclaimed, or handled item may take a - moment to become visible (see https://github.com/apify/apify-sdk-python/issues/808). The default condition - checks for a truthy result. Pass `max_retries=0` to call `fn` exactly once without any retries. - - Unlike `poll_until_condition`, the delay between attempts grows exponentially rather than staying constant. - """ - result = await _maybe_await(fn()) - for attempt in range(max_retries): - if condition(result): - return result - delay = base_delay * 2**attempt - logger.info( - 'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries - ) - await asyncio.sleep(delay) - result = await _maybe_await(fn()) - return result - - @overload async def poll_until_condition( fn: Callable[[], Awaitable[T]], @@ -81,6 +30,7 @@ async def poll_until_condition( *, timeout: float = ..., poll_interval: float = ..., + backoff_factor: float = ..., ) -> T: ... @overload async def poll_until_condition( @@ -89,31 +39,36 @@ async def poll_until_condition( *, timeout: float = ..., poll_interval: float = ..., + backoff_factor: float = ..., ) -> T: ... async def poll_until_condition( fn: Callable[[], Awaitable[T] | T], condition: Callable[[T], bool] = bool, *, - timeout: float = 60, - poll_interval: float = 5, + timeout: float = 5, + poll_interval: float = 1, + backoff_factor: float = 1, ) -> T: """Poll `fn` until `condition(result)` is True or the timeout expires. Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. Returns the last polled result regardless of whether the condition was met, so the caller can run its own - assertion. The default condition checks for a truthy result. + assertion. The default condition checks for a truthy result. Pass `timeout=0` to call `fn` exactly once. - Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. request queue - stats) that may take a variable amount of time to propagate. Unlike `call_with_exp_backoff`, the interval - between polls stays constant. + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly + created resource appearing in a listing) that may take a variable amount of time to propagate. For highly + variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the + interval after each poll, covering a long timeout with few calls. """ deadline = time.monotonic() + timeout + delay = poll_interval result = await _maybe_await(fn()) while not condition(result): remaining = deadline - time.monotonic() if remaining <= 0: break - await asyncio.sleep(min(poll_interval, remaining)) + await asyncio.sleep(min(delay, remaining)) + delay *= backoff_factor result = await _maybe_await(fn()) return result diff --git a/tests/e2e/_utils.py b/tests/e2e/_utils.py deleted file mode 100644 index b5323272..00000000 --- a/tests/e2e/_utils.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import annotations - -from crawlee._utils.crypto import crypto_random_object_id - - -def generate_unique_resource_name(label: str) -> str: - """Generates a unique resource name, which will contain the given label.""" - name_template = 'python-sdk-tests-{}-generated-{}' - template_length = len(name_template.format('', '')) - api_name_limit = 63 - generated_random_id_length = 8 - label_length_limit = api_name_limit - template_length - generated_random_id_length - - label = label.replace('_', '-') - assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' - - return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index bdcc9883..df44a233 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -17,7 +17,7 @@ from crawlee import service_locator import apify._actor -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify._models import ActorRun from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/e2e/test_actor_api_helpers.py b/tests/e2e/test_actor_api_helpers.py index 3747dd3b..5a43cd84 100644 --- a/tests/e2e/test_actor_api_helpers.py +++ b/tests/e2e/test_actor_api_helpers.py @@ -7,7 +7,7 @@ from apify_shared.consts import ActorPermissionLevel from crawlee._utils.crypto import crypto_random_object_id -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor from apify._models import ActorRun @@ -425,6 +425,8 @@ def do_POST(self) -> None: await Actor.set_value('WEBHOOK_BODY', webhook_body) async def main_client() -> None: + import asyncio + from apify import Webhook, WebhookEventType async with Actor: @@ -438,6 +440,12 @@ async def main_client() -> None: ) ) + # Keep the run alive for a moment after registering the webhook. Without this, the run finishes + # just milliseconds later and the platform may process the run-succeeded event before the freshly + # added ad-hoc webhook has propagated, in which case the webhook never fires and the server Actor + # waits until it times out. + await asyncio.sleep(5) + server_actor, client_actor = await asyncio.gather( make_actor(label='add-webhook-server', main_func=main_server), make_actor(label='add-webhook-client', main_func=main_client), @@ -446,10 +454,15 @@ async def main_client() -> None: server_actor_run = await server_actor.start() server_actor_container_url = server_actor_run['containerUrl'] - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - while not server_actor_initialized: - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - await asyncio.sleep(1) + # Wait for the server Actor's container to start up and bind its HTTP server. The startup time is highly + # variable (image pull, container creation), so poll with a growing interval instead of a fixed sleep. + server_actor_initialized = await poll_until_condition( + lambda: server_actor.last_run().key_value_store().get_record('INITIALIZED'), + timeout=300, + poll_interval=1, + backoff_factor=1.5, + ) + assert server_actor_initialized is not None, 'The server Actor did not initialize in time.' ac_run_result = await run_actor( client_actor, diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index f8b1f393..1b678409 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio from decimal import Decimal from typing import TYPE_CHECKING @@ -8,12 +7,11 @@ from apify_shared.consts import ActorJobStatus +from .._utils import poll_until_condition from apify import Actor from apify._models import ActorRun if TYPE_CHECKING: - from collections.abc import Iterable - from apify_client import ApifyClientAsync from apify_client.clients import ActorClientAsync @@ -112,33 +110,27 @@ async def ppe_actor( return apify_client_async.actor(ppe_actor_build) -def retry_counter(total_attempts: int) -> Iterable[tuple[bool, int]]: - for retry in range(total_attempts - 1): - yield False, retry - - yield True, total_attempts - 1 - - async def test_actor_charge_basic( ppe_actor: ActorClientAsync, run_actor: RunActorFunction, apify_client_async: ApifyClientAsync, ) -> None: run = await run_actor(ppe_actor) + run_id = run.id - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + async def get_run() -> ActorRun: + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 4} - break - except AssertionError: - if is_last_attempt: - raise + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + get_run, + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 4}, + timeout=30, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 4} async def test_actor_charge_limit( @@ -147,20 +139,21 @@ async def test_actor_charge_limit( apify_client_async: ApifyClientAsync, ) -> None: run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) + run_id = run.id + + async def get_run() -> ActorRun: + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + get_run, + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 2}, + timeout=30, + poll_interval=1, + ) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 2} - break - except AssertionError: - if is_last_attempt: - raise + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 2} async def test_actor_push_data_charges_both_events( @@ -171,24 +164,28 @@ async def test_actor_push_data_charges_both_events( """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" run = await run_actor(ppe_push_data_actor) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 5, - 'apify-default-dataset-item': 5, - } - break - except AssertionError: - if is_last_attempt: - raise + run_id = run.id + + async def get_run() -> ActorRun: + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) + + expected_counts = { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + get_run, + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts async def test_actor_push_data_combined_budget_limit( @@ -202,21 +199,25 @@ async def test_actor_push_data_combined_budget_limit( """ run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 2, - 'apify-default-dataset-item': 2, - } - break - except AssertionError: - if is_last_attempt: - raise + run_id = run.id + + async def get_run() -> ActorRun: + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) + + expected_counts = { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + get_run, + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py index 5a5d1b92..75a80695 100644 --- a/tests/integration/test_dataset.py +++ b/tests/integration/test_dataset.py @@ -6,7 +6,7 @@ from apify_shared.consts import ApifyEnvVars -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storages import Dataset diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py index 912ecfb0..2ea4672d 100644 --- a/tests/integration/test_key_value_store.py +++ b/tests/integration/test_key_value_store.py @@ -7,7 +7,7 @@ from apify_shared.consts import ApifyEnvVars from crawlee import service_locator -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index d1e77cdd..f70156f5 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -12,7 +12,7 @@ from crawlee import service_locator from crawlee.crawlers import BasicCrawler -from ._utils import call_with_exp_backoff, generate_unique_resource_name, poll_until_condition +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor, Request from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify import ApifyRequestQueueClient @@ -26,8 +26,8 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata # In shared mode, there is a propagation delay between operations, so we retry reads with the test helper -# `call_with_exp_backoff` (`max_retries=5`). In single mode reads are immediately consistent, so we call once -# (`max_retries=0`). See https://github.com/apify/apify-sdk-python/issues/808. +# `poll_until_condition` (`timeout=30`, exponential backoff). In single mode reads are immediately consistent, +# so we call once (`timeout=0`). See https://github.com/apify/apify-sdk-python/issues/808. async def test_add_and_fetch_requests( @@ -36,7 +36,7 @@ async def test_add_and_fetch_requests( ) -> None: """Test basic functionality of adding and fetching requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -48,7 +48,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -62,7 +62,7 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' @@ -72,7 +72,7 @@ async def test_add_requests_in_batches( ) -> None: """Test adding multiple requests in a single batch operation.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 rq = request_queue_apify @@ -84,7 +84,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -98,7 +98,7 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' @@ -108,7 +108,7 @@ async def test_add_non_unique_requests_in_batch( ) -> None: """Test adding requests with duplicate unique keys in batch.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 rq = request_queue_apify @@ -124,7 +124,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -139,7 +139,7 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' @@ -150,7 +150,7 @@ async def test_forefront_requests_ordering( ) -> None: """Test that forefront requests are processed before regular requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -169,7 +169,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -194,7 +194,7 @@ async def test_request_unique_key_behavior( ) -> None: """Test behavior of custom unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -224,7 +224,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -246,7 +246,7 @@ async def test_request_reclaim_functionality( ) -> None: """Test request reclaiming for failed processing.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -256,7 +256,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -269,7 +269,12 @@ async def test_request_reclaim_functionality( # Should be able to fetch the same request again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - request2 = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + request2 = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert request2 is not None assert request2.url == fetched_request.url @@ -278,7 +283,7 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True @@ -289,7 +294,7 @@ async def test_request_reclaim_with_forefront( """Test reclaiming requests to the front of the queue.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -301,7 +306,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - first_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + first_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -311,7 +316,12 @@ async def test_request_reclaim_with_forefront( # The reclaimed request should be fetched first again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - next_request = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + next_request = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert next_request is not None assert next_request.url == first_request.url @@ -335,7 +345,7 @@ async def test_complex_request_objects( ) -> None: """Test handling complex Request objects with various properties.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -352,7 +362,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -383,7 +393,7 @@ async def test_get_request_by_unique_key( ) -> None: """Test retrieving specific requests by their unique_key.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -394,9 +404,10 @@ async def test_get_request_by_unique_key( request_unique_key = add_result.unique_key Actor.log.info(f'Request added with unique_key: {request_unique_key}') - retrieved_request = await call_with_exp_backoff( + retrieved_request = await poll_until_condition( lambda: rq.get_request(request_unique_key), - max_retries=max_retries, + timeout=timeout, + backoff_factor=2, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}' @@ -415,7 +426,7 @@ async def test_metadata_tracking( ) -> None: """Test request queue metadata and counts.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -440,7 +451,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) @@ -460,7 +471,7 @@ async def test_batch_operations_performance( ) -> None: """Test batch operations vs individual operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -482,7 +493,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -491,7 +502,7 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' @@ -502,7 +513,7 @@ async def test_state_consistency( ) -> None: """Test queue state consistency during concurrent operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -520,7 +531,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -547,32 +558,32 @@ async def test_state_consistency( # Process remaining requests. remaining_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: """Test behavior with empty queues.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await call_with_exp_backoff(rq.is_empty, max_retries=max_retries) - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_empty = await poll_until_condition(rq.is_empty, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -595,7 +606,7 @@ async def test_large_batch_operations( ) -> None: """Test handling large batches of requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -615,14 +626,14 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' @@ -632,7 +643,7 @@ async def test_mixed_string_and_request_objects( ) -> None: """Test adding both string URLs and Request objects.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -661,7 +672,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -685,7 +696,7 @@ async def test_persistence_across_operations( ) -> None: """Test that queue state persists across different operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 # Open queue and add some requests rq = request_queue_apify @@ -702,7 +713,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -727,12 +738,12 @@ async def test_persistence_across_operations( # Process remaining. remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -764,7 +775,12 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu for url, expected_duplicate in urls_and_deduplication_expectations: # In shared mode, `add_request` may transiently return None until the operation propagates, # so poll with backoff until it returns a result. - result = await poll_until_condition(lambda url=url: rq.add_request(url), lambda result: result is not None) + result = await poll_until_condition( + lambda url=url: rq.add_request(url), + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert result is not None results.append(result.was_already_present) @@ -802,7 +818,7 @@ async def test_request_ordering_with_mixed_operations( ) -> None: """Test request ordering with mixed add/reclaim operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -813,7 +829,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + request1 = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -827,7 +843,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -880,7 +896,12 @@ async def test_request_queue_metadata_another_client( await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'})) # Poll until the API has propagated the metadata change. - metadata = await poll_until_condition(rq.get_metadata, lambda m: m.total_request_count >= 1) + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.total_request_count >= 1, + timeout=60, + poll_interval=5, + ) assert metadata.total_request_count == 1 @@ -981,6 +1002,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= expected_write_count, + timeout=60, + poll_interval=5, ) Actor.log.info(f'{metadata.stats=}') assert metadata.stats.write_count == expected_write_count @@ -1009,6 +1032,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_before = metadata.stats Actor.log.info(stats_before) @@ -1023,6 +1048,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.read_count - stats_before.read_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_after = metadata.stats Actor.log.info(stats_after) @@ -1050,6 +1077,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: apify_metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= add_request_count, + timeout=60, + poll_interval=5, ) assert hasattr(apify_metadata, 'stats') @@ -1062,7 +1091,7 @@ async def test_rq_long_url( ) -> None: """Test handling of requests with long URLs and extended unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1076,12 +1105,12 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) + is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) assert is_finished @@ -1094,7 +1123,7 @@ async def test_pre_existing_request_with_user_data( list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 custom_data = {'key': 'value'} rq = request_queue_apify @@ -1108,7 +1137,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1138,19 +1167,19 @@ async def test_request_queue_is_finished( request: pytest.FixtureRequest, ) -> None: rq_access_mode = request.node.callspec.params.get('request_queue_apify') - max_retries = 0 if rq_access_mode == 'single' else 5 + timeout = 0 if rq_access_mode == 'single' else 30 await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, max_retries=max_retries) + fetched = await poll_until_condition(request_queue_apify.fetch_next_request, timeout=timeout, backoff_factor=2) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await call_with_exp_backoff(request_queue_apify.is_finished, max_retries=max_retries) + assert await poll_until_condition(request_queue_apify.is_finished, timeout=timeout, backoff_factor=2) async def test_request_queue_deduplication_unprocessed_requests( @@ -1201,6 +1230,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) Actor.log.info(stats_after) @@ -1309,6 +1340,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 @@ -1341,6 +1374,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 2, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 2 @@ -1379,6 +1414,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= len(requests), + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests) @@ -1436,7 +1473,7 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=5): + while request := await poll_until_condition(rq.fetch_next_request, timeout=30, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(request) diff --git a/tests/unit/actor/test_actor_key_value_store.py b/tests/unit/actor/test_actor_key_value_store.py index 581d775d..bd4e69cb 100644 --- a/tests/unit/actor/test_actor_key_value_store.py +++ b/tests/unit/actor/test_actor_key_value_store.py @@ -1,12 +1,11 @@ from __future__ import annotations -import asyncio - import pytest from apify_shared.consts import ApifyEnvVars from crawlee._utils.file import json_dumps +from ..._utils import poll_until_condition from ..test_crypto import PRIVATE_KEY_PASSWORD, PRIVATE_KEY_PEM_BASE64, PUBLIC_KEY from apify import Actor from apify._consts import ENCRYPTED_JSON_VALUE_PREFIX, ENCRYPTED_STRING_VALUE_PREFIX @@ -119,10 +118,13 @@ async def test_use_state(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store() - stored_state = await kvs.get_value('APIFY_GLOBAL_STATE') + stored_state = await poll_until_condition( + lambda: kvs.get_value('APIFY_GLOBAL_STATE'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' @@ -142,10 +144,13 @@ async def test_use_state_non_default(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store(name='custom-kvs') - stored_state = await kvs.get_value('custom_state_key') + stored_state = await poll_until_condition( + lambda: kvs.get_value('custom_state_key'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 03fdd00e..c93b27a0 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -11,6 +11,7 @@ from apify_shared.consts import ActorExitCodes, ApifyEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Actor if TYPE_CHECKING: @@ -199,7 +200,8 @@ def on_event(event_type: Event) -> Callable: assert actor._active actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) actor.on(Event.SYSTEM_INFO, on_event(Event.SYSTEM_INFO)) - await asyncio.sleep(1) + # Wait until both periodic events are emitted at least once (the emit interval is 100 ms). + await poll_until_condition(lambda: bool(on_persist) and bool(on_system_info), poll_interval=0.05) on_persist_count = len(on_persist) on_system_info_count = len(on_system_info) diff --git a/tests/unit/events/test_apify_event_manager.py b/tests/unit/events/test_apify_event_manager.py index eb8dd375..33e14e87 100644 --- a/tests/unit/events/test_apify_event_manager.py +++ b/tests/unit/events/test_apify_event_manager.py @@ -17,6 +17,7 @@ from apify_shared.consts import ActorEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Configuration from apify.events import ApifyEventManager from apify.events._types import SystemInfoEventData @@ -91,7 +92,7 @@ def event_handler(data: Any) -> None: # Test adding the handler event_manager.on(event=Event.SYSTEM_INFO, listener=handler_system_info) event_manager.emit(event=Event.SYSTEM_INFO, event_data=dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: bool(event_calls[Event.SYSTEM_INFO]), poll_interval=0.05) assert event_calls[Event.SYSTEM_INFO] == [(None, dummy_system_info)] event_calls[Event.SYSTEM_INFO].clear() @@ -114,7 +115,7 @@ def event_handler(data: Any) -> None: # Test that they all work event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 3, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -125,7 +126,7 @@ def event_handler(data: Any) -> None: # Test that if you remove one, the others stay event_manager.off(event=Event.PERSIST_STATE, listener=handler_persist_state_3) event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 2, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -211,7 +212,7 @@ def listener(data: Any) -> None: # Test sending event with data await send_platform_event(Event.SYSTEM_INFO, dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls) == 1, poll_interval=0.05) assert len(event_calls) == 1 assert event_calls[0] is not None assert event_calls[0]['cpuInfo']['usedRatio'] == 0.0845549815498155 @@ -230,7 +231,8 @@ async def handler(_data: Any) -> None: persist_state_counter += 1 event_manager.on(event=Event.PERSIST_STATE, listener=handler) - await asyncio.sleep(1.5) + # Wait until at least one PERSIST_STATE event is handled (the persist interval is 500 ms). + await poll_until_condition(lambda: persist_state_counter > 0, poll_interval=0.05) first_count = persist_state_counter assert first_count > 0 @@ -275,7 +277,7 @@ async def test_unknown_event_is_logged(monkeypatch: pytest.MonkeyPatch, caplog: # Send an unknown event unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}}) websockets.broadcast(connected_ws_clients, unknown_message) - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Unknown message received' in caplog.text, poll_interval=0.05) assert 'Unknown message received' in caplog.text assert 'totallyNewEvent2099' in caplog.text @@ -307,7 +309,10 @@ def migrating_listener(data: Any) -> None: # Send migrating event migrating_message = json.dumps({'name': 'migrating'}) websockets.broadcast(connected_ws_clients, migrating_message) - await asyncio.sleep(0.2) + await poll_until_condition( + lambda: bool(migrating_calls) and any(getattr(c, 'is_migrating', False) for c in persist_calls), + poll_interval=0.05, + ) assert len(migrating_calls) == 1 # MIGRATING should also trigger a PERSIST_STATE with is_migrating=True @@ -361,6 +366,6 @@ async def test_malformed_message_logs_exception( # Send malformed message websockets.broadcast(connected_ws_clients, 'this is not valid json{{{') - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Cannot parse Actor event' in caplog.text, poll_interval=0.05) assert 'Cannot parse Actor event' in caplog.text From a7f4aaad985063100a851b1f971e5b4fef35588e Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 13:32:56 +0200 Subject: [PATCH 3/4] test: address review findings (centralize RQ poll timeout, dedup get_run, bound webhook wait) - Add an rq_poll_timeout fixture deriving the single/shared polling timeout from the request_queue_apify param, replacing the derivation block that was copy-pasted into all 20 request queue tests. - Extract the byte-identical get_run closure in test_actor_charge.py into a module-level _get_run helper used via functools.partial. - Bound the webhook server Actor's wait loop (5s server timeout, 300s deadline) so a webhook that never propagates fails the test quickly with an empty WEBHOOK_BODY and a clear message instead of blocking until the run timeout. --- tests/e2e/test_actor_api_helpers.py | 10 +- tests/e2e/test_actor_charge.py | 32 ++--- tests/integration/conftest.py | 12 ++ tests/integration/test_request_queue.py | 158 +++++++++--------------- 4 files changed, 90 insertions(+), 122 deletions(-) diff --git a/tests/e2e/test_actor_api_helpers.py b/tests/e2e/test_actor_api_helpers.py index 5a43cd84..b978e4d3 100644 --- a/tests/e2e/test_actor_api_helpers.py +++ b/tests/e2e/test_actor_api_helpers.py @@ -393,6 +393,7 @@ async def test_actor_adds_webhook_and_receives_event( ) -> None: async def main_server() -> None: import os + import time from http.server import BaseHTTPRequestHandler, HTTPServer from apify_shared.consts import ActorEnvVars @@ -419,7 +420,12 @@ def do_POST(self) -> None: container_port = int(os.getenv(ActorEnvVars.WEB_SERVER_PORT, '')) with HTTPServer(('', container_port), WebhookHandler) as server: await Actor.set_value('INITIALIZED', value=True) - while not webhook_body: + # Bound the wait so that a webhook that never fires (e.g. one that did not propagate before the + # client run finished) surfaces as an empty WEBHOOK_BODY in the test instead of blocking here + # until the run times out. + server.timeout = 5 + deadline = time.monotonic() + 300 + while not webhook_body and time.monotonic() < deadline: server.handle_request() await Actor.set_value('WEBHOOK_BODY', webhook_body) @@ -478,7 +484,7 @@ async def main_client() -> None: webhook_body_record = await server_actor.last_run().key_value_store().get_record('WEBHOOK_BODY') assert webhook_body_record is not None - assert webhook_body_record['value'] != '' + assert webhook_body_record['value'] != '', 'The ad-hoc webhook never fired (it likely did not propagate in time).' parsed_webhook_body = json.loads(webhook_body_record['value']) assert parsed_webhook_body['eventData']['actorId'] == ac_run_result.act_id diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index 1b678409..649d141d 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -1,6 +1,7 @@ from __future__ import annotations from decimal import Decimal +from functools import partial from typing import TYPE_CHECKING import pytest_asyncio @@ -18,6 +19,11 @@ from .conftest import MakeActorFunction, RunActorFunction +async def _get_run(apify_client_async: ApifyClientAsync, run_id: str) -> ActorRun: + """Fetch the current state of the given run from the platform.""" + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -116,14 +122,10 @@ async def test_actor_charge_basic( apify_client_async: ApifyClientAsync, ) -> None: run = await run_actor(ppe_actor) - run_id = run.id - - async def get_run() -> ActorRun: - return ActorRun.model_validate(await apify_client_async.run(run_id).get()) # Refetch until the charged event counts propagate on the platform. run = await poll_until_condition( - get_run, + partial(_get_run, apify_client_async, run.id), lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 4}, timeout=30, poll_interval=1, @@ -139,14 +141,10 @@ async def test_actor_charge_limit( apify_client_async: ApifyClientAsync, ) -> None: run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) - run_id = run.id - - async def get_run() -> ActorRun: - return ActorRun.model_validate(await apify_client_async.run(run_id).get()) # Refetch until the charged event counts propagate on the platform. run = await poll_until_condition( - get_run, + partial(_get_run, apify_client_async, run.id), lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 2}, timeout=30, poll_interval=1, @@ -164,11 +162,6 @@ async def test_actor_push_data_charges_both_events( """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" run = await run_actor(ppe_push_data_actor) - run_id = run.id - - async def get_run() -> ActorRun: - return ActorRun.model_validate(await apify_client_async.run(run_id).get()) - expected_counts = { 'push-item': 5, 'apify-default-dataset-item': 5, @@ -178,7 +171,7 @@ async def get_run() -> ActorRun: # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are # reflected immediately via the charge endpoint). run = await poll_until_condition( - get_run, + partial(_get_run, apify_client_async, run.id), lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, timeout=120, poll_interval=1, @@ -199,11 +192,6 @@ async def test_actor_push_data_combined_budget_limit( """ run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) - run_id = run.id - - async def get_run() -> ActorRun: - return ActorRun.model_validate(await apify_client_async.run(run_id).get()) - expected_counts = { 'push-item': 2, 'apify-default-dataset-item': 2, @@ -213,7 +201,7 @@ async def get_run() -> ActorRun: # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are # reflected immediately via the charge endpoint). run = await poll_until_condition( - get_run, + partial(_get_run, apify_client_async, run.id), lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, timeout=120, poll_interval=1, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 30aa077d..7409e50c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -94,6 +94,18 @@ async def request_queue_apify( await rq.drop() +@pytest.fixture +def rq_poll_timeout(request: pytest.FixtureRequest) -> int: + """Return the `poll_until_condition` timeout matching the `request_queue_apify` access mode. + + In single mode, reads are immediately consistent, so the caller should poll exactly once (`timeout=0`). In + shared mode, there is a propagation delay between operations, so reads are retried for up to 30 seconds. + See https://github.com/apify/apify-sdk-python/issues/808. + """ + rq_access_mode = request.node.callspec.params.get('request_queue_apify') + return 0 if rq_access_mode == 'single' else 30 + + @pytest.fixture(autouse=True) def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None: """Isolate the testing environment by resetting global state before each test. diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index f70156f5..ce4fc9c7 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -26,17 +26,16 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata # In shared mode, there is a propagation delay between operations, so we retry reads with the test helper -# `poll_until_condition` (`timeout=30`, exponential backoff). In single mode reads are immediately consistent, -# so we call once (`timeout=0`). See https://github.com/apify/apify-sdk-python/issues/808. +# `poll_until_condition` (exponential backoff). In single mode reads are immediately consistent, so we call once. +# The mode-appropriate timeout is provided by the `rq_poll_timeout` fixture (see conftest). +# See https://github.com/apify/apify-sdk-python/issues/808. async def test_add_and_fetch_requests( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test basic functionality of adding and fetching requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -48,7 +47,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -62,17 +61,15 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_requests_in_batches( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding multiple requests in a single batch operation.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 rq = request_queue_apify @@ -84,7 +81,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -98,17 +95,15 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_non_unique_requests_in_batch( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding requests with duplicate unique keys in batch.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 desired_request_count = 100 rq = request_queue_apify @@ -124,7 +119,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -139,18 +134,16 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' async def test_forefront_requests_ordering( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that forefront requests are processed before regular requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -169,7 +162,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -190,11 +183,9 @@ async def test_forefront_requests_ordering( async def test_request_unique_key_behavior( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test behavior of custom unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -224,7 +215,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -242,11 +233,9 @@ async def test_request_unique_key_behavior( async def test_request_reclaim_functionality( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request reclaiming for failed processing.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -256,7 +245,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -283,19 +272,16 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True async def test_request_reclaim_with_forefront( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test reclaiming requests to the front of the queue.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 - rq = request_queue_apify Actor.log.info('Request queue opened') @@ -306,7 +292,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - first_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + first_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -341,11 +327,9 @@ async def test_request_reclaim_with_forefront( async def test_complex_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling complex Request objects with various properties.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -362,7 +346,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -389,11 +373,9 @@ async def test_complex_request_objects( async def test_get_request_by_unique_key( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test retrieving specific requests by their unique_key.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -406,7 +388,7 @@ async def test_get_request_by_unique_key( retrieved_request = await poll_until_condition( lambda: rq.get_request(request_unique_key), - timeout=timeout, + timeout=rq_poll_timeout, backoff_factor=2, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' @@ -422,11 +404,9 @@ async def test_get_request_by_unique_key( async def test_metadata_tracking( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request queue metadata and counts.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -451,7 +431,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) @@ -467,11 +447,9 @@ async def test_metadata_tracking( async def test_batch_operations_performance( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test batch operations vs individual operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -493,7 +471,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -502,18 +480,16 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_state_consistency( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test queue state consistency during concurrent operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -531,7 +507,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -558,32 +534,30 @@ async def test_state_consistency( # Process remaining requests. remaining_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' -async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: +async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None: """Test behavior with empty queues.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await poll_until_condition(rq.is_empty, timeout=timeout, backoff_factor=2) - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -602,11 +576,9 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pyt async def test_large_batch_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling large batches of requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -626,24 +598,22 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_mixed_string_and_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding both string URLs and Request objects.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -672,7 +642,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. fetched_requests = [] - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -692,11 +662,9 @@ async def test_mixed_string_and_request_objects( async def test_persistence_across_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that queue state persists across different operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 # Open queue and add some requests rq = request_queue_apify @@ -713,7 +681,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -738,12 +706,12 @@ async def test_persistence_across_operations( # Process remaining. remaining_processed = 0 - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -814,11 +782,9 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu async def test_request_ordering_with_mixed_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request ordering with mixed add/reclaim operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -829,7 +795,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + request1 = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -843,7 +809,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -1087,11 +1053,9 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: async def test_rq_long_url( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling of requests with long URLs and extended unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1105,25 +1069,23 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await poll_until_condition(rq.is_finished, timeout=timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished async def test_pre_existing_request_with_user_data( request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that pre-existing requests with user data are fully fetched. list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 custom_data = {'key': 'value'} rq = request_queue_apify @@ -1137,7 +1099,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=timeout, backoff_factor=2) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1164,22 +1126,22 @@ async def test_force_cloud( async def test_request_queue_is_finished( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - timeout = 0 if rq_access_mode == 'single' else 30 await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await poll_until_condition(request_queue_apify.fetch_next_request, timeout=timeout, backoff_factor=2) + fetched = await poll_until_condition( + request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2 + ) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await poll_until_condition(request_queue_apify.is_finished, timeout=timeout, backoff_factor=2) + assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2) async def test_request_queue_deduplication_unprocessed_requests( From 32f7c7ca64d5d1217b4b6b84b92ea7b024676a3c Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Wed, 3 Jun 2026 13:35:22 +0200 Subject: [PATCH 4/4] test: make maybe_await public for reuse --- tests/_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/_utils.py b/tests/_utils.py index b7a6f462..e44ba963 100644 --- a/tests/_utils.py +++ b/tests/_utils.py @@ -13,7 +13,7 @@ T = TypeVar('T') -async def _maybe_await(value: Awaitable[T] | T) -> T: +async def maybe_await(value: Awaitable[T] | T) -> T: """Await `value` if it is awaitable, otherwise return it unchanged. Lets `poll_until_condition` accept both sync and async callables. @@ -62,14 +62,14 @@ async def poll_until_condition( """ deadline = time.monotonic() + timeout delay = poll_interval - result = await _maybe_await(fn()) + result = await maybe_await(fn()) while not condition(result): remaining = deadline - time.monotonic() if remaining <= 0: break await asyncio.sleep(min(delay, remaining)) delay *= backoff_factor - result = await _maybe_await(fn()) + result = await maybe_await(fn()) return result