Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,45 @@ async with sender:
9421 signing, and the httpx POST in one call. `send_raw(...)` is an escape
hatch for custom payload shapes; dedicated methods exist for every webhook
kind (`send_revocation_notification`, `send_artifact_webhook`,
`send_collection_list_changed`, `send_property_list_changed`).
`send_collection_list_changed`, `send_property_list_changed`,
`send_wholesale_feed`).

Validate buyer-provided webhook URLs before storing durable subscriptions:

```python
from adcp.webhooks import WebhookDestinationPolicy, validate_webhook_destination_url

validate_webhook_destination_url(
request.push_notification_config.url,
field="push_notification_config.url",
policy=WebhookDestinationPolicy.production(),
)
```

Use `WebhookDestinationPolicy.local_development()` only for local tests that
need `http://localhost` or private-network destinations. Production validation
requires HTTPS and rejects loopback, private, link-local, reserved, and cloud
metadata destinations using the same SSRF classifier as `WebhookSender`. The
validation result includes both `original_url` and `effective_url`; sellers
should normally persist the buyer's original URL and reapply the same
policy/hooks at send time, rather than storing a Docker or test rewrite.

Wholesale feed notifications use stable types from `adcp` / `adcp.types`:

```python
from adcp import NotificationConfig, WholesaleFeedEvent, WholesaleFeedWebhook
from adcp.webhooks import WebhookSender

if "product.updated" in subscription.event_types:
await sender.send_wholesale_feed_to_subscription(
subscription=subscription,
account_id=account_id,
notification_type="product.updated",
wholesale_feed_version=feed_version,
cache_scope="public",
event=event,
)
```

The webhook-signing JWK MUST be published in your `adagents.json` with
`adcp_use: "webhook-signing"` — distinct from your `request-signing` key so
Expand Down
49 changes: 49 additions & 0 deletions docs/handler-authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,55 @@ Pick one per `WebhookSender` instance. All three share the same
| `WebhookSender.from_bearer_token(token)` | `Authorization: Bearer` | Simplest; no key management; requires TLS |
| `WebhookSender.from_standard_webhooks_secret(secret, key_id=...)` | Standard Webhooks v1 | Svix / Resend / standardwebhooks.com receivers |

### Registration-time URL validation

Validate durable buyer endpoints before persisting `push_notification_config.url`
or `accounts[].notification_configs[].url` from `sync_accounts`:

```python
from adcp.webhooks import (
WebhookDestinationPolicy,
WebhookDestinationValidationError,
validate_webhook_destination_url,
)

try:
validate_webhook_destination_url(
config.url,
field="accounts[0].notification_configs[0].url",
policy=WebhookDestinationPolicy.production(),
)
except WebhookDestinationValidationError as exc:
return {"errors": [exc.to_error()]}
```

Production policy requires HTTPS and rejects private, loopback, link-local,
reserved, and cloud metadata destinations. Use
`WebhookDestinationPolicy.local_development()` only for local fixtures that
need `http://localhost` or private-network endpoints. The helper returns both
`original_url` and `effective_url`; persist the buyer's original URL in durable
subscription state, and reapply the same policy/hooks when sending. Do not
persist a Docker or test rewrite as the buyer's registered endpoint.

### Wholesale feed notifications

`NotificationConfig`, `WholesaleFeedEvent`, and `WholesaleFeedWebhook` are
stable exports from both `adcp` and `adcp.types`. When firing account-scoped
catalog notifications, preserve the subscriber filter and send through
`WebhookSender`:

```python
if event_type in subscription.event_types:
await sender.send_wholesale_feed_to_subscription(
subscription=subscription,
account_id=account_id,
notification_type=event_type,
wholesale_feed_version=feed_version,
cache_scope="public",
event=event,
)
```

### Sender vs. supervisor

