Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tests/__init__.py
Empty file.
87 changes: 87 additions & 0 deletions tests/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import asyncio
import inspect
import time
from typing import TYPE_CHECKING, TypeVar, cast, overload

from crawlee._utils.crypto import crypto_random_object_id

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable

T = TypeVar('T')


async def maybe_await(value: Awaitable[T] | T) -> T:
"""Await `value` if it is awaitable, otherwise return it unchanged.

Lets `poll_until_condition` accept both sync and async callables.
"""
if inspect.isawaitable(value):
return await cast('Awaitable[T]', value)
return cast('T', value)


@overload
async def poll_until_condition(
fn: Callable[[], Awaitable[T]],
condition: Callable[[T], bool] = ...,
*,
timeout: float = ...,
poll_interval: float = ...,
backoff_factor: float = ...,
) -> T: ...
@overload
async def poll_until_condition(
fn: Callable[[], T],
condition: Callable[[T], bool] = ...,
*,
timeout: float = ...,
poll_interval: float = ...,
backoff_factor: float = ...,
) -> T: ...
async def poll_until_condition(
fn: Callable[[], Awaitable[T] | T],
condition: Callable[[T], bool] = bool,
*,
timeout: float = 5,
poll_interval: float = 1,
backoff_factor: float = 1,
) -> T:
"""Poll `fn` until `condition(result)` is True or the timeout expires.

Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed.
Returns the last polled result regardless of whether the condition was met, so the caller can run its own
assertion. The default condition checks for a truthy result. Pass `timeout=0` to call `fn` exactly once.

Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly
created resource appearing in a listing) that may take a variable amount of time to propagate. For highly
variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the
interval after each poll, covering a long timeout with few calls.
"""
deadline = time.monotonic() + timeout
delay = poll_interval
result = await maybe_await(fn())
while not condition(result):
remaining = deadline - time.monotonic()
if remaining <= 0:
break
await asyncio.sleep(min(delay, remaining))
delay *= backoff_factor
result = await maybe_await(fn())
return result


def generate_unique_resource_name(label: str) -> str:
"""Generates a unique resource name, which will contain the given label."""
name_template = 'python-sdk-tests-{}-generated-{}'
template_length = len(name_template.format('', ''))
api_name_limit = 63
generated_random_id_length = 8
label_length_limit = api_name_limit - template_length - generated_random_id_length

label = label.replace('_', '-')
assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}'

return name_template.format(label, crypto_random_object_id(generated_random_id_length))
17 changes: 0 additions & 17 deletions tests/e2e/_utils.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from crawlee import service_locator

import apify._actor
from ._utils import generate_unique_resource_name
from .._utils import generate_unique_resource_name
from apify._models import ActorRun
from apify.storage_clients._apify._alias_resolving import AliasResolver

Expand Down
33 changes: 26 additions & 7 deletions tests/e2e/test_actor_api_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from apify_shared.consts import ActorPermissionLevel
from crawlee._utils.crypto import crypto_random_object_id

from ._utils import generate_unique_resource_name
from .._utils import generate_unique_resource_name, poll_until_condition
from apify import Actor
from apify._models import ActorRun

Expand Down Expand Up @@ -393,6 +393,7 @@ async def test_actor_adds_webhook_and_receives_event(
) -> None:
async def main_server() -> None:
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer

from apify_shared.consts import ActorEnvVars
Expand All @@ -419,12 +420,19 @@ def do_POST(self) -> None:
container_port = int(os.getenv(ActorEnvVars.WEB_SERVER_PORT, ''))
with HTTPServer(('', container_port), WebhookHandler) as server:
await Actor.set_value('INITIALIZED', value=True)
while not webhook_body:
# Bound the wait so that a webhook that never fires (e.g. one that did not propagate before the
# client run finished) surfaces as an empty WEBHOOK_BODY in the test instead of blocking here
# until the run times out.
server.timeout = 5
deadline = time.monotonic() + 300
while not webhook_body and time.monotonic() < deadline:
server.handle_request()

await Actor.set_value('WEBHOOK_BODY', webhook_body)

async def main_client() -> None:
import asyncio

from apify import Webhook, WebhookEventType

async with Actor:
Expand All @@ -438,6 +446,12 @@ async def main_client() -> None:
)
)

# Keep the run alive for a moment after registering the webhook. Without this, the run finishes
# just milliseconds later and the platform may process the run-succeeded event before the freshly
# added ad-hoc webhook has propagated, in which case the webhook never fires and the server Actor
# waits until it times out.
await asyncio.sleep(5)

