diff --git a/README.md b/README.md index b379ab7f0..078194440 100644 --- a/README.md +++ b/README.md @@ -522,7 +522,45 @@ async with sender: 9421 signing, and the httpx POST in one call. `send_raw(...)` is an escape hatch for custom payload shapes; dedicated methods exist for every webhook kind (`send_revocation_notification`, `send_artifact_webhook`, -`send_collection_list_changed`, `send_property_list_changed`). +`send_collection_list_changed`, `send_property_list_changed`, +`send_wholesale_feed`). + +Validate buyer-provided webhook URLs before storing durable subscriptions: + +```python +from adcp.webhooks import WebhookDestinationPolicy, validate_webhook_destination_url + +validate_webhook_destination_url( + request.push_notification_config.url, + field="push_notification_config.url", + policy=WebhookDestinationPolicy.production(), +) +``` + +Use `WebhookDestinationPolicy.local_development()` only for local tests that +need `http://localhost` or private-network destinations. Production validation +requires HTTPS and rejects loopback, private, link-local, reserved, and cloud +metadata destinations using the same SSRF classifier as `WebhookSender`. The +validation result includes both `original_url` and `effective_url`; sellers +should normally persist the buyer's original URL and reapply the same +policy/hooks at send time, rather than storing a Docker or test rewrite. + +Wholesale feed notifications use stable types from `adcp` / `adcp.types`: + +```python +from adcp import NotificationConfig, WholesaleFeedEvent, WholesaleFeedWebhook +from adcp.webhooks import WebhookSender + +if "product.updated" in subscription.event_types: + await sender.send_wholesale_feed_to_subscription( + subscription=subscription, + account_id=account_id, + notification_type="product.updated", + wholesale_feed_version=feed_version, + cache_scope="public", + event=event, + ) +``` The webhook-signing JWK MUST be published in your `adagents.json` with `adcp_use: "webhook-signing"` — distinct from your `request-signing` key so diff --git a/docs/handler-authoring.md b/docs/handler-authoring.md index 1c7ec8d78..48b602e9a 100644 --- a/docs/handler-authoring.md +++ b/docs/handler-authoring.md @@ -1214,6 +1214,55 @@ Pick one per `WebhookSender` instance. All three share the same | `WebhookSender.from_bearer_token(token)` | `Authorization: Bearer` | Simplest; no key management; requires TLS | | `WebhookSender.from_standard_webhooks_secret(secret, key_id=...)` | Standard Webhooks v1 | Svix / Resend / standardwebhooks.com receivers | +### Registration-time URL validation + +Validate durable buyer endpoints before persisting `push_notification_config.url` +or `accounts[].notification_configs[].url` from `sync_accounts`: + +```python +from adcp.webhooks import ( + WebhookDestinationPolicy, + WebhookDestinationValidationError, + validate_webhook_destination_url, +) + +try: + validate_webhook_destination_url( + config.url, + field="accounts[0].notification_configs[0].url", + policy=WebhookDestinationPolicy.production(), + ) +except WebhookDestinationValidationError as exc: + return {"errors": [exc.to_error()]} +``` + +Production policy requires HTTPS and rejects private, loopback, link-local, +reserved, and cloud metadata destinations. Use +`WebhookDestinationPolicy.local_development()` only for local fixtures that +need `http://localhost` or private-network endpoints. The helper returns both +`original_url` and `effective_url`; persist the buyer's original URL in durable +subscription state, and reapply the same policy/hooks when sending. Do not +persist a Docker or test rewrite as the buyer's registered endpoint. + +### Wholesale feed notifications + +`NotificationConfig`, `WholesaleFeedEvent`, and `WholesaleFeedWebhook` are +stable exports from both `adcp` and `adcp.types`. When firing account-scoped +catalog notifications, preserve the subscriber filter and send through +`WebhookSender`: + +```python +if event_type in subscription.event_types: + await sender.send_wholesale_feed_to_subscription( + subscription=subscription, + account_id=account_id, + notification_type=event_type, + wholesale_feed_version=feed_version, + cache_scope="public", + event=event, + ) +``` + ### Sender vs. supervisor `WebhookSender` is the transport layer — it constructs and signs one HTTP POST. diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 246901ae7..dd317755c 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -218,6 +218,7 @@ MediaBuyPackage, MediaBuyStatus, MediaChannel, + NotificationConfig, OfferingAssetConstraint, OfferingAssetGroup, # Optimization @@ -286,6 +287,8 @@ VerifyBrandClaimsRequestBulk, VerifyBrandClaimsResponseBulk, WcagLevel, + WholesaleFeedEvent, + WholesaleFeedWebhook, aliases, ) @@ -867,6 +870,9 @@ def get_adcp_version() -> str: "SignalPricingOption", # Configuration types "PushNotificationConfig", + "NotificationConfig", + "WholesaleFeedEvent", + "WholesaleFeedWebhook", # Adagents validation "AdAgentsValidationResult", "AdagentsCacheEntry", diff --git a/src/adcp/types/__init__.py b/src/adcp/types/__init__.py index 7e4ea7b05..f5254f105 100644 --- a/src/adcp/types/__init__.py +++ b/src/adcp/types/__init__.py @@ -251,6 +251,7 @@ MediaBuyStatus, MediaChannel, Metadata, + NotificationConfig, NotificationType, Offering, OfferingAssetConstraint, @@ -395,6 +396,8 @@ ViewThreshold, WcagLevel, WebhookResponseType, + WholesaleFeedEvent, + WholesaleFeedWebhook, ) from adcp.types._generated import ( AudioAsset as AudioContent, @@ -1128,6 +1131,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "AuthorizedAgents", "AvailableMetric", "PushNotificationConfig", + "NotificationConfig", "ReportingCapabilities", "ReportingFrequency", "ReportingPeriod", @@ -1162,6 +1166,8 @@ def __init__(self, *args: object, **kwargs: object) -> None: "WebhookMetadata", # Webhook types "McpWebhookPayload", + "WholesaleFeedEvent", + "WholesaleFeedWebhook", # Semantic aliases for discriminated unions "ActivateSignalErrorResponse", "ActivateSignalSuccessResponse", diff --git a/src/adcp/webhook_sender.py b/src/adcp/webhook_sender.py index 887615a66..ecd384cf5 100644 --- a/src/adcp/webhook_sender.py +++ b/src/adcp/webhook_sender.py @@ -29,9 +29,9 @@ import json import warnings -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -50,7 +50,14 @@ build_async_ip_pinned_transport, ) from adcp.signing.standard_webhooks import decode_secret as _decode_sw_secret -from adcp.types import AdcpProtocol, GeneratedTaskStatus, TaskType +from adcp.types import ( + AdcpProtocol, + GeneratedTaskStatus, + NotificationConfig, + TaskType, + WholesaleFeedEvent, + WholesaleFeedWebhook, +) from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData from adcp.webhook_auth import ( AdcpLegacyHmacStrategy, @@ -121,6 +128,24 @@ def _validate_hooks(hooks: tuple[TransportHook, ...], allow_private_destinations validate(allow_private_destinations=allow_private_destinations) +def _entity_type_for_wholesale_notification(notification_type: str) -> str: + if notification_type.startswith("product."): + return "product" + if notification_type.startswith("signal."): + return "signal" + if notification_type == "wholesale_feed.bulk_change": + return "feed" + raise ValueError( + f"unsupported wholesale feed notification_type {notification_type!r}; " + "expected product.*, signal.*, or wholesale_feed.bulk_change" + ) + + +def _enum_value(value: Any) -> str: + raw = getattr(value, "value", value) + return str(raw) + + @dataclass(frozen=True) class WebhookDeliveryResult: """Outcome of one ``send_*`` call. @@ -680,6 +705,137 @@ async def send_property_list_changed( url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers ) + async def send_wholesale_feed( + self, + *, + url: str, + subscriber_id: str, + account_id: str, + notification_type: str, + wholesale_feed_version: str, + cache_scope: str, + event: WholesaleFeedEvent | Mapping[str, Any], + previous_wholesale_feed_version: str | None = None, + fired_at: datetime | None = None, + idempotency_key: str | None = None, + subscription_event_types: Sequence[Any] | None = None, + extra_headers: Mapping[str, str] | None = None, + ) -> WebhookDeliveryResult: + """POST a signed account-scoped wholesale feed notification. + + ``subscription_event_types`` is optional but recommended when the + caller is sending to an ``accounts[].notification_configs[]`` entry: + pass that entry's ``event_types`` to fail closed if the subscription + did not request this notification type. + """ + + if not isinstance(subscriber_id, str) or not subscriber_id: + raise ValueError("subscriber_id must be a non-empty string") + if not isinstance(account_id, str) or not account_id: + raise ValueError("account_id must be a non-empty string") + if not isinstance(wholesale_feed_version, str) or not wholesale_feed_version: + raise ValueError("wholesale_feed_version must be a non-empty string") + + event_model = event + if not isinstance(event_model, WholesaleFeedEvent): + event_model = WholesaleFeedEvent.model_validate(event_model) + notification_type_value = _enum_value(notification_type) + event_type = _enum_value(event_model.event_type) + entity_type = _enum_value(event_model.entity_type) + if notification_type_value != event_type: + raise ValueError( + "notification_type must match event.event_type " + f"(got {notification_type_value!r}, event has {event_type!r})" + ) + if subscription_event_types is not None: + allowed_event_types = {_enum_value(item) for item in subscription_event_types} + else: + allowed_event_types = None + if allowed_event_types is not None and notification_type_value not in allowed_event_types: + raise ValueError( + "notification_type is not present in the subscription's event_types; " + "sellers must not silently widen account notification filters" + ) + + expected_entity_type = _entity_type_for_wholesale_notification(notification_type_value) + if entity_type != expected_entity_type: + raise ValueError( + "event.entity_type does not match notification_type " + f"(got {entity_type!r}, expected {expected_entity_type!r})" + ) + + cache_scope_value = _enum_value(cache_scope) + applies_to = getattr(event_model.payload, "applies_to", None) + applies_to_scope = _enum_value(getattr(applies_to, "scope", None)) + if applies_to_scope != cache_scope_value: + raise ValueError( + "cache_scope must match event.payload.applies_to.scope " + f"(got {cache_scope_value!r}, event has {applies_to_scope!r})" + ) + + key = idempotency_key or generate_webhook_idempotency_key() + timestamp = fired_at or datetime.now(timezone.utc) + webhook = WholesaleFeedWebhook.model_validate( + { + "idempotency_key": key, + "notification_id": event_model.event_id, + "notification_type": notification_type_value, + "fired_at": timestamp, + "subscriber_id": subscriber_id, + "account_id": account_id, + "wholesale_feed_version": wholesale_feed_version, + "previous_wholesale_feed_version": previous_wholesale_feed_version, + "cache_scope": cache_scope_value, + "event": event_model, + } + ) + return await self.send_raw( + url=url, + idempotency_key=key, + payload=webhook.model_dump(mode="json", exclude_none=True), + extra_headers=extra_headers, + ) + + async def send_wholesale_feed_to_subscription( + self, + *, + subscription: NotificationConfig | Mapping[str, Any], + account_id: str, + notification_type: str, + wholesale_feed_version: str, + cache_scope: str, + event: WholesaleFeedEvent | Mapping[str, Any], + previous_wholesale_feed_version: str | None = None, + fired_at: datetime | None = None, + idempotency_key: str | None = None, + extra_headers: Mapping[str, str] | None = None, + ) -> WebhookDeliveryResult: + """POST a wholesale feed notification to a ``NotificationConfig``. + + This convenience wrapper keeps ``url``, ``subscriber_id``, and + ``event_types`` coupled to the same persisted subscription entry. + """ + + config = ( + subscription + if isinstance(subscription, NotificationConfig) + else NotificationConfig.model_validate(subscription) + ) + return await self.send_wholesale_feed( + url=str(config.url), + subscriber_id=config.subscriber_id, + account_id=account_id, + notification_type=notification_type, + wholesale_feed_version=wholesale_feed_version, + cache_scope=cache_scope, + event=event, + previous_wholesale_feed_version=previous_wholesale_feed_version, + fired_at=fired_at, + idempotency_key=idempotency_key, + subscription_event_types=config.event_types, + extra_headers=extra_headers, + ) + async def send_raw( self, *, diff --git a/src/adcp/webhooks.py b/src/adcp/webhooks.py index 69457c876..36a52d788 100644 --- a/src/adcp/webhooks.py +++ b/src/adcp/webhooks.py @@ -28,8 +28,9 @@ import uuid import warnings from collections.abc import Mapping +from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any, cast +from typing import Any, NoReturn, cast from urllib.parse import urlsplit import httpx @@ -44,6 +45,7 @@ from adcp.server.idempotency.backends import MemoryBackend as MemoryBackend from adcp.server.idempotency.webhook_dedup import WebhookDedupStore as WebhookDedupStore +from adcp.signing.jwks import SSRFValidationError, resolve_and_validate_host from adcp.signing.webhook_hmac import ( LegacyWebhookHmacError, LegacyWebhookHmacOptions, @@ -67,6 +69,7 @@ WebhookReceiver, WebhookReceiverConfig, ) +from adcp.webhook_transport_hooks import TransportHook, apply_hooks # `task_type` → `protocol` mapping. Mirrors the JS reference # implementation's `TOOL_PROTOCOL_MAP` in @@ -788,6 +791,281 @@ def _isoformat_to_proto_timestamp( _MAX_EXTRA_HEADERS = 64 +@dataclass(frozen=True) +class WebhookDestinationPolicy: + """Registration-time policy for durable buyer webhook URLs. + + Use :meth:`production` before persisting buyer-provided + ``push_notification_config.url`` or + ``accounts[].notification_configs[].url``. Use + :meth:`local_development` only for tests and local fixtures that need + HTTP localhost or private-network endpoints. + """ + + require_https: bool = True + allow_private_destinations: bool = False + allowed_destination_ports: frozenset[int] | None = None + transport_hooks: tuple[TransportHook, ...] = () + name: str = "production" + + @classmethod + def production( + cls, + *, + allowed_destination_ports: frozenset[int] | None = None, + transport_hooks: tuple[TransportHook, ...] = (), + ) -> WebhookDestinationPolicy: + """Production webhook policy: HTTPS and public routable IPs only.""" + + return cls( + require_https=True, + allow_private_destinations=False, + allowed_destination_ports=allowed_destination_ports, + transport_hooks=transport_hooks, + name="production", + ) + + @classmethod + def local_development( + cls, + *, + allowed_destination_ports: frozenset[int] | None = None, + transport_hooks: tuple[TransportHook, ...] = (), + ) -> WebhookDestinationPolicy: + """Explicit dev/test policy: allows HTTP and private destinations. + + Cloud metadata endpoints remain blocked by the shared SSRF + validator even when private destinations are allowed. + """ + + return cls( + require_https=False, + allow_private_destinations=True, + allowed_destination_ports=allowed_destination_ports, + transport_hooks=transport_hooks, + name="local_development", + ) + + +@dataclass(frozen=True) +class WebhookDestinationValidation: + """Resolved result of a registration-time webhook URL validation.""" + + original_url: str + effective_url: str + hostname: str + resolved_ip: str + port: int + policy: WebhookDestinationPolicy + + +class WebhookDestinationValidationError(ValueError): + """Typed URL-policy failure suitable for protocol error mapping. + + ``code`` is intentionally the protocol-level bucket sellers commonly + return in ``errors[]``; ``reason`` carries the SDK-specific detail. + ``field`` should be set by callers to values such as + ``push_notification_config.url`` or + ``accounts[0].notification_configs[0].url``. + """ + + code = "INVALID_REQUEST" + + def __init__( + self, + message: str, + *, + reason: str, + field: str | None = None, + url: str | None = None, + effective_url: str | None = None, + policy: WebhookDestinationPolicy | None = None, + suggestion: str | None = None, + ) -> None: + super().__init__(message) + self.reason = reason + self.field = field + self.url = url + self.effective_url = effective_url + self.policy = policy + self.suggestion = suggestion + + def to_error(self) -> dict[str, str]: + """Return a small ``errors[]``-compatible dict for seller handlers.""" + + error = {"code": self.code, "message": str(self)} + if self.field is not None: + error["field"] = self.field + if self.suggestion is not None: + error["suggestion"] = self.suggestion + return error + + +def _raise_webhook_destination_error( + message: str, + *, + reason: str, + field: str | None, + url: str | None, + effective_url: str | None, + policy: WebhookDestinationPolicy, + suggestion: str | None = None, +) -> NoReturn: + raise WebhookDestinationValidationError( + message, + reason=reason, + field=field, + url=url, + effective_url=effective_url, + policy=policy, + suggestion=suggestion, + ) + + +def _validate_policy_hooks(policy: WebhookDestinationPolicy) -> None: + for hook in policy.transport_hooks: + validate = getattr(hook, "validate_for_sender", None) + if callable(validate): + validate(allow_private_destinations=policy.allow_private_destinations) + + +def validate_webhook_destination_url( + url: str, + *, + policy: WebhookDestinationPolicy | None = None, + field: str | None = None, +) -> WebhookDestinationValidation: + """Validate a buyer webhook URL before storing it. + + The helper is the registration-time counterpart to ``WebhookSender``'s + delivery-time SSRF guard. It applies optional transport hooks, enforces + production HTTPS policy, resolves the destination once through the shared + SSRF classifier, and returns the effective URL plus the validated IP. + Sellers should normally persist ``original_url``. ``effective_url`` is for + the immediate validation/delivery decision after transport hooks such as + Docker localhost rewrites; do not persist a test-only rewrite as the + buyer's registered URL. + + Raises :class:`WebhookDestinationValidationError` with structured fields + sellers can map to ``INVALID_REQUEST`` protocol errors. + """ + + active_policy = policy or WebhookDestinationPolicy.production() + _validate_policy_hooks(active_policy) + + if not isinstance(url, str) or not url: + _raise_webhook_destination_error( + "webhook destination URL must be a non-empty string", + reason="missing_url", + field=field, + url=None if not isinstance(url, str) else url, + effective_url=None, + policy=active_policy, + ) + if any(c in url for c in _HEADER_FORBIDDEN_CHARS): + _raise_webhook_destination_error( + "webhook destination URL contains control characters", + reason="control_characters", + field=field, + url=url, + effective_url=None, + policy=active_policy, + ) + + try: + effective_url = apply_hooks(url, active_policy.transport_hooks) + except ValueError as exc: + _raise_webhook_destination_error( + f"webhook destination URL failed transport hook policy: {exc}", + reason="transport_hook_rejected", + field=field, + url=url, + effective_url=None, + policy=active_policy, + ) + if any(c in effective_url for c in _HEADER_FORBIDDEN_CHARS): + _raise_webhook_destination_error( + "webhook destination URL contains control characters after transport hooks", + reason="control_characters", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + ) + + parsed = urlsplit(effective_url) + if parsed.username is not None or parsed.password is not None: + _raise_webhook_destination_error( + "webhook destination URL must not embed userinfo (user:pass@host)", + reason="userinfo_not_allowed", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + suggestion="Pass credentials in webhook authentication settings instead of the URL.", + ) + if parsed.fragment: + _raise_webhook_destination_error( + "webhook destination URL must not include a fragment", + reason="fragment_not_allowed", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + suggestion=( + "Move routing state into the webhook path or query string; " + "URL fragments are never sent in HTTP requests." + ), + ) + if parsed.scheme not in ("http", "https"): + _raise_webhook_destination_error( + f"webhook destination URL must use http:// or https:// (got {parsed.scheme!r})", + reason="invalid_scheme", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + ) + if active_policy.require_https and parsed.scheme != "https": + _raise_webhook_destination_error( + f"webhook destination URL must use https:// under {active_policy.name} policy", + reason="https_required", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + suggestion=( + "Use an HTTPS webhook URL, or pass " + "WebhookDestinationPolicy.local_development() for local tests." + ), + ) + + try: + hostname, resolved_ip, port = resolve_and_validate_host( + effective_url, + allow_private=active_policy.allow_private_destinations, + allowed_ports=active_policy.allowed_destination_ports, + ) + except SSRFValidationError as exc: + _raise_webhook_destination_error( + f"webhook destination URL failed SSRF validation: {exc}", + reason="ssrf_rejected", + field=field, + url=url, + effective_url=effective_url, + policy=active_policy, + ) + + return WebhookDestinationValidation( + original_url=url, + effective_url=effective_url, + hostname=hostname, + resolved_ip=resolved_ip, + port=port, + policy=active_policy, + ) + + def _warn_auth_deprecation_once() -> None: global _AUTH_DEPRECATION_WARNED if _AUTH_DEPRECATION_WARNED: @@ -1284,7 +1562,6 @@ def _validate_header_value(name: str, value: Any) -> None: ) from adcp.webhook_transport_hooks import ( # noqa: E402 DockerLocalhostRewrite, - TransportHook, ) __all__ = [ @@ -1301,6 +1578,10 @@ def _validate_header_value(name: str, value: Any) -> None: "deliver", "WebhookDeliveryResult", "WebhookSender", + "WebhookDestinationPolicy", + "WebhookDestinationValidation", + "WebhookDestinationValidationError", + "validate_webhook_destination_url", # Sender — transport hooks (URL rewrite before SSRF) "DockerLocalhostRewrite", "TransportHook", diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index b9fcf9c4b..3fb0dc117 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -225,6 +225,7 @@ "MediaChannel", "Member", "MemoryBackend", + "NotificationConfig", "OfferingAssetConstraint", "OfferingAssetGroup", "OptimizationGoal", @@ -381,6 +382,8 @@ "WebhookReceiver", "WebhookReceiverConfig", "WebhookVerifyOptions", + "WholesaleFeedEvent", + "WholesaleFeedWebhook", "aliases", "create_a2a_webhook_payload", "create_mcp_webhook_payload", @@ -754,6 +757,7 @@ "Method", "MetricType", "ModuleType", + "NotificationConfig", "NotificationType", "OfferPrice", "Offering", @@ -1004,6 +1008,8 @@ "WebhookFormatGroupAsset", "WebhookMetadata", "WebhookResponseType", + "WholesaleFeedEvent", + "WholesaleFeedWebhook", "aliases", "generated", "to_account_response" diff --git a/tests/test_public_api.py b/tests/test_public_api.py index 503e26e35..d3591eecc 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -29,6 +29,16 @@ def test_core_domain_types_are_exported(): assert hasattr(adcp, type_name), f"{type_name} not exported from adcp package" +def test_wholesale_feed_notification_types_are_stably_exported(): + """AdCP 3.1 catalog webhook types stay on stable import paths.""" + import adcp + from adcp import types + + for type_name in ["NotificationConfig", "WholesaleFeedWebhook", "WholesaleFeedEvent"]: + assert hasattr(adcp, type_name), f"{type_name} not exported from adcp package" + assert hasattr(types, type_name), f"{type_name} not exported from adcp.types" + + def test_request_response_types_are_exported(): """Request/response types are accessible from main package.""" import adcp diff --git a/tests/test_webhook_destination_policy.py b/tests/test_webhook_destination_policy.py new file mode 100644 index 000000000..72fa54dc6 --- /dev/null +++ b/tests/test_webhook_destination_policy.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import socket + +import pytest + +from adcp.webhooks import ( + WebhookDestinationPolicy, + WebhookDestinationValidationError, + validate_webhook_destination_url, +) + + +def test_production_accepts_https_public_destination(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_getaddrinfo(host: str, port: object) -> list[tuple[object, ...]]: + assert host == "example.com" + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("93.184.216.34", 443))] + + monkeypatch.setattr("adcp.signing.jwks.socket.getaddrinfo", fake_getaddrinfo) + + result = validate_webhook_destination_url( + "https://example.com/webhook", + field="push_notification_config.url", + ) + + assert result.effective_url == "https://example.com/webhook" + assert result.hostname == "example.com" + assert result.resolved_ip == "93.184.216.34" + assert result.port == 443 + + +def test_production_rejects_http_before_dns() -> None: + with pytest.raises(WebhookDestinationValidationError) as exc: + validate_webhook_destination_url("http://example.com/webhook") + + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.reason == "https_required" + + +@pytest.mark.parametrize( + "url", + [ + "https://127.0.0.1/webhook", + "https://10.0.0.1/webhook", + "https://169.254.169.254/webhook", + ], +) +def test_production_rejects_private_loopback_and_metadata(url: str) -> None: + with pytest.raises(WebhookDestinationValidationError) as exc: + validate_webhook_destination_url(url, field="accounts[0].notification_configs[0].url") + + assert exc.value.reason == "ssrf_rejected" + assert exc.value.field == "accounts[0].notification_configs[0].url" + assert exc.value.to_error()["code"] == "INVALID_REQUEST" + + +def test_local_development_accepts_http_localhost(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_getaddrinfo(host: str, port: object) -> list[tuple[object, ...]]: + assert host == "localhost" + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.1", 80))] + + monkeypatch.setattr("adcp.signing.jwks.socket.getaddrinfo", fake_getaddrinfo) + + result = validate_webhook_destination_url( + "http://localhost/webhook", + policy=WebhookDestinationPolicy.local_development(), + ) + + assert result.effective_url == "http://localhost/webhook" + assert result.resolved_ip == "127.0.0.1" + assert result.policy.allow_private_destinations is True + + +def test_local_development_still_rejects_cloud_metadata() -> None: + with pytest.raises(WebhookDestinationValidationError) as exc: + validate_webhook_destination_url( + "http://169.254.169.254/webhook", + policy=WebhookDestinationPolicy.local_development(), + ) + + assert exc.value.reason == "ssrf_rejected" + + +def test_rejects_url_fragments_before_dns(monkeypatch: pytest.MonkeyPatch) -> None: + def fail_getaddrinfo(host: str, port: object) -> list[tuple[object, ...]]: + raise AssertionError("fragment rejection should happen before DNS") + + monkeypatch.setattr("adcp.signing.jwks.socket.getaddrinfo", fail_getaddrinfo) + + with pytest.raises(WebhookDestinationValidationError) as exc: + validate_webhook_destination_url("https://example.com/webhook#buyer-primary") + + assert exc.value.reason == "fragment_not_allowed" diff --git a/tests/test_wholesale_feed_webhook_sender.py b/tests/test_wholesale_feed_webhook_sender.py new file mode 100644 index 000000000..7180475ac --- /dev/null +++ b/tests/test_wholesale_feed_webhook_sender.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from uuid import UUID + +import httpx +import pytest + +from adcp import NotificationConfig +from adcp.webhooks import WebhookSender + + +def _removed_event(event_type: str, *, scope: str = "public") -> dict[str, object]: + event_id = "018f13f8-7b40-7000-8000-000000000123" + if event_type == "product.removed": + return { + "event_id": event_id, + "event_type": "product.removed", + "entity_type": "product", + "entity_id": "prod_1", + "created_at": "2026-05-23T12:00:00Z", + "payload": { + "product_id": "prod_1", + "applies_to": {"scope": scope}, + }, + } + if event_type == "signal.removed": + return { + "event_id": event_id, + "event_type": "signal.removed", + "entity_type": "signal", + "entity_id": "seg_1", + "created_at": "2026-05-23T12:00:00Z", + "payload": { + "signal_agent_segment_id": "seg_1", + "applies_to": {"scope": scope}, + }, + } + raise AssertionError(event_type) + + +async def _capturing_sender() -> tuple[WebhookSender, list[httpx.Request], httpx.AsyncClient]: + captured: list[httpx.Request] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(202, json={"ok": True}) + + client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + return WebhookSender.from_bearer_token("test-token", client=client), captured, client + + +@pytest.mark.asyncio +async def test_send_wholesale_feed_product_event_envelope() -> None: + sender, captured, client = await _capturing_sender() + fired_at = datetime(2026, 5, 23, 12, 30, tzinfo=timezone.utc) + + async with client: + result = await sender.send_wholesale_feed( + url="https://buyer.example/webhooks/catalog", + subscriber_id="buyer-primary", + account_id="acct_1", + notification_type="product.removed", + wholesale_feed_version="wf_2", + previous_wholesale_feed_version="wf_1", + cache_scope="public", + event=_removed_event("product.removed"), + fired_at=fired_at, + idempotency_key="whk_product_removed_001", + subscription_event_types=["product.removed", "signal.removed"], + ) + + assert result.ok + body = json.loads(captured[0].content) + assert body["idempotency_key"] == "whk_product_removed_001" + assert body["notification_id"] == "018f13f8-7b40-7000-8000-000000000123" + assert body["notification_type"] == "product.removed" + assert body["subscriber_id"] == "buyer-primary" + assert body["account_id"] == "acct_1" + assert body["wholesale_feed_version"] == "wf_2" + assert body["previous_wholesale_feed_version"] == "wf_1" + assert body["cache_scope"] == "public" + assert body["event"]["event_type"] == "product.removed" + assert captured[0].headers["authorization"] == "Bearer test-token" + + +@pytest.mark.asyncio +async def test_send_wholesale_feed_signal_event_envelope() -> None: + sender, captured, client = await _capturing_sender() + subscription = NotificationConfig.model_validate( + { + "subscriber_id": "audit-bus", + "url": "https://buyer.example/webhooks/catalog", + "event_types": ["signal.removed"], + } + ) + + async with client: + await sender.send_wholesale_feed( + url=str(subscription.url), + subscriber_id=subscription.subscriber_id, + account_id="acct_1", + notification_type="signal.removed", + wholesale_feed_version="swf_9", + cache_scope="account", + event=_removed_event("signal.removed", scope="account"), + idempotency_key="whk_signal_removed_001", + subscription_event_types=subscription.event_types, + ) + + body = json.loads(captured[0].content) + assert body["notification_type"] == "signal.removed" + assert body["subscriber_id"] == "audit-bus" + assert body["account_id"] == "acct_1" + assert body["wholesale_feed_version"] == "swf_9" + assert body["cache_scope"] == "account" + assert body["event"]["entity_type"] == "signal" + assert UUID(body["notification_id"]) == UUID(body["event"]["event_id"]) + + +@pytest.mark.asyncio +async def test_send_wholesale_feed_to_subscription_uses_config_fields() -> None: + sender, captured, client = await _capturing_sender() + subscription = NotificationConfig.model_validate( + { + "subscriber_id": "buyer-primary", + "url": "https://buyer.example/webhooks/catalog", + "event_types": ["product.removed"], + } + ) + + async with client: + await sender.send_wholesale_feed_to_subscription( + subscription=subscription, + account_id="acct_1", + notification_type="product.removed", + wholesale_feed_version="wf_2", + cache_scope="public", + event=_removed_event("product.removed"), + idempotency_key="whk_subscription_product_removed_001", + ) + + body = json.loads(captured[0].content) + assert str(captured[0].url) == "https://buyer.example/webhooks/catalog" + assert body["subscriber_id"] == "buyer-primary" + assert body["notification_type"] == "product.removed" + + +@pytest.mark.asyncio +async def test_send_wholesale_feed_rejects_event_type_mismatch() -> None: + sender, _captured, client = await _capturing_sender() + + async with client: + with pytest.raises(ValueError, match="notification_type must match"): + await sender.send_wholesale_feed( + url="https://buyer.example/webhooks/catalog", + subscriber_id="buyer-primary", + account_id="acct_1", + notification_type="signal.removed", + wholesale_feed_version="wf_2", + cache_scope="public", + event=_removed_event("product.removed"), + ) + + +@pytest.mark.asyncio +async def test_send_wholesale_feed_rejects_unsubscribed_event_type() -> None: + sender, _captured, client = await _capturing_sender() + + async with client: + with pytest.raises(ValueError, match="subscription's event_types"): + await sender.send_wholesale_feed( + url="https://buyer.example/webhooks/catalog", + subscriber_id="buyer-primary", + account_id="acct_1", + notification_type="product.removed", + wholesale_feed_version="wf_2", + cache_scope="public", + event=_removed_event("product.removed"), + subscription_event_types=["signal.removed"], + )