`WebhookSender` is the transport layer — it constructs and signs one HTTP POST.
Expand Down
6 changes: 6 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
MediaBuyPackage,
MediaBuyStatus,
MediaChannel,
NotificationConfig,
OfferingAssetConstraint,
OfferingAssetGroup,
# Optimization
Expand Down Expand Up @@ -286,6 +287,8 @@
VerifyBrandClaimsRequestBulk,
VerifyBrandClaimsResponseBulk,
WcagLevel,
WholesaleFeedEvent,
WholesaleFeedWebhook,
aliases,
)

Expand Down Expand Up @@ -867,6 +870,9 @@ def get_adcp_version() -> str:
"SignalPricingOption",
# Configuration types
"PushNotificationConfig",
"NotificationConfig",
"WholesaleFeedEvent",
"WholesaleFeedWebhook",
# Adagents validation
"AdAgentsValidationResult",
"AdagentsCacheEntry",
Expand Down
6 changes: 6 additions & 0 deletions src/adcp/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
MediaBuyStatus,
MediaChannel,
Metadata,
NotificationConfig,
NotificationType,
Offering,
OfferingAssetConstraint,
Expand Down Expand Up @@ -395,6 +396,8 @@
ViewThreshold,
WcagLevel,
WebhookResponseType,
WholesaleFeedEvent,
WholesaleFeedWebhook,
)
from adcp.types._generated import (
AudioAsset as AudioContent,
Expand Down Expand Up @@ -1128,6 +1131,7 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"AuthorizedAgents",
"AvailableMetric",
"PushNotificationConfig",
"NotificationConfig",
"ReportingCapabilities",
"ReportingFrequency",
"ReportingPeriod",
Expand Down Expand Up @@ -1162,6 +1166,8 @@ def __init__(self, *args: object, **kwargs: object) -> None:
"WebhookMetadata",
# Webhook types
"McpWebhookPayload",
"WholesaleFeedEvent",
"WholesaleFeedWebhook",
# Semantic aliases for discriminated unions
"ActivateSignalErrorResponse",
"ActivateSignalSuccessResponse",
Expand Down
162 changes: 159 additions & 3 deletions src/adcp/webhook_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@

import json
import warnings
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

Expand All @@ -50,7 +50,14 @@
build_async_ip_pinned_transport,
)
from adcp.signing.standard_webhooks import decode_secret as _decode_sw_secret
from adcp.types import AdcpProtocol, GeneratedTaskStatus, TaskType
from adcp.types import (
AdcpProtocol,
GeneratedTaskStatus,
NotificationConfig,
TaskType,
WholesaleFeedEvent,
WholesaleFeedWebhook,
)
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
from adcp.webhook_auth import (
AdcpLegacyHmacStrategy,
Expand Down Expand Up @@ -121,6 +128,24 @@ def _validate_hooks(hooks: tuple[TransportHook, ...], allow_private_destinations
validate(allow_private_destinations=allow_private_destinations)


def _entity_type_for_wholesale_notification(notification_type: str) -> str:
if notification_type.startswith("product."):
return "product"
if notification_type.startswith("signal."):
return "signal"
if notification_type == "wholesale_feed.bulk_change":
return "feed"
raise ValueError(
f"unsupported wholesale feed notification_type {notification_type!r}; "
"expected product.*, signal.*, or wholesale_feed.bulk_change"
)


def _enum_value(value: Any) -> str:
raw = getattr(value, "value", value)
return str(raw)


@dataclass(frozen=True)
class WebhookDeliveryResult:
"""Outcome of one ``send_*`` call.
Expand Down Expand Up @@ -680,6 +705,137 @@ async def send_property_list_changed(
url=url, idempotency_key=key, payload=payload, extra_headers=extra_headers
)

async def send_wholesale_feed(
self,
*,
url: str,
subscriber_id: str,
account_id: str,
notification_type: str,
wholesale_feed_version: str,
cache_scope: str,
event: WholesaleFeedEvent | Mapping[str, Any],
previous_wholesale_feed_version: str | None = None,
fired_at: datetime | None = None,
idempotency_key: str | None = None,
subscription_event_types: Sequence[Any] | None = None,
extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
"""POST a signed account-scoped wholesale feed notification.

``subscription_event_types`` is optional but recommended when the
caller is sending to an ``accounts[].notification_configs[]`` entry:
pass that entry's ``event_types`` to fail closed if the subscription
did not request this notification type.
"""

if not isinstance(subscriber_id, str) or not subscriber_id:
raise ValueError("subscriber_id must be a non-empty string")
if not isinstance(account_id, str) or not account_id:
raise ValueError("account_id must be a non-empty string")
if not isinstance(wholesale_feed_version, str) or not wholesale_feed_version:
raise ValueError("wholesale_feed_version must be a non-empty string")

event_model = event
if not isinstance(event_model, WholesaleFeedEvent):
event_model = WholesaleFeedEvent.model_validate(event_model)
notification_type_value = _enum_value(notification_type)
event_type = _enum_value(event_model.event_type)
entity_type = _enum_value(event_model.entity_type)
if notification_type_value != event_type:
raise ValueError(
"notification_type must match event.event_type "
f"(got {notification_type_value!r}, event has {event_type!r})"
)
if subscription_event_types is not None:
allowed_event_types = {_enum_value(item) for item in subscription_event_types}
else:
allowed_event_types = None
if allowed_event_types is not None and notification_type_value not in allowed_event_types:
raise ValueError(
"notification_type is not present in the subscription's event_types; "
"sellers must not silently widen account notification filters"
)

expected_entity_type = _entity_type_for_wholesale_notification(notification_type_value)
if entity_type != expected_entity_type:
raise ValueError(
"event.entity_type does not match notification_type "
f"(got {entity_type!r}, expected {expected_entity_type!r})"
)

cache_scope_value = _enum_value(cache_scope)
applies_to = getattr(event_model.payload, "applies_to", None)
applies_to_scope = _enum_value(getattr(applies_to, "scope", None))
if applies_to_scope != cache_scope_value:
raise ValueError(
"cache_scope must match event.payload.applies_to.scope "
f"(got {cache_scope_value!r}, event has {applies_to_scope!r})"
)

key = idempotency_key or generate_webhook_idempotency_key()
timestamp = fired_at or datetime.now(timezone.utc)
webhook = WholesaleFeedWebhook.model_validate(
{
"idempotency_key": key,
"notification_id": event_model.event_id,
"notification_type": notification_type_value,
"fired_at": timestamp,
"subscriber_id": subscriber_id,
"account_id": account_id,
"wholesale_feed_version": wholesale_feed_version,
"previous_wholesale_feed_version": previous_wholesale_feed_version,
"cache_scope": cache_scope_value,
"event": event_model,
}
)
return await self.send_raw(
url=url,
idempotency_key=key,
payload=webhook.model_dump(mode="json", exclude_none=True),
extra_headers=extra_headers,
)

async def send_wholesale_feed_to_subscription(
self,
*,
subscription: NotificationConfig | Mapping[str, Any],
account_id: str,
notification_type: str,
wholesale_feed_version: str,
cache_scope: str,
event: WholesaleFeedEvent | Mapping[str, Any],
previous_wholesale_feed_version: str | None = None,
fired_at: datetime | None = None,
idempotency_key: str | None = None,
extra_headers: Mapping[str, str] | None = None,
) -> WebhookDeliveryResult:
"""POST a wholesale feed notification to a ``NotificationConfig``.

This convenience wrapper keeps ``url``, ``subscriber_id``, and
``event_types`` coupled to the same persisted subscription entry.
"""

config = (
subscription
if isinstance(subscription, NotificationConfig)
else NotificationConfig.model_validate(subscription)
)
return await self.send_wholesale_feed(
url=str(config.url),
subscriber_id=config.subscriber_id,
account_id=account_id,
notification_type=notification_type,
wholesale_feed_version=wholesale_feed_version,
cache_scope=cache_scope,
event=event,
previous_wholesale_feed_version=previous_wholesale_feed_version,
fired_at=fired_at,
idempotency_key=idempotency_key,
subscription_event_types=config.event_types,
extra_headers=extra_headers,
)

async def send_raw(
self,
*,
Expand Down
Loading
Loading