server_actor, client_actor = await asyncio.gather(
make_actor(label='add-webhook-server', main_func=main_server),
make_actor(label='add-webhook-client', main_func=main_client),
Expand All @@ -446,10 +460,15 @@ async def main_client() -> None:
server_actor_run = await server_actor.start()
server_actor_container_url = server_actor_run['containerUrl']

server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED')
while not server_actor_initialized:
server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED')
await asyncio.sleep(1)
# Wait for the server Actor's container to start up and bind its HTTP server. The startup time is highly
# variable (image pull, container creation), so poll with a growing interval instead of a fixed sleep.
server_actor_initialized = await poll_until_condition(
lambda: server_actor.last_run().key_value_store().get_record('INITIALIZED'),
timeout=300,
poll_interval=1,
backoff_factor=1.5,
)
assert server_actor_initialized is not None, 'The server Actor did not initialize in time.'

ac_run_result = await run_actor(
client_actor,
Expand All @@ -465,7 +484,7 @@ async def main_client() -> None:

webhook_body_record = await server_actor.last_run().key_value_store().get_record('WEBHOOK_BODY')
assert webhook_body_record is not None
assert webhook_body_record['value'] != ''
assert webhook_body_record['value'] != '', 'The ad-hoc webhook never fired (it likely did not propagate in time).'
parsed_webhook_body = json.loads(webhook_body_record['value'])

assert parsed_webhook_body['eventData']['actorId'] == ac_run_result.act_id
Expand Down
129 changes: 59 additions & 70 deletions tests/e2e/test_actor_charge.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
from __future__ import annotations

import asyncio
from decimal import Decimal
from functools import partial
from typing import TYPE_CHECKING

import pytest_asyncio

from apify_shared.consts import ActorJobStatus

from .._utils import poll_until_condition
from apify import Actor
from apify._models import ActorRun

if TYPE_CHECKING:
from collections.abc import Iterable

from apify_client import ApifyClientAsync
from apify_client.clients import ActorClientAsync

from .conftest import MakeActorFunction, RunActorFunction


async def _get_run(apify_client_async: ApifyClientAsync, run_id: str) -> ActorRun:
"""Fetch the current state of the given run from the platform."""
return ActorRun.model_validate(await apify_client_async.run(run_id).get())


@pytest_asyncio.fixture(scope='module', loop_scope='module')
async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str:
async def main() -> None:
Expand Down Expand Up @@ -112,33 +116,23 @@ async def ppe_actor(
return apify_client_async.actor(ppe_actor_build)


def retry_counter(total_attempts: int) -> Iterable[tuple[bool, int]]:
for retry in range(total_attempts - 1):
yield False, retry

yield True, total_attempts - 1


async def test_actor_charge_basic(
ppe_actor: ActorClientAsync,
run_actor: RunActorFunction,
apify_client_async: ApifyClientAsync,
) -> None:
run = await run_actor(ppe_actor)

# Refetch until the platform gets its act together
for is_last_attempt, _ in retry_counter(30):
await asyncio.sleep(1)
updated_run = await apify_client_async.run(run.id).get()
run = ActorRun.model_validate(updated_run)
# Refetch until the charged event counts propagate on the platform.
run = await poll_until_condition(
partial(_get_run, apify_client_async, run.id),
lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 4},
timeout=30,
poll_interval=1,
)

try:
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {'foobar': 4}
break
except AssertionError:
if is_last_attempt:
raise
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {'foobar': 4}


async def test_actor_charge_limit(
Expand All @@ -148,19 +142,16 @@ async def test_actor_charge_limit(
) -> None:
run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2'))

# Refetch until the platform gets its act together
for is_last_attempt, _ in retry_counter(30):
await asyncio.sleep(1)
updated_run = await apify_client_async.run(run.id).get()
run = ActorRun.model_validate(updated_run)
# Refetch until the charged event counts propagate on the platform.
run = await poll_until_condition(
partial(_get_run, apify_client_async, run.id),
lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 2},
timeout=30,
poll_interval=1,
)

try:
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {'foobar': 2}
break
except AssertionError:
if is_last_attempt:
raise
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {'foobar': 2}


async def test_actor_push_data_charges_both_events(
Expand All @@ -171,24 +162,23 @@ async def test_actor_push_data_charges_both_events(
"""Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event."""
run = await run_actor(ppe_push_data_actor)

# Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`:
# the platform computes them from dataset writes asynchronously, so they propagate more slowly than
# explicit charges (which are reflected immediately via the charge endpoint).
for is_last_attempt, _ in retry_counter(120):
await asyncio.sleep(1)
updated_run = await apify_client_async.run(run.id).get()
run = ActorRun.model_validate(updated_run)

try:
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {
'push-item': 5,
'apify-default-dataset-item': 5,
}
break
except AssertionError:
if is_last_attempt:
raise
expected_counts = {
'push-item': 5,
'apify-default-dataset-item': 5,
}

# Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them
# from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are
# reflected immediately via the charge endpoint).
run = await poll_until_condition(
partial(_get_run, apify_client_async, run.id),
lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts,
timeout=120,
poll_interval=1,
)

assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == expected_counts


async def test_actor_push_data_combined_budget_limit(
Expand All @@ -202,21 +192,20 @@ async def test_actor_push_data_combined_budget_limit(
"""
run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20'))

# Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`:
# the platform computes them from dataset writes asynchronously, so they propagate more slowly than
# explicit charges (which are reflected immediately via the charge endpoint).
for is_last_attempt, _ in retry_counter(120):
await asyncio.sleep(1)
updated_run = await apify_client_async.run(run.id).get()
run = ActorRun.model_validate(updated_run)

try:
assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == {
'push-item': 2,
'apify-default-dataset-item': 2,
}
break
except AssertionError:
if is_last_attempt:
raise
expected_counts = {
'push-item': 2,
'apify-default-dataset-item': 2,
}

# Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them
# from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are
# reflected immediately via the charge endpoint).
run = await poll_until_condition(
partial(_get_run, apify_client_async, run.id),
lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts,
timeout=120,
poll_interval=1,
)

assert run.status == ActorJobStatus.SUCCEEDED
assert run.charged_event_counts == expected_counts
Loading