diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/_utils.py b/tests/_utils.py new file mode 100644 index 00000000..e44ba963 --- /dev/null +++ b/tests/_utils.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import asyncio +import inspect +import time +from typing import TYPE_CHECKING, TypeVar, cast, overload + +from crawlee._utils.crypto import crypto_random_object_id + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + +T = TypeVar('T') + + +async def maybe_await(value: Awaitable[T] | T) -> T: + """Await `value` if it is awaitable, otherwise return it unchanged. + + Lets `poll_until_condition` accept both sync and async callables. + """ + if inspect.isawaitable(value): + return await cast('Awaitable[T]', value) + return cast('T', value) + + +@overload +async def poll_until_condition( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., + backoff_factor: float = ..., +) -> T: ... +@overload +async def poll_until_condition( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., + backoff_factor: float = ..., +) -> T: ... +async def poll_until_condition( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + timeout: float = 5, + poll_interval: float = 1, + backoff_factor: float = 1, +) -> T: + """Poll `fn` until `condition(result)` is True or the timeout expires. + + Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. + Returns the last polled result regardless of whether the condition was met, so the caller can run its own + assertion. The default condition checks for a truthy result. Pass `timeout=0` to call `fn` exactly once. + + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly + created resource appearing in a listing) that may take a variable amount of time to propagate. For highly + variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the + interval after each poll, covering a long timeout with few calls. + """ + deadline = time.monotonic() + timeout + delay = poll_interval + result = await maybe_await(fn()) + while not condition(result): + remaining = deadline - time.monotonic() + if remaining <= 0: + break + await asyncio.sleep(min(delay, remaining)) + delay *= backoff_factor + result = await maybe_await(fn()) + return result + + +def generate_unique_resource_name(label: str) -> str: + """Generates a unique resource name, which will contain the given label.""" + name_template = 'python-sdk-tests-{}-generated-{}' + template_length = len(name_template.format('', '')) + api_name_limit = 63 + generated_random_id_length = 8 + label_length_limit = api_name_limit - template_length - generated_random_id_length + + label = label.replace('_', '-') + assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' + + return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/e2e/_utils.py b/tests/e2e/_utils.py deleted file mode 100644 index b5323272..00000000 --- a/tests/e2e/_utils.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import annotations - -from crawlee._utils.crypto import crypto_random_object_id - - -def generate_unique_resource_name(label: str) -> str: - """Generates a unique resource name, which will contain the given label.""" - name_template = 'python-sdk-tests-{}-generated-{}' - template_length = len(name_template.format('', '')) - api_name_limit = 63 - generated_random_id_length = 8 - label_length_limit = api_name_limit - template_length - generated_random_id_length - - label = label.replace('_', '-') - assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' - - return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index bdcc9883..df44a233 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -17,7 +17,7 @@ from crawlee import service_locator import apify._actor -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify._models import ActorRun from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/e2e/test_actor_api_helpers.py b/tests/e2e/test_actor_api_helpers.py index 3747dd3b..b978e4d3 100644 --- a/tests/e2e/test_actor_api_helpers.py +++ b/tests/e2e/test_actor_api_helpers.py @@ -7,7 +7,7 @@ from apify_shared.consts import ActorPermissionLevel from crawlee._utils.crypto import crypto_random_object_id -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor from apify._models import ActorRun @@ -393,6 +393,7 @@ async def test_actor_adds_webhook_and_receives_event( ) -> None: async def main_server() -> None: import os + import time from http.server import BaseHTTPRequestHandler, HTTPServer from apify_shared.consts import ActorEnvVars @@ -419,12 +420,19 @@ def do_POST(self) -> None: container_port = int(os.getenv(ActorEnvVars.WEB_SERVER_PORT, '')) with HTTPServer(('', container_port), WebhookHandler) as server: await Actor.set_value('INITIALIZED', value=True) - while not webhook_body: + # Bound the wait so that a webhook that never fires (e.g. one that did not propagate before the + # client run finished) surfaces as an empty WEBHOOK_BODY in the test instead of blocking here + # until the run times out. + server.timeout = 5 + deadline = time.monotonic() + 300 + while not webhook_body and time.monotonic() < deadline: server.handle_request() await Actor.set_value('WEBHOOK_BODY', webhook_body) async def main_client() -> None: + import asyncio + from apify import Webhook, WebhookEventType async with Actor: @@ -438,6 +446,12 @@ async def main_client() -> None: ) ) + # Keep the run alive for a moment after registering the webhook. Without this, the run finishes + # just milliseconds later and the platform may process the run-succeeded event before the freshly + # added ad-hoc webhook has propagated, in which case the webhook never fires and the server Actor + # waits until it times out. + await asyncio.sleep(5) + server_actor, client_actor = await asyncio.gather( make_actor(label='add-webhook-server', main_func=main_server), make_actor(label='add-webhook-client', main_func=main_client), @@ -446,10 +460,15 @@ async def main_client() -> None: server_actor_run = await server_actor.start() server_actor_container_url = server_actor_run['containerUrl'] - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - while not server_actor_initialized: - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - await asyncio.sleep(1) + # Wait for the server Actor's container to start up and bind its HTTP server. The startup time is highly + # variable (image pull, container creation), so poll with a growing interval instead of a fixed sleep. + server_actor_initialized = await poll_until_condition( + lambda: server_actor.last_run().key_value_store().get_record('INITIALIZED'), + timeout=300, + poll_interval=1, + backoff_factor=1.5, + ) + assert server_actor_initialized is not None, 'The server Actor did not initialize in time.' ac_run_result = await run_actor( client_actor, @@ -465,7 +484,7 @@ async def main_client() -> None: webhook_body_record = await server_actor.last_run().key_value_store().get_record('WEBHOOK_BODY') assert webhook_body_record is not None - assert webhook_body_record['value'] != '' + assert webhook_body_record['value'] != '', 'The ad-hoc webhook never fired (it likely did not propagate in time).' parsed_webhook_body = json.loads(webhook_body_record['value']) assert parsed_webhook_body['eventData']['actorId'] == ac_run_result.act_id diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index f8b1f393..649d141d 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -1,25 +1,29 @@ from __future__ import annotations -import asyncio from decimal import Decimal +from functools import partial from typing import TYPE_CHECKING import pytest_asyncio from apify_shared.consts import ActorJobStatus +from .._utils import poll_until_condition from apify import Actor from apify._models import ActorRun if TYPE_CHECKING: - from collections.abc import Iterable - from apify_client import ApifyClientAsync from apify_client.clients import ActorClientAsync from .conftest import MakeActorFunction, RunActorFunction +async def _get_run(apify_client_async: ApifyClientAsync, run_id: str) -> ActorRun: + """Fetch the current state of the given run from the platform.""" + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -112,13 +116,6 @@ async def ppe_actor( return apify_client_async.actor(ppe_actor_build) -def retry_counter(total_attempts: int) -> Iterable[tuple[bool, int]]: - for retry in range(total_attempts - 1): - yield False, retry - - yield True, total_attempts - 1 - - async def test_actor_charge_basic( ppe_actor: ActorClientAsync, run_actor: RunActorFunction, @@ -126,19 +123,16 @@ async def test_actor_charge_basic( ) -> None: run = await run_actor(ppe_actor) - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 4}, + timeout=30, + poll_interval=1, + ) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 4} - break - except AssertionError: - if is_last_attempt: - raise + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 4} async def test_actor_charge_limit( @@ -148,19 +142,16 @@ async def test_actor_charge_limit( ) -> None: run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 2}, + timeout=30, + poll_interval=1, + ) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 2} - break - except AssertionError: - if is_last_attempt: - raise + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 2} async def test_actor_push_data_charges_both_events( @@ -171,24 +162,23 @@ async def test_actor_push_data_charges_both_events( """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" run = await run_actor(ppe_push_data_actor) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 5, - 'apify-default-dataset-item': 5, - } - break - except AssertionError: - if is_last_attempt: - raise + expected_counts = { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts async def test_actor_push_data_combined_budget_limit( @@ -202,21 +192,20 @@ async def test_actor_push_data_combined_budget_limit( """ run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 2, - 'apify-default-dataset-item': 2, - } - break - except AssertionError: - if is_last_attempt: - raise + expected_counts = { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py deleted file mode 100644 index 3fb26bbd..00000000 --- a/tests/integration/_utils.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -import asyncio -import time -from typing import TYPE_CHECKING, Literal, TypeVar - -from crawlee._utils.crypto import crypto_random_object_id - -from apify import Actor - -if TYPE_CHECKING: - from collections.abc import Awaitable, Callable - -T = TypeVar('T') - - -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - *, - rq_access_mode: Literal['single', 'shared'], - max_retries: int = 5, -) -> T | None: - """Call an async callable with exponential backoff retries until it returns a truthy value. - - In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests - become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with - exponential backoff to handle that delay in integration tests. - - When `rq_access_mode` is `'single'`, the function is called once without retries. - """ - if rq_access_mode == 'single': - return await fn() - - if rq_access_mode == 'shared': - result = None - - for attempt in range(max_retries): - result = await fn() - - if result: - return result - - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) - - return result - - raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') - - -async def poll_until_condition( - fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool], - *, - timeout: float = 60, - poll_interval: float = 5, -) -> T: - """Poll `fn` until `condition(result)` is True or the timeout expires. - - Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. - Returns the last polled result regardless of whether the condition was met. - - Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent API state (e.g. request queue - stats) that may take a variable amount of time to propagate. - """ - deadline = time.monotonic() + timeout - result = await fn() - while not condition(result): - remaining = deadline - time.monotonic() - if remaining <= 0: - break - await asyncio.sleep(min(poll_interval, remaining)) - result = await fn() - return result - - -def generate_unique_resource_name(label: str) -> str: - """Generates a unique resource name, which will contain the given label.""" - name_template = 'python-sdk-tests-{}-generated-{}' - template_length = len(name_template.format('', '')) - api_name_limit = 63 - generated_random_id_length = 8 - label_length_limit = api_name_limit - template_length - generated_random_id_length - - label = label.replace('_', '-') - assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' - - return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 30aa077d..7409e50c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -94,6 +94,18 @@ async def request_queue_apify( await rq.drop() +@pytest.fixture +def rq_poll_timeout(request: pytest.FixtureRequest) -> int: + """Return the `poll_until_condition` timeout matching the `request_queue_apify` access mode. + + In single mode, reads are immediately consistent, so the caller should poll exactly once (`timeout=0`). In + shared mode, there is a propagation delay between operations, so reads are retried for up to 30 seconds. + See https://github.com/apify/apify-sdk-python/issues/808. + """ + rq_access_mode = request.node.callspec.params.get('request_queue_apify') + return 0 if rq_access_mode == 'single' else 30 + + @pytest.fixture(autouse=True) def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None: """Isolate the testing environment by resetting global state before each test. diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py index 5a5d1b92..75a80695 100644 --- a/tests/integration/test_dataset.py +++ b/tests/integration/test_dataset.py @@ -6,7 +6,7 @@ from apify_shared.consts import ApifyEnvVars -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storages import Dataset diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py index 912ecfb0..2ea4672d 100644 --- a/tests/integration/test_key_value_store.py +++ b/tests/integration/test_key_value_store.py @@ -7,7 +7,7 @@ from apify_shared.consts import ApifyEnvVars from crawlee import service_locator -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 87ae7e0b..ce4fc9c7 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -12,7 +12,7 @@ from crawlee import service_locator from crawlee.crawlers import BasicCrawler -from ._utils import call_with_exp_backoff, generate_unique_resource_name, poll_until_condition +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor, Request from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify import ApifyRequestQueueClient @@ -25,16 +25,17 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata -# In shared mode, there is a propagation delay between operations so we use test helper -# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. +# In shared mode, there is a propagation delay between operations, so we retry reads with the test helper +# `poll_until_condition` (exponential backoff). In single mode reads are immediately consistent, so we call once. +# The mode-appropriate timeout is provided by the `rq_poll_timeout` fixture (see conftest). +# See https://github.com/apify/apify-sdk-python/issues/808. async def test_add_and_fetch_requests( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test basic functionality of adding and fetching requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -46,7 +47,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -60,16 +61,15 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_requests_in_batches( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding multiple requests in a single batch operation.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -81,7 +81,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -95,16 +95,15 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_non_unique_requests_in_batch( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding requests with duplicate unique keys in batch.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -120,7 +119,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -135,17 +134,16 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' async def test_forefront_requests_ordering( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that forefront requests are processed before regular requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -164,7 +162,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -185,10 +183,9 @@ async def test_forefront_requests_ordering( async def test_request_unique_key_behavior( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test behavior of custom unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -218,7 +215,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -236,10 +233,9 @@ async def test_request_unique_key_behavior( async def test_request_reclaim_functionality( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request reclaiming for failed processing.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -249,7 +245,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -262,7 +258,12 @@ async def test_request_reclaim_functionality( # Should be able to fetch the same request again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - request2 = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + request2 = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert request2 is not None assert request2.url == fetched_request.url @@ -271,18 +272,16 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True async def test_request_reclaim_with_forefront( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test reclaiming requests to the front of the queue.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - rq = request_queue_apify Actor.log.info('Request queue opened') @@ -293,7 +292,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + first_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -303,7 +302,12 @@ async def test_request_reclaim_with_forefront( # The reclaimed request should be fetched first again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - next_request = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + next_request = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert next_request is not None assert next_request.url == first_request.url @@ -323,10 +327,9 @@ async def test_request_reclaim_with_forefront( async def test_complex_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling complex Request objects with various properties.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -343,7 +346,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -370,10 +373,9 @@ async def test_complex_request_objects( async def test_get_request_by_unique_key( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test retrieving specific requests by their unique_key.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -384,9 +386,10 @@ async def test_get_request_by_unique_key( request_unique_key = add_result.unique_key Actor.log.info(f'Request added with unique_key: {request_unique_key}') - retrieved_request = await call_with_exp_backoff( + retrieved_request = await poll_until_condition( lambda: rq.get_request(request_unique_key), - rq_access_mode=rq_access_mode, + timeout=rq_poll_timeout, + backoff_factor=2, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}' @@ -401,10 +404,9 @@ async def test_get_request_by_unique_key( async def test_metadata_tracking( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request queue metadata and counts.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -429,7 +431,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) @@ -445,10 +447,9 @@ async def test_metadata_tracking( async def test_batch_operations_performance( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test batch operations vs individual operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -470,7 +471,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -479,17 +480,16 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_state_consistency( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test queue state consistency during concurrent operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -507,7 +507,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -534,31 +534,30 @@ async def test_state_consistency( # Process remaining requests. remaining_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' -async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: +async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None: """Test behavior with empty queues.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -577,10 +576,9 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pyt async def test_large_batch_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling large batches of requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -600,23 +598,22 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_mixed_string_and_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding both string URLs and Request objects.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -645,7 +642,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -665,10 +662,9 @@ async def test_mixed_string_and_request_objects( async def test_persistence_across_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that queue state persists across different operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') # Open queue and add some requests rq = request_queue_apify @@ -685,7 +681,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -710,12 +706,12 @@ async def test_persistence_across_operations( # Process remaining. remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -747,7 +743,12 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu for url, expected_duplicate in urls_and_deduplication_expectations: # In shared mode, `add_request` may transiently return None until the operation propagates, # so poll with backoff until it returns a result. - result = await poll_until_condition(lambda url=url: rq.add_request(url), lambda result: result is not None) + result = await poll_until_condition( + lambda url=url: rq.add_request(url), + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert result is not None results.append(result.was_already_present) @@ -781,10 +782,9 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu async def test_request_ordering_with_mixed_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request ordering with mixed add/reclaim operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -795,7 +795,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request1 = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -809,7 +809,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -862,7 +862,12 @@ async def test_request_queue_metadata_another_client( await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'})) # Poll until the API has propagated the metadata change. - metadata = await poll_until_condition(rq.get_metadata, lambda m: m.total_request_count >= 1) + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.total_request_count >= 1, + timeout=60, + poll_interval=5, + ) assert metadata.total_request_count == 1 @@ -963,6 +968,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= expected_write_count, + timeout=60, + poll_interval=5, ) Actor.log.info(f'{metadata.stats=}') assert metadata.stats.write_count == expected_write_count @@ -991,6 +998,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_before = metadata.stats Actor.log.info(stats_before) @@ -1005,6 +1014,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.read_count - stats_before.read_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_after = metadata.stats Actor.log.info(stats_after) @@ -1032,6 +1043,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: apify_metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= add_request_count, + timeout=60, + poll_interval=5, ) assert hasattr(apify_metadata, 'stats') @@ -1040,10 +1053,9 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: async def test_rq_long_url( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling of requests with long URLs and extended unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1057,24 +1069,23 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished async def test_pre_existing_request_with_user_data( request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that pre-existing requests with user data are fully fetched. list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') custom_data = {'key': 'value'} rq = request_queue_apify @@ -1088,7 +1099,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1115,21 +1126,22 @@ async def test_force_cloud( async def test_request_queue_is_finished( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: - rq_access_mode = request.node.callspec.params.get('request_queue_apify') await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) + fetched = await poll_until_condition( + request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2 + ) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) + assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2) async def test_request_queue_deduplication_unprocessed_requests( @@ -1180,6 +1192,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) Actor.log.info(stats_after) @@ -1288,6 +1302,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 @@ -1320,6 +1336,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 2, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 2 @@ -1358,6 +1376,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= len(requests), + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests) @@ -1415,7 +1435,7 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + while request := await poll_until_condition(rq.fetch_next_request, timeout=30, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(request) diff --git a/tests/unit/actor/test_actor_key_value_store.py b/tests/unit/actor/test_actor_key_value_store.py index 581d775d..bd4e69cb 100644 --- a/tests/unit/actor/test_actor_key_value_store.py +++ b/tests/unit/actor/test_actor_key_value_store.py @@ -1,12 +1,11 @@ from __future__ import annotations -import asyncio - import pytest from apify_shared.consts import ApifyEnvVars from crawlee._utils.file import json_dumps +from ..._utils import poll_until_condition from ..test_crypto import PRIVATE_KEY_PASSWORD, PRIVATE_KEY_PEM_BASE64, PUBLIC_KEY from apify import Actor from apify._consts import ENCRYPTED_JSON_VALUE_PREFIX, ENCRYPTED_STRING_VALUE_PREFIX @@ -119,10 +118,13 @@ async def test_use_state(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store() - stored_state = await kvs.get_value('APIFY_GLOBAL_STATE') + stored_state = await poll_until_condition( + lambda: kvs.get_value('APIFY_GLOBAL_STATE'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' @@ -142,10 +144,13 @@ async def test_use_state_non_default(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store(name='custom-kvs') - stored_state = await kvs.get_value('custom_state_key') + stored_state = await poll_until_condition( + lambda: kvs.get_value('custom_state_key'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 03fdd00e..c93b27a0 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -11,6 +11,7 @@ from apify_shared.consts import ActorExitCodes, ApifyEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Actor if TYPE_CHECKING: @@ -199,7 +200,8 @@ def on_event(event_type: Event) -> Callable: assert actor._active actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) actor.on(Event.SYSTEM_INFO, on_event(Event.SYSTEM_INFO)) - await asyncio.sleep(1) + # Wait until both periodic events are emitted at least once (the emit interval is 100 ms). + await poll_until_condition(lambda: bool(on_persist) and bool(on_system_info), poll_interval=0.05) on_persist_count = len(on_persist) on_system_info_count = len(on_system_info) diff --git a/tests/unit/events/test_apify_event_manager.py b/tests/unit/events/test_apify_event_manager.py index eb8dd375..33e14e87 100644 --- a/tests/unit/events/test_apify_event_manager.py +++ b/tests/unit/events/test_apify_event_manager.py @@ -17,6 +17,7 @@ from apify_shared.consts import ActorEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Configuration from apify.events import ApifyEventManager from apify.events._types import SystemInfoEventData @@ -91,7 +92,7 @@ def event_handler(data: Any) -> None: # Test adding the handler event_manager.on(event=Event.SYSTEM_INFO, listener=handler_system_info) event_manager.emit(event=Event.SYSTEM_INFO, event_data=dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: bool(event_calls[Event.SYSTEM_INFO]), poll_interval=0.05) assert event_calls[Event.SYSTEM_INFO] == [(None, dummy_system_info)] event_calls[Event.SYSTEM_INFO].clear() @@ -114,7 +115,7 @@ def event_handler(data: Any) -> None: # Test that they all work event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 3, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -125,7 +126,7 @@ def event_handler(data: Any) -> None: # Test that if you remove one, the others stay event_manager.off(event=Event.PERSIST_STATE, listener=handler_persist_state_3) event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 2, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -211,7 +212,7 @@ def listener(data: Any) -> None: # Test sending event with data await send_platform_event(Event.SYSTEM_INFO, dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls) == 1, poll_interval=0.05) assert len(event_calls) == 1 assert event_calls[0] is not None assert event_calls[0]['cpuInfo']['usedRatio'] == 0.0845549815498155 @@ -230,7 +231,8 @@ async def handler(_data: Any) -> None: persist_state_counter += 1 event_manager.on(event=Event.PERSIST_STATE, listener=handler) - await asyncio.sleep(1.5) + # Wait until at least one PERSIST_STATE event is handled (the persist interval is 500 ms). + await poll_until_condition(lambda: persist_state_counter > 0, poll_interval=0.05) first_count = persist_state_counter assert first_count > 0 @@ -275,7 +277,7 @@ async def test_unknown_event_is_logged(monkeypatch: pytest.MonkeyPatch, caplog: # Send an unknown event unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}}) websockets.broadcast(connected_ws_clients, unknown_message) - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Unknown message received' in caplog.text, poll_interval=0.05) assert 'Unknown message received' in caplog.text assert 'totallyNewEvent2099' in caplog.text @@ -307,7 +309,10 @@ def migrating_listener(data: Any) -> None: # Send migrating event migrating_message = json.dumps({'name': 'migrating'}) websockets.broadcast(connected_ws_clients, migrating_message) - await asyncio.sleep(0.2) + await poll_until_condition( + lambda: bool(migrating_calls) and any(getattr(c, 'is_migrating', False) for c in persist_calls), + poll_interval=0.05, + ) assert len(migrating_calls) == 1 # MIGRATING should also trigger a PERSIST_STATE with is_migrating=True @@ -361,6 +366,6 @@ async def test_malformed_message_logs_exception( # Send malformed message websockets.broadcast(connected_ws_clients, 'this is not valid json{{{') - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Cannot parse Actor event' in caplog.text, poll_interval=0.05) assert 'Cannot parse Actor event' in caplog.text