From a603199b2ef26af2b88baf51b6840cd9c3e6aa1f Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 6 Jun 2026 21:21:33 -0400 Subject: [PATCH 1/2] feat(decisioning): async (handoff) discovery for get_products / get_signals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire ctx.handoff_to_task(fn) for get_products and get_signals so brief / refine discovery MAY background to a long-running task, mirroring JS adcp-client#2170 (rc8 async discovery parity). Closes #924. - types.py: add DiscoveryResult[T] alias (same arms as SalesResult) — the typing gate for discovery handoff. - specialisms/sales.py + signals.py: return types MaybeAsync -> DiscoveryResult; fix the now-false "sync only / no async envelope" docstrings. Document that wholesale MUST stay synchronous (incomplete[]) and that get_signals has NO input_required arm (submitted/working only). - types/aliases.py + types/__init__.py: expose the orphaned async arm classes as semantic aliases (GetProducts{Submitted,Working,InputRequired}Response, GetSignals{Submitted,Working}Response) plus GetProductsResponseUnion / GetSignalsResponseUnion. GetProductsResponse / GetSignalsResponse stay the constructable success class (the rc.9 schema is flat, not a union — patch via aliases, no codegen, no generated_poc edits). - discovery_guards.py: four rejection guards (all INVALID_REQUEST/correctable): (a) wholesale + push_notification_config -> pre-dispatch reject, no platform call; (b) wholesale + adopter handoff -> post-dispatch reject; (c) async + unresolved account -> field='account'; (d) hand-rolled submitted dict -> guiding error pointing at ctx.handoff_to_task. - handler.py: wire the guards into the get_products / get_signals shims and thread the persist-draft terminal side-effect as an on_complete hook so it fires on COMPLETION in both the sync and handoff paths (mirrors create_media_buy). rc.9 already ships the bundled submitted/working/input-required schemas and the get_products/get_signals TaskType enum values, so wire validation and tasks/get round-trip without codegen changes. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/decisioning/__init__.py | 2 + src/adcp/decisioning/discovery_guards.py | 248 ++++++++ src/adcp/decisioning/handler.py | 96 ++- src/adcp/decisioning/specialisms/sales.py | 23 +- src/adcp/decisioning/specialisms/signals.py | 31 +- src/adcp/decisioning/types.py | 17 + src/adcp/types/__init__.py | 18 + src/adcp/types/aliases.py | 95 +++ tests/fixtures/public_api_snapshot.json | 9 + tests/test_decisioning_async_discovery.py | 671 ++++++++++++++++++++ 10 files changed, 1195 insertions(+), 15 deletions(-) create mode 100644 src/adcp/decisioning/discovery_guards.py create mode 100644 tests/test_decisioning_async_discovery.py diff --git a/src/adcp/decisioning/__init__.py b/src/adcp/decisioning/__init__.py index cacb424e9..1e4d422b7 100644 --- a/src/adcp/decisioning/__init__.py +++ b/src/adcp/decisioning/__init__.py @@ -230,6 +230,7 @@ def create_media_buy( from adcp.decisioning.types import ( Account, AdcpError, + DiscoveryResult, MaybeAsync, SalesResult, SyncAccountsResultRow, @@ -352,6 +353,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "CreativeBuilderPlatform", "DecisioningCapabilities", "DecisioningPlatform", + "DiscoveryResult", "DynamicBearer", "ExplicitAccounts", "Format", diff --git a/src/adcp/decisioning/discovery_guards.py b/src/adcp/decisioning/discovery_guards.py new file mode 100644 index 000000000..8240cd6eb --- /dev/null +++ b/src/adcp/decisioning/discovery_guards.py @@ -0,0 +1,248 @@ +"""Rejection guards for async (handoff) discovery on get_products / get_signals. + +The rc.9 spec admits async discovery for ``buying_mode='brief'|'refine'`` +(get_products) and ``discovery_mode='brief'`` (get_signals): the seller MAY +return a ``submitted`` envelope and the buyer polls ``tasks/get`` for the +terminal catalog. The spec is equally explicit about what async discovery +MUST NOT do, and these guards enforce those MUST-NOTs at the framework seam +so adopters can't ship a non-conformant async discovery surface by accident. + +Four guards, all projecting +``AdcpError('INVALID_REQUEST', recovery='correctable')``: + +(a) **Wholesale + push pre-dispatch reject.** ``buying_mode='wholesale'`` / + ``discovery_mode='wholesale'`` is a raw rate-card read with no async + lifecycle. The spec: *"agents MUST NOT route a 'wholesale' request + through the async/Submitted arm or emit async delivery solely because + push_notification_config is present."* A wholesale request carrying + ``push_notification_config`` is malformed — reject BEFORE invoking the + platform method (``field='push_notification_config'``). Lives next to + ``assert_buying_mode_consistent`` in :mod:`adcp.decisioning.refine` + (products) and is called from the get_signals shim (signals). + +(b) **Wholesale + adopter handoff post-dispatch reject.** Belt-and-braces + for (a): even with no push config, an adopter whose wholesale code path + returns ``ctx.handoff_to_task(fn)`` violates the wholesale-is-sync + contract. The handoff has already been projected to a ``submitted`` + dict by dispatch; reject it (``field='buying_mode'`` / ``'discovery_mode'``) + so the buyer never sees a wholesale task_id. + +(c) **Async + unresolved account reject.** An async discovery task is + addressable only by ``task_id`` scoped to a resolved account — the + registry issues against ``ctx.account.id`` and ``tasks/get`` is + account-scoped. If the request entered the async path (the adopter + handed off, OR the buyer supplied ``push_notification_config``) but the + account resolved to the sentinel/empty id, the task would be + unreachable. Reject (``field='account'``). Mirrors JS #2170's + accountless-async rejection. + +(d) **Hand-rolled submitted reject.** An adopter who returns a literal + ``{'status': 'submitted', 'task_id': ...}`` dict from the sync arm — + instead of ``ctx.handoff_to_task(fn)`` — bypasses the framework's task + registry: no row is issued, ``tasks/get`` 404s, no completion webhook + fires. The dispatch handoff projection emits the EXACT 2-key dict + ``{'task_id', 'status'}``; a hand-rolled submitted carries either extra + keys or a task_id the registry never minted. Raise a guiding error + pointing the adopter at ``ctx.handoff_to_task``. + +Asymmetry: products' mode field is ``buying_mode`` with values +{brief, wholesale, refine}; signals' mode field is ``discovery_mode`` with +values {brief, wholesale}. The guards take the field name as a parameter so +the same logic serves both verbs. +""" + +from __future__ import annotations + +from typing import Any + +from adcp.decisioning.types import AdcpError + + +def _coerce_mode(req: Any, mode_field: str) -> str | None: + """Read ``req.`` as a plain string (enum or str, None passthrough).""" + value = getattr(req, mode_field, None) + if value is None: + return None + return value.value if hasattr(value, "value") else str(value) + + +def _has_push_config(req: Any) -> bool: + """True when the request carries a non-empty ``push_notification_config``.""" + config = getattr(req, "push_notification_config", None) + if config is None and isinstance(req, dict): + config = req.get("push_notification_config") + return config is not None + + +def _is_submitted_projection(result: Any) -> bool: + """True for the framework's handoff projection — the exact 2-key + ``{'task_id', 'status': 'submitted'}`` dict emitted by + :func:`adcp.decisioning.dispatch._project_handoff`.""" + return ( + isinstance(result, dict) + and set(result.keys()) == {"task_id", "status"} + and result.get("status") == "submitted" + ) + + +def assert_discovery_push_consistent(req: Any, *, mode_field: str) -> None: + """Guard (a): pre-dispatch reject of wholesale + push_notification_config. + + ``mode_field`` is ``'buying_mode'`` (get_products) or ``'discovery_mode'`` + (get_signals). Wholesale is a synchronous rate-card read; a wholesale + request carrying ``push_notification_config`` asks for an async + delivery channel the verb does not offer in that mode. Reject before + the platform method runs (``field='push_notification_config'``). + + :raises AdcpError: ``INVALID_REQUEST`` / ``correctable`` when mode is + wholesale AND push_notification_config is present. + """ + if _coerce_mode(req, mode_field) == "wholesale" and _has_push_config(req): + raise AdcpError( + "INVALID_REQUEST", + message=( + f"{mode_field}='wholesale' must not carry " + "push_notification_config. Wholesale discovery is a " + "synchronous rate-card read with no async lifecycle — the " + "spec forbids routing it through the submitted arm or " + "emitting async delivery because a push config is present. " + "Drop push_notification_config, or use the 'brief' mode " + "which supports the async (submitted) arm." + ), + field="push_notification_config", + recovery="correctable", + ) + + +def reject_wholesale_handoff(result: Any, *, mode: str | None, mode_field: str) -> None: + """Guard (b): post-dispatch reject of an adopter handoff on a wholesale call. + + Called after ``_invoke_platform_method`` with the (already projected) + result. When the resolved mode is wholesale and the result is the + framework's submitted projection, the adopter's wholesale code path + handed off — a contract violation. Reject (``field=``). + + :param mode: The request's coerced mode string. + :raises AdcpError: ``INVALID_REQUEST`` / ``correctable``. + """ + if mode == "wholesale" and _is_submitted_projection(result): + raise AdcpError( + "INVALID_REQUEST", + message=( + f"{mode_field}='wholesale' returned a task handoff, but " + "wholesale discovery MUST complete synchronously. Return the " + "catalog directly; signal partial completion via the " + "response's incomplete[] field rather than handing off to a " + "background task. The 'brief' mode is the async-capable path." + ), + field=mode_field, + recovery="correctable", + ) + + +def assert_account_resolved_for_async( + result: Any, + *, + account_id: str | None, + has_push: bool, +) -> None: + """Guard (c): reject an async discovery call against an unresolved account. + + An async discovery task is addressable only by a ``task_id`` scoped to a + resolved account. The request is "async" when EITHER the adopter handed + off (the result is the framework's submitted projection) OR the buyer + supplied ``push_notification_config``. ``account_id`` is "unresolved" + when it is empty/whitespace or the ``''`` sentinel — matching + :func:`adcp.decisioning.dispatch.compose_caller_identity`'s contract + across derived / implicit / explicit account modes. + + Call this both pre-dispatch (with ``result=None``) when the buyer + supplied ``push_notification_config`` — so the rejection fires as a + clean ``INVALID_REQUEST`` before any platform / registry interaction — + and post-dispatch (passing the projected ``result``) so an adopter + handoff against an unresolved account is also caught. The framework's + registry independently rejects an empty ``account_id`` at issue-time; + this guard surfaces the buyer-facing ``field='account'`` diagnostic. + + :raises AdcpError: ``INVALID_REQUEST`` / ``correctable`` with + ``field='account'``. + """ + is_async = has_push or _is_submitted_projection(result) + if not is_async: + return + if _account_resolved(account_id): + return + raise AdcpError( + "INVALID_REQUEST", + message=( + "Async discovery requires a resolved account. This request " + "entered the async path (the seller handed off, or the request " + "carried push_notification_config) but no account resolved — the " + "submitted task_id would be unreachable via tasks/get, and a " + "push callback could not be scoped. Supply an account reference " + "(or authenticate so the account store can derive one), or use a " + "synchronous discovery call." + ), + field="account", + recovery="correctable", + ) + + +def _account_resolved(account_id: str | None) -> bool: + """False when ``account_id`` is empty/whitespace or the ``''`` sentinel.""" + if not account_id or not account_id.strip(): + return False + return account_id != "" + + +def reject_hand_rolled_submitted(result: Any) -> None: + """Guard (d): reject a literal hand-rolled submitted dict from the sync arm. + + The framework's handoff projection emits the EXACT 2-key dict + ``{'task_id', 'status': 'submitted'}`` — that shape is produced ONLY by + :func:`adcp.decisioning.dispatch._project_handoff` and is therefore + already a legitimate registry-backed task. An adopter who instead returns + a dict with ``status='submitted'`` plus other keys (or builds the + submitted envelope by hand) bypassed the registry: ``tasks/get`` will + 404 and no completion webhook fires. Raise a guiding error. + + Pydantic ``GetProductsSubmitted`` / ``GetSignalsSubmitted`` instances + returned directly are caught here too — they carry a ``task_id`` the + framework never minted. + + :raises AdcpError: ``INVALID_REQUEST`` / ``correctable``. + """ + status: Any = None + if isinstance(result, dict): + status = result.get("status") + elif hasattr(result, "status"): + status = getattr(result, "status", None) + status_str = status.value if hasattr(status, "value") else status + + if status_str != "submitted": + return + # The framework's own projection — legitimate, leave it. + if _is_submitted_projection(result): + return + raise AdcpError( + "INVALID_REQUEST", + message=( + "Discovery returned a hand-rolled 'submitted' envelope. Do not " + "construct the submitted arm yourself — the framework never " + "issued a task for this task_id, so tasks/get would 404 and no " + "completion webhook would fire. To run discovery asynchronously, " + "return ctx.handoff_to_task(fn) (or ctx.handoff_to_workflow(fn) " + "for adopter-owned external completion); the framework allocates " + "the task_id, persists the submitted state, and emits the wire " + "envelope for you." + ), + recovery="correctable", + ) + + +__all__ = [ + "assert_account_resolved_for_async", + "assert_discovery_push_consistent", + "reject_hand_rolled_submitted", + "reject_wholesale_handoff", +] diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 16bc0f165..5a9818a11 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -46,6 +46,12 @@ ) from adcp.decisioning.accounts import ResolveContext, _call_with_optional_ctx from adcp.decisioning.context import AuthInfo +from adcp.decisioning.discovery_guards import ( + assert_account_resolved_for_async, + assert_discovery_push_consistent, + reject_hand_rolled_submitted, + reject_wholesale_handoff, +) from adcp.decisioning.dispatch import ( _build_request_context, _invoke_platform_method, @@ -1777,7 +1783,19 @@ async def get_products( # type: ignore[override] # mutual-exclusion rules (refine+brief, wholesale+brief, refine # without refine[]). assert_buying_mode_consistent(params) + # Guard (a): wholesale + push_notification_config is rejected + # before the platform method runs — wholesale discovery is a + # synchronous rate-card read with no async lifecycle. The + # platform method is never invoked on this path. + assert_discovery_push_consistent(params, mode_field="buying_mode") account = await self._resolve_account(params.account, tool_ctx) + # Guard (c) pre-dispatch: a buyer-supplied push_notification_config + # makes the request async up front. If the account is unresolved + # (sentinel/empty id) the eventual task_id would be unreachable — + # reject before invoking the platform method. + _has_push = getattr(params, "push_notification_config", None) is not None + if _has_push: + assert_account_resolved_for_async(None, account_id=account.id, has_push=True) ctx = self._build_ctx(tool_ctx, account) # Refine flow: when buying_mode='refine' the framework dispatches # to refine_get_products() (when present) and projects the result @@ -1854,6 +1872,25 @@ async def get_products( # type: ignore[override] # AdcpErrors propagate unmodified; only the platform call is deadline- # wrapped. deadline = resolve_time_budget(params.time_budget) + + # Terminal side-effect (persist draft proposals) threaded as an + # on_complete hook so it fires on COMPLETION in BOTH the sync and + # handoff paths — never at submit. On the handoff path the bg task + # awaits the adopter's coroutine, then fires this hook with the + # terminal GetProductsResponse before registry.complete; on the + # sync path it fires inline with the adapter's return. This mirrors + # create_media_buy's consumption-finalize hook (handler.py + # create_media_buy). The hook persists the raw adapter result; + # buyer-presentation projections (property-list, pagination, + # fields) shape only the wire response, not the stored draft. + captured_platform = self._platform + captured_ctx = ctx + + async def _persist_draft_hook(get_products_result: Any) -> None: + await maybe_persist_draft_after_get_products( + captured_platform, get_products_result, captured_ctx + ) + coro = _invoke_platform_method( self._platform, "get_products", @@ -1861,6 +1898,7 @@ async def get_products( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + on_complete=_persist_draft_hook, ) try: result = await ( @@ -1891,6 +1929,25 @@ async def get_products( # type: ignore[override] return GetProductsResponse.model_validate( project_incomplete_response(interval=interval, unit=unit) ) + # Post-dispatch discovery guards (run on the raw result before any + # success-shape projection). ``result`` is either a typed + # GetProductsResponse (sync) or the framework's submitted + # projection dict (handoff). All three reject with + # INVALID_REQUEST / correctable. + # (d) hand-rolled submitted: adopter built {'status':'submitted'} + # by hand instead of ctx.handoff_to_task — no registry row. + # (b) wholesale + handoff: wholesale MUST be synchronous. + # (c) async + unresolved account: the task_id would be unreachable. + reject_hand_rolled_submitted(result) + reject_wholesale_handoff(result, mode=mode, mode_field="buying_mode") + assert_account_resolved_for_async(result, account_id=account.id, has_push=_has_push) + # Handoff path: the submitted envelope is returned verbatim, the + # same as create_media_buy. Success-shape projections (property + # list, pagination, fields, draft persist) apply only to the sync + # success arm. The registry-completion webhook for the eventual + # terminal artifact fires from _project_handoff, not here. + if isinstance(result, dict) and result.get("status") == "submitted": + return cast("GetProductsResponse", result) response = cast("GetProductsResponse", result) # Post-adapter: capability-gated property-list filter. response = cast( @@ -1913,11 +1970,10 @@ async def get_products( # type: ignore[override] ) if params.fields: response = _project_product_fields(response, params.fields) - # v1.5: persist draft proposals from brief / wholesale calls so - # subsequent finalize / create_media_buy can hydrate from the - # store. No-op when no proposal_store is wired for this tenant - # or when the response carries no proposals. - await maybe_persist_draft_after_get_products(self._platform, response, ctx) + # Draft proposals are persisted by the _persist_draft_hook + # on_complete seam (threaded above) so the same side-effect fires + # on both the sync and handoff completion paths. The hook ran + # inline on the sync path before this point. return response async def create_media_buy( # type: ignore[override] @@ -2479,10 +2535,32 @@ async def get_signals( # type: ignore[override] params: GetSignalsRequest, context: ToolContext | None = None, ) -> GetSignalsResponse: - """Catalog discovery for signal-marketplace / signal-owned.""" + """Catalog discovery for signal-marketplace / signal-owned. + + Synchronous by default; ``discovery_mode='brief'`` MAY hand off to + a background task (submitted / working arms — no input_required). + ``discovery_mode='wholesale'`` MUST stay synchronous. + """ tool_ctx = context or ToolContext() + # Guard (a): wholesale + push_notification_config is rejected + # before the platform method runs. Wholesale signal discovery is a + # synchronous catalog read with no async lifecycle. + assert_discovery_push_consistent(params, mode_field="discovery_mode") account = await self._resolve_account(getattr(params, "account", None), tool_ctx) + # Guard (c) pre-dispatch: push_notification_config makes the request + # async up front; reject against an unresolved account before + # _build_ctx (whose compose_caller_identity would otherwise raise a + # less-specific terminal error on the sentinel id) and before the + # platform method runs. + _has_push = getattr(params, "push_notification_config", None) is not None + if _has_push: + assert_account_resolved_for_async(None, account_id=account.id, has_push=True) ctx = self._build_ctx(tool_ctx, account) + mode = getattr(params, "discovery_mode", None) + if mode is None: + mode_str = None + else: + mode_str = mode.value if hasattr(mode, "value") else str(mode) result = await _invoke_platform_method( self._platform, "get_signals", @@ -2491,6 +2569,12 @@ async def get_signals( # type: ignore[override] executor=self._executor, registry=self._registry, ) + # Post-dispatch discovery guards (mirror get_products): + # (d) hand-rolled submitted, (b) wholesale + handoff, + # (c) async + unresolved account. All INVALID_REQUEST / correctable. + reject_hand_rolled_submitted(result) + reject_wholesale_handoff(result, mode=mode_str, mode_field="discovery_mode") + assert_account_resolved_for_async(result, account_id=account.id, has_push=_has_push) self._maybe_auto_emit_sync_completion("get_signals", params, result) return cast("GetSignalsResponse", result) diff --git a/src/adcp/decisioning/specialisms/sales.py b/src/adcp/decisioning/specialisms/sales.py index 617a2ec83..c0114d0d7 100644 --- a/src/adcp/decisioning/specialisms/sales.py +++ b/src/adcp/decisioning/specialisms/sales.py @@ -45,7 +45,7 @@ if TYPE_CHECKING: from adcp.decisioning.context import RequestContext - from adcp.decisioning.types import MaybeAsync, SalesResult + from adcp.decisioning.types import DiscoveryResult, MaybeAsync, SalesResult # Wire types — auto-generated from schemas/cache/3.0.0/*.json. Adopters # import from ``adcp.types``; the Protocol uses string-name references @@ -112,8 +112,25 @@ def get_products( self, req: GetProductsRequest, ctx: RequestContext[TMeta], - ) -> MaybeAsync[GetProductsResponse]: - """Sync catalog read — no HITL even on broadcast/proposal-mode. + ) -> DiscoveryResult[GetProductsResponse]: + """Catalog discovery — synchronous by default, MAY hand off. + + Return :class:`GetProductsResponse` directly for the sync fast + path. Brief / refine discovery MAY hand off via + ``ctx.handoff_to_task(fn)`` when composing the catalog needs + background work (custom curation queue, slow proposal + generation); the framework projects the handoff to the wire + ``submitted`` envelope and the buyer polls ``tasks/get`` for the + terminal :class:`GetProductsResponse`. ``get_products`` exposes + the ``submitted`` / ``working`` / ``input_required`` async arms. + + Wholesale (``buying_mode='wholesale'``) MUST return synchronously + — it is a raw rate-card read with no seller-side composition to + background. A wholesale call that cannot finish within the + buyer's ``time_budget`` declares the gap via ``incomplete[]`` on + a sync response rather than handing off; returning a handoff from + a wholesale call is rejected at the framework layer with + ``AdcpError(INVALID_REQUEST, field='buying_mode')``. Brief-based proposal generation rides on a separate verb (``request_proposal``, adcp#3407); proposal-mode adopters diff --git a/src/adcp/decisioning/specialisms/signals.py b/src/adcp/decisioning/specialisms/signals.py index 635ce5150..077393fb6 100644 --- a/src/adcp/decisioning/specialisms/signals.py +++ b/src/adcp/decisioning/specialisms/signals.py @@ -8,7 +8,9 @@ Common method: -* :meth:`get_signals` — sync catalog discovery +* :meth:`get_signals` — catalog discovery; synchronous by default, + MAY hand off brief-driven discovery to a background task (submitted / + working arms only — no input_required) Marketplace-only method: @@ -34,7 +36,7 @@ if TYPE_CHECKING: from adcp.decisioning.context import RequestContext - from adcp.decisioning.types import MaybeAsync + from adcp.decisioning.types import DiscoveryResult, MaybeAsync from adcp.types import ( ActivateSignalRequest, ActivateSignalSuccessResponse, @@ -70,14 +72,31 @@ def get_signals( self, req: GetSignalsRequest, ctx: RequestContext[TMeta], - ) -> MaybeAsync[GetSignalsResponse]: + ) -> DiscoveryResult[GetSignalsResponse]: """Catalog discovery — query your signal index, return signals matching the buyer's filters (industry, intent type, audience size, etc.). - Sync at the wire level — :class:`GetSignalsResponse` has no - async envelope. Platforms with slow catalog stores need - internal caches. + Return :class:`GetSignalsResponse` directly for the sync fast + path. Brief-driven discovery MAY hand off via + ``ctx.handoff_to_task(fn)`` when provider discovery needs + background work (cross-provider fan-out, identity-graph lookups); + the framework projects the handoff to the wire ``submitted`` + envelope and the buyer polls ``tasks/get`` for the terminal + :class:`GetSignalsResponse`. + + Wholesale (``discovery_mode='wholesale'``) MUST return + synchronously — a raw catalog read with no seller-side + composition to background. Returning a handoff from a wholesale + call is rejected at the framework layer with + ``AdcpError(INVALID_REQUEST, field='discovery_mode')``. + + .. note:: + ``get_signals`` ships ONLY the ``submitted`` and ``working`` + async arms — it has **no** ``input_required`` arm (unlike + ``get_products`` and ``create_media_buy``). Signal discovery + cannot pause mid-task to solicit buyer clarification; the + seller either completes the discovery or fails the task. :raises adcp.decisioning.AdcpError: ``code='POLICY_VIOLATION'`` when the buyer doesn't have rights to the requested data diff --git a/src/adcp/decisioning/types.py b/src/adcp/decisioning/types.py index 7bd1aaf05..c38597f6d 100644 --- a/src/adcp/decisioning/types.py +++ b/src/adcp/decisioning/types.py @@ -364,6 +364,23 @@ def is_workflow_handoff(obj: Any) -> bool: type_params=(T,), ) +#: Hybrid sync-or-handoff result for the async discovery verbs +#: (``get_products`` / ``get_signals``). Identical arm set to +#: :data:`SalesResult` — return ``T`` directly for the synchronous +#: catalog read, or ``ctx.handoff_to_task(fn)`` for brief / refine work +#: the seller backgrounds (custom curation, identity-graph provider +#: discovery). Named distinctly from ``SalesResult`` so the discovery +#: Protocols read self-documenting at the call site even though the +#: underlying union is the same — coding agents and reviewers shouldn't +#: have to infer that a "sales" alias also governs signal discovery. The +#: framework projects the ``TaskHandoff`` to the wire ``submitted`` +#: envelope; the buyer polls ``tasks/get`` for the terminal artifact. +DiscoveryResult = TypeAliasType( + "DiscoveryResult", + "Awaitable[T] | T | TaskHandoff[T] | Awaitable[TaskHandoff[T]]", + type_params=(T,), +) + # --------------------------------------------------------------------------- # Account diff --git a/src/adcp/types/__init__.py b/src/adcp/types/__init__.py index 6d7363edf..9c539204d 100644 --- a/src/adcp/types/__init__.py +++ b/src/adcp/types/__init__.py @@ -566,13 +566,22 @@ GetMediaBuyArtifactsSuccessResponse, GetProductsBriefRequest, GetProductsField, + GetProductsInputRequiredResponse, GetProductsRefineRequest, + GetProductsResponseUnion, + GetProductsSubmittedResponse, + GetProductsSuccessResponse, GetProductsWholesaleRequest, + GetProductsWorkingResponse, GetRightsErrorResponse, GetRightsResponse1, GetRightsSuccessResponse, GetSignalsDiscoveryRequest, GetSignalsLookupRequest, + GetSignalsResponseUnion, + GetSignalsSubmittedResponse, + GetSignalsSuccessResponse, + GetSignalsWorkingResponse, GroupFormatAssetUnion, HtmlFormatAsset, HtmlFormatGroupAsset, @@ -1373,10 +1382,19 @@ def __init__(self, *args: object, **kwargs: object) -> None: "GetMediaBuyArtifactsResponse1", "GetMediaBuyArtifactsSuccessResponse", "GetProductsBriefRequest", + "GetProductsInputRequiredResponse", "GetProductsRefineRequest", + "GetProductsResponseUnion", + "GetProductsSubmittedResponse", + "GetProductsSuccessResponse", "GetProductsWholesaleRequest", + "GetProductsWorkingResponse", "GetSignalsDiscoveryRequest", "GetSignalsLookupRequest", + "GetSignalsResponseUnion", + "GetSignalsSubmittedResponse", + "GetSignalsSuccessResponse", + "GetSignalsWorkingResponse", "HtmlPreviewRender", "InlineDaastAsset", "InlineVastAsset", diff --git a/src/adcp/types/aliases.py b/src/adcp/types/aliases.py index 941d91a8d..532753ca1 100644 --- a/src/adcp/types/aliases.py +++ b/src/adcp/types/aliases.py @@ -104,6 +104,21 @@ from adcp.types._generated import ( BuildCreativeResponse6 as _BuildCreativeResponse6, ) +from adcp.types.generated_poc.core.async_response_refs.media_buy.get_products_async_response_input_required import ( # noqa: E501 + GetProductsInputRequired, +) +from adcp.types.generated_poc.core.async_response_refs.media_buy.get_products_async_response_submitted import ( # noqa: E501 + GetProductsSubmitted, +) +from adcp.types.generated_poc.core.async_response_refs.media_buy.get_products_async_response_working import ( # noqa: E501 + GetProductsWorking, +) +from adcp.types.generated_poc.core.async_response_refs.signals.get_signals_async_response_submitted import ( # noqa: E501 + GetSignalsSubmitted, +) +from adcp.types.generated_poc.core.async_response_refs.signals.get_signals_async_response_working import ( # noqa: E501 + GetSignalsWorking, +) from adcp.types.generated_poc.core.error import ( Recovery, Source, @@ -113,6 +128,12 @@ CreateMediaBuyResponse2, CreateMediaBuyResponse3, ) +from adcp.types.generated_poc.media_buy.get_products_response import ( + GetProductsResponse as _GetProductsSuccessResponse, +) +from adcp.types.generated_poc.signals.get_signals_response import ( + GetSignalsResponse as _GetSignalsSuccessResponse, +) def _generated_alias(name: str, fallback_name: str) -> Any: @@ -500,6 +521,69 @@ def _generated_alias(name: str, fallback_name: str) -> Any: ``active``). """ +# Get Products Response Variants +# +# The rc.9 ``get_products_response`` schema is a single flat success +# object — the async arms (submitted / working / input_required) ship as +# separate ``core/async_response_refs`` schemas, NOT unioned into the +# top-level response. So ``GetProductsResponse`` (the public name) stays +# the constructable success class; these aliases name the orphaned async +# arms semantically, mirroring ``CreateMediaBuySubmittedResponse`` even +# though create_media_buy's submitted arm IS unioned into its generated +# response (its schema is a oneOf, get_products' is not). +GetProductsSuccessResponse: TypeAlias = _GetProductsSuccessResponse +"""Success response - product catalog issued in-line (synchronous read).""" + +GetProductsSubmittedResponse: TypeAlias = GetProductsSubmitted +"""Submitted (async) envelope - brief / refine discovery handed off to a +background task. Carries ``task_id`` + ``status='submitted'``; the +``products`` array is issued on the completion artifact (poll +``tasks/get``), not here. Wholesale calls MUST NOT produce this arm.""" + +GetProductsWorkingResponse: TypeAlias = GetProductsWorking +"""Working (async) progress envelope for an in-flight get_products task.""" + +GetProductsInputRequiredResponse: TypeAlias = GetProductsInputRequired +"""Input-required (async) envelope - the seller paused discovery to +solicit buyer clarification (CLARIFICATION_NEEDED / BUDGET_REQUIRED).""" + +#: Full async-aware union for ``get_products``. Includes the synchronous +#: success arm plus the three async arms the rc.9 spec ships for this +#: verb. The public ``GetProductsResponse`` name remains the success +#: class (so direct construction / ``model_validate`` keep working); +#: this union is the honest type of "any get_products response shape on +#: the wire," used by callers that pattern-match across sync and async. +GetProductsResponseUnion: TypeAlias = ( + _GetProductsSuccessResponse + | GetProductsSubmitted + | GetProductsWorking + | GetProductsInputRequired +) + +# Get Signals Response Variants +# +# get_signals ships ONLY submitted + working async arms — NO +# input_required (signal discovery cannot pause to solicit buyer input). +# Same flat-success-schema posture as get_products. +GetSignalsSuccessResponse: TypeAlias = _GetSignalsSuccessResponse +"""Success response - signal catalog issued in-line (synchronous read).""" + +GetSignalsSubmittedResponse: TypeAlias = GetSignalsSubmitted +"""Submitted (async) envelope - brief discovery handed off to a +background task. Carries ``task_id`` + ``status='submitted'``; the +``signals`` array is issued on the completion artifact. Wholesale calls +MUST NOT produce this arm.""" + +GetSignalsWorkingResponse: TypeAlias = GetSignalsWorking +"""Working (async) progress envelope for an in-flight get_signals task.""" + +#: Full async-aware union for ``get_signals``. Includes the synchronous +#: success arm plus the two async arms (submitted / working). Has NO +#: input_required arm — narrower than ``GetProductsResponseUnion``. +GetSignalsResponseUnion: TypeAlias = ( + _GetSignalsSuccessResponse | GetSignalsSubmitted | GetSignalsWorking +) + # Performance Feedback Response Variants ProvidePerformanceFeedbackSuccessResponse: TypeAlias = ProvidePerformanceFeedbackResponse1 """Success response - performance feedback accepted.""" @@ -1907,6 +1991,17 @@ class UnknownGroupAsset(_BaseGroupAsset): # Get signals request variants "GetSignalsDiscoveryRequest", "GetSignalsLookupRequest", + # Get products response variants (async discovery) + "GetProductsSuccessResponse", + "GetProductsSubmittedResponse", + "GetProductsWorkingResponse", + "GetProductsInputRequiredResponse", + "GetProductsResponseUnion", + # Get signals response variants (async discovery) + "GetSignalsSuccessResponse", + "GetSignalsSubmittedResponse", + "GetSignalsWorkingResponse", + "GetSignalsResponseUnion", # Performance feedback request variants "ProvidePerformanceFeedbackByMediaBuyRequest", "ProvidePerformanceFeedbackByBuyerRefRequest", diff --git a/tests/fixtures/public_api_snapshot.json b/tests/fixtures/public_api_snapshot.json index 53dec2f7c..329152755 100644 --- a/tests/fixtures/public_api_snapshot.json +++ b/tests/fixtures/public_api_snapshot.json @@ -759,10 +759,15 @@ "GetPlanAuditLogsResponse", "GetProductsBriefRequest", "GetProductsField", + "GetProductsInputRequiredResponse", "GetProductsRefineRequest", "GetProductsRequest", "GetProductsResponse", + "GetProductsResponseUnion", + "GetProductsSubmittedResponse", + "GetProductsSuccessResponse", "GetProductsWholesaleRequest", + "GetProductsWorkingResponse", "GetPropertyListRequest", "GetPropertyListResponse", "GetRightsErrorResponse", @@ -774,6 +779,10 @@ "GetSignalsLookupRequest", "GetSignalsRequest", "GetSignalsResponse", + "GetSignalsResponseUnion", + "GetSignalsSubmittedResponse", + "GetSignalsSuccessResponse", + "GetSignalsWorkingResponse", "GetTaskStatusRequest", "GetTaskStatusResponse", "GovernanceAgent", diff --git a/tests/test_decisioning_async_discovery.py b/tests/test_decisioning_async_discovery.py new file mode 100644 index 000000000..57e23e985 --- /dev/null +++ b/tests/test_decisioning_async_discovery.py @@ -0,0 +1,671 @@ +"""Async (handoff) discovery for get_products / get_signals — issue #924. + +Tests the typed-API + dispatch contract for promoting ``get_products`` and +``get_signals`` to long-running tasks via ``ctx.handoff_to_task(fn)``, plus +the four rejection guards that keep async discovery spec-conformant. + +Tests exercise the public API (the ``PlatformHandler`` shim and the +``adcp.types`` / ``adcp.decisioning`` exports) and validate against the real +generated Pydantic models — never against handler internals. + +Reference: JS adcp-client#2170 (rc8 async discovery parity). +""" + +from __future__ import annotations + +import asyncio +import typing +from concurrent.futures import ThreadPoolExecutor +from typing import Any + +import pytest + +from adcp.decisioning import ( + AdcpError, + DecisioningCapabilities, + DecisioningPlatform, + InMemoryTaskRegistry, + SingletonAccounts, +) +from adcp.decisioning.handler import PlatformHandler +from adcp.decisioning.types import Account +from adcp.server.base import ToolContext + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test-discovery-") + yield pool + pool.shutdown(wait=True) + + +def _make_handler( + platform: DecisioningPlatform, + executor: ThreadPoolExecutor, + registry: InMemoryTaskRegistry | None = None, +) -> PlatformHandler: + return PlatformHandler( + platform, + executor=executor, + registry=registry or InMemoryTaskRegistry(), + ) + + +class _UnresolvedAccounts: + """AccountStore that resolves every request to the ``''`` + sentinel id — models the accountless/derived-miss case for guard (c).""" + + def resolve(self, ref: Any = None, auth_info: Any = None) -> Account[Any]: + del ref, auth_info + return Account(id="") + + +# --------------------------------------------------------------------------- +# Response-union membership (typed API surface) +# --------------------------------------------------------------------------- + + +def test_get_products_union_includes_all_three_async_arms() -> None: + """``GetProductsResponseUnion`` carries the sync success arm plus the + submitted / working / input_required async arms the rc.9 spec ships.""" + from adcp.types import ( + GetProductsInputRequiredResponse, + GetProductsResponse, + GetProductsResponseUnion, + GetProductsSubmittedResponse, + GetProductsWorkingResponse, + ) + + arms = set(typing.get_args(GetProductsResponseUnion)) + assert GetProductsResponse in arms + assert GetProductsSubmittedResponse in arms + assert GetProductsWorkingResponse in arms + assert GetProductsInputRequiredResponse in arms + + +def test_get_signals_union_has_submitted_working_but_not_input_required() -> None: + """``get_signals`` ships ONLY submitted + working — no input_required + arm (signal discovery cannot pause to solicit buyer clarification).""" + from adcp.types import ( + GetSignalsResponse, + GetSignalsResponseUnion, + GetSignalsSubmittedResponse, + GetSignalsWorkingResponse, + ) + + arms = set(typing.get_args(GetSignalsResponseUnion)) + assert GetSignalsResponse in arms + assert GetSignalsSubmittedResponse in arms + assert GetSignalsWorkingResponse in arms + # No input_required arm exists for signals at all. + import adcp.types as adcp_types + + assert not hasattr(adcp_types, "GetSignalsInputRequiredResponse") + assert not hasattr(adcp_types, "GetSignalsInputRequired") + arm_names = {a.__name__ for a in arms} + assert not any("InputRequired" in n for n in arm_names) + + +def test_input_required_arm_exists_for_products_only() -> None: + """get_products has an input_required arm; get_signals does not.""" + from adcp.types import GetProductsInputRequiredResponse + + # Reason enum on the products input_required arm is real. + assert "reason" in GetProductsInputRequiredResponse.model_fields + + +def test_submitted_arms_validate_the_wire_submitted_envelope() -> None: + """The submitted arm classes accept the {task_id, status='submitted'} + wire shape via real .model_validate().""" + from adcp.types import GetProductsSubmittedResponse, GetSignalsSubmittedResponse + + p = GetProductsSubmittedResponse.model_validate({"status": "submitted", "task_id": "task_p1"}) + assert p.status == "submitted" + assert p.task_id == "task_p1" + + s = GetSignalsSubmittedResponse.model_validate({"status": "submitted", "task_id": "task_s1"}) + assert s.status == "submitted" + assert s.task_id == "task_s1" + + +def test_discovery_result_alias_mirrors_sales_result() -> None: + """DiscoveryResult[T] has the same arm structure as SalesResult[T].""" + from adcp.decisioning import DiscoveryResult, SalesResult + + # Both are TypeAliasType — same underlying union string. + assert DiscoveryResult.__value__ == SalesResult.__value__ + + +# --------------------------------------------------------------------------- +# Async handoff — submitted envelope on submit, terminal result on completion +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_handoff_returns_submitted_envelope(executor) -> None: + """Adopter returns ctx.handoff_to_task(fn) → handler returns the wire + Submitted envelope ({task_id, status}), NOT a GetProductsResponse.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + async def _curate(task_ctx): + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + handler = _make_handler(_Platform(), executor) + result = await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="ctv inventory"), + ToolContext(), + ) + assert isinstance(result, dict) + assert result["status"] == "submitted" + assert result["task_id"].startswith("task_") + assert set(result.keys()) == {"task_id", "status"} + + +@pytest.mark.asyncio +async def test_get_products_handoff_completion_lands_in_registry(executor) -> None: + """The background task completes the registry row with task_type + 'get_products' and the terminal GetProductsResponse artifact (the + shape a buyer reads via tasks/get).""" + from adcp.types import GetProductsRequest, GetProductsResponse + + completed = asyncio.Event() + + async def _curate(task_ctx): + completed.set() + # Empty products list is wire-valid; the terminal artifact shape is + # what matters for the registry-completion assertion. + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + registry = InMemoryTaskRegistry() + handler = _make_handler(_Platform(), executor, registry=registry) + envelope = await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="ctv inventory"), + ToolContext(), + ) + await asyncio.wait_for(completed.wait(), timeout=2.0) + await asyncio.sleep(0.05) + + rec = await registry.get(envelope["task_id"]) + assert rec is not None + assert rec["state"] == "completed" + assert rec["task_type"] == "get_products" + assert "products" in rec["result"] + + +@pytest.mark.asyncio +async def test_get_signals_handoff_returns_submitted_envelope(executor) -> None: + from adcp.types import GetSignalsRequest, GetSignalsResponse + + async def _discover(task_ctx): + return GetSignalsResponse(signals=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return ctx.handoff_to_task(_discover) + + handler = _make_handler(_Platform(), executor) + result = await handler.get_signals( + GetSignalsRequest(discovery_mode="brief", signal_spec="auto intenders"), + ToolContext(), + ) + assert isinstance(result, dict) + assert result["status"] == "submitted" + assert result["task_id"].startswith("task_") + + +@pytest.mark.asyncio +async def test_get_signals_handoff_completion_lands_in_registry(executor) -> None: + from adcp.types import GetSignalsRequest, GetSignalsResponse + + completed = asyncio.Event() + + async def _discover(task_ctx): + completed.set() + return GetSignalsResponse(signals=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return ctx.handoff_to_task(_discover) + + registry = InMemoryTaskRegistry() + handler = _make_handler(_Platform(), executor, registry=registry) + envelope = await handler.get_signals( + GetSignalsRequest(discovery_mode="brief", signal_spec="auto intenders"), + ToolContext(), + ) + await asyncio.wait_for(completed.wait(), timeout=2.0) + await asyncio.sleep(0.05) + + rec = await registry.get(envelope["task_id"]) + assert rec is not None + assert rec["state"] == "completed" + assert rec["task_type"] == "get_signals" + + +# --------------------------------------------------------------------------- +# Sync path unchanged +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_sync_path_no_task_id(executor) -> None: + """A sync GetProductsResponse return has no task_id and is returned as + the typed model (no behavior change).""" + from adcp.types import GetProductsRequest, GetProductsResponse + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return GetProductsResponse(products=[]) + + handler = _make_handler(_Platform(), executor) + resp = await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="x"), + ToolContext(), + ) + assert isinstance(resp, GetProductsResponse) + assert not hasattr(resp, "task_id") or resp.task_id is None # type: ignore[attr-defined] + assert resp.products == [] + + +@pytest.mark.asyncio +async def test_get_signals_sync_path_no_task_id(executor) -> None: + from adcp.types import GetSignalsRequest, GetSignalsResponse + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return GetSignalsResponse(signals=[]) + + handler = _make_handler(_Platform(), executor) + resp = await handler.get_signals( + GetSignalsRequest(discovery_mode="brief", signal_spec="x"), + ToolContext(), + ) + assert isinstance(resp, GetSignalsResponse) + + +class _RecordingStore: + """Minimal ProposalStore that records put_draft calls.""" + + def __init__(self) -> None: + self.put_calls: list[dict[str, Any]] = [] + + async def put_draft( + self, + *, + proposal_id: str, + account_id: str, + recipes: Any, + proposal_payload: Any, + ) -> None: + self.put_calls.append({"proposal_id": proposal_id, "account_id": account_id}) + + +class _StoreBackedAccounts: + """Resolves a tenant-scoped account carrying ``tenant_id`` in metadata + so the proposal-store resolver finds the wired store.""" + + def resolve(self, ref: Any = None, auth_info: Any = None) -> Account[Any]: + del ref, auth_info + return Account(id="seller:acct_1", metadata={"tenant_id": "default"}) + + +def _store_backed_platform(store: _RecordingStore, handoff: bool): + from adcp.types import GetProductsResponse + + _proposals = [ + { + "proposal_id": "prop_1", + "name": "plan", + "allocations": [{"product_id": "p1", "allocation_percentage": 100}], + } + ] + _products: list[Any] = [] + + async def _curate(task_ctx): + return GetProductsResponse(products=_products, proposals=_proposals) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = _StoreBackedAccounts() + + def proposal_store_for_tenant(self, tenant_id: str): + return store + + async def get_products(self, req, ctx): + if handoff: + return ctx.handoff_to_task(_curate) + return GetProductsResponse(products=_products, proposals=_proposals) + + return _Platform() + + +@pytest.mark.asyncio +async def test_get_products_sync_draft_persist_still_fires(executor) -> None: + """The persist-draft terminal side-effect runs on the sync completion + path (threaded as on_complete) — proposals reach the wired store.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + store = _RecordingStore() + handler = _make_handler(_store_backed_platform(store, handoff=False), executor) + resp = await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="x"), + ToolContext(), + ) + assert isinstance(resp, GetProductsResponse) + assert [c["proposal_id"] for c in store.put_calls] == ["prop_1"] + + +@pytest.mark.asyncio +async def test_get_products_handoff_draft_persist_runs_on_completion(executor) -> None: + """The persist-draft side-effect runs on the handoff COMPLETION path — + threaded as on_complete so it fires when the bg task lands, not at + submit time.""" + from adcp.types import GetProductsRequest + + store = _RecordingStore() + registry = InMemoryTaskRegistry() + platform = _store_backed_platform(store, handoff=True) + handler = _make_handler(platform, executor, registry=registry) + envelope = await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="x"), + ToolContext(), + ) + # At submit time the side-effect has NOT run yet. + assert isinstance(envelope, dict) and envelope["status"] == "submitted" + # Drain the background task. + for _ in range(20): + await asyncio.sleep(0.01) + if store.put_calls: + break + assert [c["proposal_id"] for c in store.put_calls] == [ + "prop_1" + ], "persist-draft on_complete hook did not fire on the handoff completion path" + rec = await registry.get(envelope["task_id"]) + assert rec is not None and rec["state"] == "completed" + + +# --------------------------------------------------------------------------- +# Guard (a): wholesale + push_notification_config — pre-dispatch reject +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_wholesale_push_rejected_predispatch(executor) -> None: + from adcp.types import GetProductsRequest + + call_count = {"n": 0} + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + call_count["n"] += 1 + raise AssertionError("platform method must not be invoked") + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest( + buying_mode="wholesale", + push_notification_config={"url": "https://buyer.example.com/wh"}, + ), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "push_notification_config" + assert call_count["n"] == 0 + + +@pytest.mark.asyncio +async def test_get_signals_wholesale_push_rejected_predispatch(executor) -> None: + from adcp.types import GetSignalsRequest + + call_count = {"n": 0} + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + call_count["n"] += 1 + raise AssertionError("platform method must not be invoked") + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_signals( + GetSignalsRequest( + discovery_mode="wholesale", + push_notification_config={"url": "https://buyer.example.com/wh"}, + ), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "push_notification_config" + assert call_count["n"] == 0 + + +# --------------------------------------------------------------------------- +# Guard (b): wholesale + adopter handoff — post-dispatch reject +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_wholesale_handoff_rejected(executor) -> None: + from adcp.types import GetProductsRequest + + async def _curate(task_ctx): + return {"products": []} + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest(buying_mode="wholesale"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "buying_mode" + + +@pytest.mark.asyncio +async def test_get_signals_wholesale_handoff_rejected(executor) -> None: + from adcp.types import GetSignalsRequest + + async def _discover(task_ctx): + return {"signals": []} + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return ctx.handoff_to_task(_discover) + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_signals( + GetSignalsRequest(discovery_mode="wholesale"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "discovery_mode" + + +# --------------------------------------------------------------------------- +# Guard (c): async + unresolved account — field='account' +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_push_unresolved_account_rejected(executor) -> None: + """brief + push_notification_config against an unresolved (sentinel) + account is rejected with field='account' before dispatch.""" + from adcp.types import GetProductsRequest + + call_count = {"n": 0} + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = _UnresolvedAccounts() + + async def get_products(self, req, ctx): + call_count["n"] += 1 + from adcp.types import GetProductsResponse + + return GetProductsResponse(products=[]) + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest( + buying_mode="brief", + brief="x", + push_notification_config={"url": "https://buyer.example.com/wh"}, + ), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "account" + assert call_count["n"] == 0 + + +@pytest.mark.asyncio +async def test_get_signals_push_unresolved_account_rejected(executor) -> None: + from adcp.types import GetSignalsRequest + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = _UnresolvedAccounts() + + async def get_signals(self, req, ctx): + from adcp.types import GetSignalsResponse + + return GetSignalsResponse(signals=[]) + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_signals( + GetSignalsRequest( + discovery_mode="brief", + signal_spec="x", + push_notification_config={"url": "https://buyer.example.com/wh"}, + ), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + assert exc.value.field == "account" + + +# --------------------------------------------------------------------------- +# Guard (d): hand-rolled submitted dict from the sync arm +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_hand_rolled_submitted_rejected(executor) -> None: + """Adopter returns a literal {'status':'submitted', ...} with extra + keys (bypassing ctx.handoff_to_task) → guiding INVALID_REQUEST.""" + from adcp.types import GetProductsRequest + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return {"status": "submitted", "task_id": "hand_rolled_1", "products": []} + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="x"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + + +@pytest.mark.asyncio +async def test_get_signals_hand_rolled_submitted_rejected(executor) -> None: + from adcp.types import GetSignalsRequest + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return {"status": "submitted", "task_id": "hand_rolled_2", "signals": []} + + handler = _make_handler(_Platform(), executor) + with pytest.raises(AdcpError) as exc: + await handler.get_signals( + GetSignalsRequest(discovery_mode="brief", signal_spec="x"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.recovery == "correctable" + + +# --------------------------------------------------------------------------- +# task-type enum + wire validation +# --------------------------------------------------------------------------- + + +def test_task_type_enum_includes_discovery_verbs() -> None: + """The generated TaskType enum (which validates tasks_get / list_tasks / + webhook task_type) carries both discovery verbs.""" + from adcp.types.generated_poc.enums.task_type import TaskType + + values = {t.value for t in TaskType} + assert "get_products" in values + assert "get_signals" in values + + +def test_wire_validator_accepts_submitted_for_discovery_verbs() -> None: + """Strict response validation selects the submitted variant and passes + the {task_id, status='submitted'} envelope for both discovery verbs.""" + from adcp.validation.schema_validator import validate_response + + for tool in ("get_products", "get_signals"): + outcome = validate_response(tool, {"task_id": "task_x", "status": "submitted"}) + # Either the submitted variant validates clean, or the bundle is + # absent (skipped). It must NOT report a hard schema failure. + assert outcome.valid, f"{tool} submitted envelope failed validation: {outcome.issues}" From f8d421eddb8abb11e15669a39bf53e63e504fedd Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sun, 7 Jun 2026 05:14:50 -0400 Subject: [PATCH 2/2] feat(decisioning): framework-wide async-completion webhooks + #930 review fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deliver the spec-required terminal completion / failure webhook on the async (handoff) path of every spec-eligible verb when the buyer registered push_notification_config (adcp#5389). Previously the SDK emitted only on the sync path; create_media_buy / get_products / get_signals and all other async ops left a registered push URL silent and the buyer polling tasks/get. Seam (approach a): emit centrally from the background completion path in dispatch._project_handoff — the single seam every async task flows through (_invoke_platform_method for all verbs, plus proposal finalize). The terminal webhook fires EXACTLY ONCE after registry.complete / registry.fail, with the buyer's operation_id echoed verbatim and the registry task_id included. The sync auto-emit gate already skips the {task_id, status} submitted projection, so the two paths never double-deliver. Webhook emission lives in the decisioning runtime (webhook_emit.emit_terminal_completion_webhook), reusing the sync path's WebhookSender / supervisor and send_mcp payload builder; the pluggable TaskRegistry stays webhook-agnostic. Gated by auto_emit_completion_webhooks so manual-emit adopters are unaffected; no change to the sync defaults. - webhook_emit.py: emit_terminal_completion_webhook (self-isolating, logged-and-swallowed) + operation_id extraction from push config. - dispatch.py: thread webhook_target / webhook_auto_emit through _invoke_platform_method -> _project_handoff; fire on both the registry.complete (completed) and _fail (failed) terminal arms. - handler.py: _handoff_webhook_kwargs() threaded into every spec-eligible shim (create/update_media_buy, sync_creatives, get/activate_signal, get_products, sync_audiences/catalogs, brand/rights, property_list). - webhook_supervisor{,_pg}.py: add operation_id to send_mcp; Pg persists it on the queue row (CREATE column + ADD COLUMN IF NOT EXISTS backfill) and replays it from the worker. Review fixes on #930: - MUST-FIX docstrings: corrected the now-true claims — async terminal completion delivers via push webhook when configured, always via tasks/get polling (handler / serve / specialism docstrings). - MUST-FIX dead guard arm (c): removed the unreachable post-dispatch assert_account_resolved_for_async call; documented that compose_caller_identity owns the no-push-handoff + unresolved-account case (fails closed terminally at _build_ctx before any task is minted). Added a test asserting no registry row is issued. - SHOULD-FIX side-effect leak (b): reject_wholesale_handoff_before_launch wired as a pre_handoff_reject callback so a wholesale handoff is rejected BEFORE the registry row / background task / draft / webhook -- no side effects for a buyer told 'rejected'. Test asserts an empty registry after rejection. - SHOULD-FIX persist-draft: regression test pinning that fields= / pagination= projections shape only the wire response, never the persisted draft (full product pricing retained). - SHOULD-FIX wire-validator: tightened the discovery submitted test to assert variant == 'submitted' (no skip escape hatch). - Added async-completion webhook tests on get_products, get_signals, create_media_buy (one completed webhook; operation_id echoed, task_id, result in payload), the failure path (one failed webhook), and the no-push no-webhook case. Updated the create_media_buy handoff test to assert the conformant exactly-once behavior. No ADCP_VERSION bump, no schema re-download, no public-API export change. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/decisioning/discovery_guards.py | 81 +++- src/adcp/decisioning/dispatch.py | 98 ++++- src/adcp/decisioning/handler.py | 121 +++++- src/adcp/decisioning/serve.py | 15 +- src/adcp/decisioning/specialisms/sales.py | 10 +- src/adcp/decisioning/specialisms/signals.py | 7 +- src/adcp/decisioning/webhook_emit.py | 163 +++++++ src/adcp/webhook_supervisor.py | 9 + src/adcp/webhook_supervisor_pg.py | 23 +- tests/test_decisioning_async_discovery.py | 455 +++++++++++++++++++- tests/test_decisioning_webhook_emit.py | 39 +- tests/test_webhook_supervisor.py | 19 + tests/test_webhook_supervisor_pg.py | 48 ++- 13 files changed, 1015 insertions(+), 73 deletions(-) diff --git a/src/adcp/decisioning/discovery_guards.py b/src/adcp/decisioning/discovery_guards.py index 8240cd6eb..5ea7c267d 100644 --- a/src/adcp/decisioning/discovery_guards.py +++ b/src/adcp/decisioning/discovery_guards.py @@ -114,11 +114,52 @@ def assert_discovery_push_consistent(req: Any, *, mode_field: str) -> None: ) +def _wholesale_handoff_error(mode_field: str) -> AdcpError: + """Build the guard (b) rejection. Shared by the pre-dispatch callback + and the post-dispatch belt-and-braces check so both emit an identical + diagnostic.""" + return AdcpError( + "INVALID_REQUEST", + message=( + f"{mode_field}='wholesale' returned a task handoff, but " + "wholesale discovery MUST complete synchronously. Return the " + "catalog directly; signal partial completion via the " + "response's incomplete[] field rather than handing off to a " + "background task. The 'brief' mode is the async-capable path." + ), + field=mode_field, + recovery="correctable", + ) + + +def reject_wholesale_handoff_before_launch(mode_field: str) -> None: + """Guard (b) pre-dispatch arm: reject a wholesale handoff BEFORE the + framework mints a registry row or launches the background task. + + Wired as the ``pre_handoff_reject`` callback on + :func:`adcp.decisioning.dispatch._invoke_platform_method` only when the + resolved mode is wholesale. The dispatcher calls it the instant it + detects the adapter returned a :class:`TaskHandoff` — before + ``_project_handoff`` issues a task or starts background work — so a + rejected wholesale handoff leaves NO task row, NO persisted draft, NO + background coroutine, and NO completion webhook. Without this, the + post-dispatch check below would fire only after those side effects had + already happened. + + :raises AdcpError: ``INVALID_REQUEST`` / ``correctable``, + ``field=``. + """ + raise _wholesale_handoff_error(mode_field) + + def reject_wholesale_handoff(result: Any, *, mode: str | None, mode_field: str) -> None: - """Guard (b): post-dispatch reject of an adopter handoff on a wholesale call. + """Guard (b) post-dispatch arm: belt-and-braces reject of an adopter + handoff on a wholesale call. + Defense-in-depth for :func:`reject_wholesale_handoff_before_launch`. Called after ``_invoke_platform_method`` with the (already projected) - result. When the resolved mode is wholesale and the result is the + result; covers any dispatch path that did not wire the pre-launch + callback. When the resolved mode is wholesale and the result is the framework's submitted projection, the adopter's wholesale code path handed off — a contract violation. Reject (``field=``). @@ -126,18 +167,7 @@ def reject_wholesale_handoff(result: Any, *, mode: str | None, mode_field: str) :raises AdcpError: ``INVALID_REQUEST`` / ``correctable``. """ if mode == "wholesale" and _is_submitted_projection(result): - raise AdcpError( - "INVALID_REQUEST", - message=( - f"{mode_field}='wholesale' returned a task handoff, but " - "wholesale discovery MUST complete synchronously. Return the " - "catalog directly; signal partial completion via the " - "response's incomplete[] field rather than handing off to a " - "background task. The 'brief' mode is the async-capable path." - ), - field=mode_field, - recovery="correctable", - ) + raise _wholesale_handoff_error(mode_field) def assert_account_resolved_for_async( @@ -156,13 +186,21 @@ def assert_account_resolved_for_async( :func:`adcp.decisioning.dispatch.compose_caller_identity`'s contract across derived / implicit / explicit account modes. - Call this both pre-dispatch (with ``result=None``) when the buyer - supplied ``push_notification_config`` — so the rejection fires as a - clean ``INVALID_REQUEST`` before any platform / registry interaction — - and post-dispatch (passing the projected ``result``) so an adopter - handoff against an unresolved account is also caught. The framework's - registry independently rejects an empty ``account_id`` at issue-time; - this guard surfaces the buyer-facing ``field='account'`` diagnostic. + Used pre-dispatch (with ``result=None``) when the buyer supplied + ``push_notification_config`` — so the rejection fires as a clean + ``INVALID_REQUEST`` / ``correctable`` with the buyer-facing + ``field='account'`` diagnostic BEFORE any platform / registry + interaction. + + The no-push handoff case (the adopter hands off against an unresolved + account without a push config) is NOT routed through this guard: + :func:`adcp.decisioning.dispatch.compose_caller_identity` already fails + closed at ``_build_ctx`` — which runs BEFORE the platform method — with + a terminal ``INVALID_REQUEST``, so no task row is ever minted against an + unresolved account. A post-dispatch call here passing the projected + ``result`` would be unreachable for an unresolved account (control never + reaches it because ``_build_ctx`` raised first) and a no-op for a + resolved one, so the shims do not make that call. :raises AdcpError: ``INVALID_REQUEST`` / ``correctable`` with ``field='account'``. @@ -245,4 +283,5 @@ def reject_hand_rolled_submitted(result: Any) -> None: "assert_discovery_push_consistent", "reject_hand_rolled_submitted", "reject_wholesale_handoff", + "reject_wholesale_handoff_before_launch", ] diff --git a/src/adcp/decisioning/dispatch.py b/src/adcp/decisioning/dispatch.py index 4b710aafe..5ccc9331a 100644 --- a/src/adcp/decisioning/dispatch.py +++ b/src/adcp/decisioning/dispatch.py @@ -66,6 +66,7 @@ is_task_handoff, is_workflow_handoff, ) +from adcp.decisioning.webhook_emit import emit_terminal_completion_webhook if TYPE_CHECKING: from collections.abc import Awaitable, Callable @@ -77,6 +78,10 @@ from adcp.decisioning.registry import BuyerAgent from adcp.decisioning.types import Account from adcp.server.base import ToolContext + from adcp.webhook_sender import WebhookSender + from adcp.webhook_supervisor import WebhookDeliverySupervisor + + WebhookDeliveryTarget = WebhookSender | WebhookDeliverySupervisor logger = logging.getLogger(__name__) @@ -1305,6 +1310,9 @@ async def _invoke_platform_method( extra_kwargs: dict[str, Any] | None = None, on_complete: Callable[[Any], Awaitable[None]] | None = None, on_failure: Callable[[BaseException], Awaitable[None]] | None = None, + webhook_target: WebhookDeliveryTarget | None = None, + webhook_auto_emit: bool = True, + pre_handoff_reject: Callable[[], None] | None = None, ) -> Any: """Invoke a platform method, projecting hybrid returns. @@ -1351,6 +1359,30 @@ async def _invoke_platform_method( Symmetric with ``on_complete``. Used by v1.5 create_media_buy to release the consumption reservation so the buyer can retry. Hook errors are logged but never block exception propagation. + + :param webhook_target: Forwarded to :func:`_project_handoff` so the + background completion path can deliver the terminal completion / + failure webhook when the buyer registered + ``push_notification_config``. The handler wires its + ``webhook_sender`` / ``webhook_supervisor``; only the handoff + (async) arm uses it — the sync arm's auto-emit is a separate + call in the handler shim. + + :param webhook_auto_emit: Forwarded to :func:`_project_handoff`; + mirrors the handler's ``auto_emit_completion_webhooks`` so an + adopter emitting webhooks manually never gets a framework + double-delivery on the handoff path. + + :param pre_handoff_reject: Optional zero-arg callback invoked when + the adapter returned a :class:`TaskHandoff`, BEFORE + :func:`_project_handoff` mints a registry row or launches the + background task. Raising from it (e.g. an :class:`AdcpError`) + rejects the handoff with NO side effects — no task row, no + background work, no completion webhook. The discovery + wholesale-is-synchronous guard uses this so an adopter handing + off on a ``wholesale`` request is rejected cleanly instead of + leaking a task the buyer was told was rejected. Runs only on the + ``TaskHandoff`` arm; sync / workflow-handoff returns ignore it. """ # pydantic is a required dep; import here (not at module level) to mirror # the lazy-import discipline used throughout this module. @@ -1513,6 +1545,13 @@ async def _invoke_platform_method( raise wrapped from exc if is_task_handoff(result): + # Reject before any side effect (registry row, background task, + # completion webhook) is created. The wholesale discovery guard + # uses this so an adopter handing off on a synchronous-only + # wholesale request never leaks a task the buyer is told is + # rejected. + if pre_handoff_reject is not None: + pre_handoff_reject() return await _project_handoff( result, ctx, @@ -1522,6 +1561,8 @@ async def _invoke_platform_method( on_complete=on_complete, on_failure=on_failure, request_params=params, + webhook_target=webhook_target, + webhook_auto_emit=webhook_auto_emit, ) if is_workflow_handoff(result): return await _project_workflow_handoff( @@ -1588,6 +1629,8 @@ async def _project_handoff( on_complete: Callable[[Any], Awaitable[None]] | None = None, on_failure: Callable[[BaseException], Awaitable[None]] | None = None, request_params: BaseModel | None = None, + webhook_target: WebhookDeliveryTarget | None = None, + webhook_auto_emit: bool = True, ) -> dict[str, Any]: """Promote a TaskHandoff to a background task. @@ -1647,7 +1690,26 @@ async def _project_handoff( (``registry.fail``) paths — closes #563. Mirrors the sync AdcpError path's context-passthrough (PR #560). When ``None``, no echo happens (e.g. test fixtures invoking the handoff - helper directly). + helper directly). Also the source of the buyer's + ``push_notification_config`` (url / token / operation_id) for + the terminal-completion webhook. + + :param webhook_target: The wired :class:`~adcp.webhook_sender.WebhookSender` + or :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor`. When + the buyer supplied ``push_notification_config`` on + ``request_params``, the background completion path emits the + terminal completion / failure webhook to that URL EXACTLY ONCE — + on success after ``registry.complete``, on failure after + ``registry.fail``. This is the async-path half of the spec + webhook contract (adcp#5389): a ``Submitted`` task carrying a + push config MUST deliver at least the terminal notification. + ``None`` (and the no-push case) skips delivery — the buyer polls + ``tasks/get`` instead. The framework's polling path is unchanged. + + :param webhook_auto_emit: Mirrors the handler's + ``auto_emit_completion_webhooks`` flag. When ``False`` the + adopter emits webhooks manually inside their handler; the + framework skips the terminal emission so it never double-delivers. The handoff fn is extracted via the type-identity dispatch in :func:`adcp.decisioning.types.is_task_handoff`. Subclassed @@ -1699,7 +1761,23 @@ async def _fail(exc: AdcpError) -> None: "still recorded in the registry", task_id, ) - await registry.fail(task_id, exc.to_wire()) + error_wire = exc.to_wire() + await registry.fail(task_id, error_wire) + # Terminal failure webhook (spec MUST when push config present). + # Fired AFTER registry.fail so the buyer's tasks/get poll and the + # push notification observe the same terminal state. The error + # wire dict is the payload `result`; the helper is self-isolating + # (logged-and-swallowed) so a delivery failure never re-raises into + # the background task. + await emit_terminal_completion_webhook( + target=webhook_target, + enabled=webhook_auto_emit, + method_name=method_name, + params=request_params, + status="failed", + task_id=task_id, + result=error_wire, + ) async def _run() -> None: try: @@ -1792,6 +1870,22 @@ async def _run() -> None: # surfaces it under the top-level ``context`` key; nothing to # do here on the result path. await registry.complete(task_id, persisted) + # Terminal completion webhook (spec MUST when push config present). + # Fired AFTER registry.complete so the buyer's tasks/get poll and + # the push notification observe the same terminal artifact. EXACTLY + # ONCE — the sync auto-emit gate skips the {task_id, status} + # submitted projection, so the handoff path owns the completion + # notification end-to-end. The helper is self-isolating + # (logged-and-swallowed); a delivery failure never re-raises here. + await emit_terminal_completion_webhook( + target=webhook_target, + enabled=webhook_auto_emit, + method_name=method_name, + params=request_params, + status="completed", + task_id=task_id, + result=persisted, + ) # ``asyncio.create_task`` only weak-refs the resulting Task — under # GC pressure or with no outer awaiter, the task can be collected diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 5a9818a11..9e861a18b 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -51,6 +51,7 @@ assert_discovery_push_consistent, reject_hand_rolled_submitted, reject_wholesale_handoff, + reject_wholesale_handoff_before_launch, ) from adcp.decisioning.dispatch import ( _build_request_context, @@ -1457,10 +1458,16 @@ def _maybe_auto_emit_sync_completion( ) -> None: """Fire the F12 sync-completion webhook if applicable. - Skips TaskHandoff projections — those go through the registry - completion path which emits its own webhook on terminal state. - The auto-emit fires on the sync-success arm only, mirroring the - JS-side ``routeIfHandoff`` logic at + Skips TaskHandoff projections — on the async (handoff) arm the + terminal completion / failure webhook is delivered from the + background completion path in + :func:`adcp.decisioning.dispatch._project_handoff` + (:func:`adcp.decisioning.webhook_emit.emit_terminal_completion_webhook`), + fired exactly once after ``registry.complete`` / ``registry.fail`` + when the buyer registered ``push_notification_config``. This gate + fires on the sync-success arm only; skipping the submitted + projection here is what keeps the two paths from double-delivering. + Mirrors the JS-side ``routeIfHandoff`` logic at ``src/lib/server/decisioning/runtime/from-platform.ts``. TaskHandoff projection returns the exact 2-key dict ``{"task_id": @@ -1475,8 +1482,10 @@ def _maybe_auto_emit_sync_completion( and set(result.keys()) == {"task_id", "status"} and result.get("status") == "submitted" ): - # TaskHandoff projection — registry completion path emits - # its own webhook on terminal state. + # TaskHandoff projection — the background completion path in + # _project_handoff owns the terminal webhook for this task + # (delivered once when push config is present); skipping here + # prevents a double-delivery. return maybe_emit_sync_completion( sender=self._webhook_sender, @@ -1487,6 +1496,26 @@ def _maybe_auto_emit_sync_completion( result=result, ) + def _handoff_webhook_kwargs(self) -> dict[str, Any]: + """Webhook delivery kwargs threaded into :func:`_invoke_platform_method` + for the async (handoff) completion path. + + The supervisor takes precedence over the bare sender (retry + + circuit breaker), matching the resolution order + :func:`maybe_emit_sync_completion` uses on the sync arm. When a + request hands off AND the buyer registered + ``push_notification_config``, the background completion path + delivers the terminal completion / failure webhook to that + target. The sync arm's auto-emit gate is wired separately via + :meth:`_maybe_auto_emit_sync_completion`; both honor the same + ``auto_emit_completion_webhooks`` flag so an adopter emitting + manually never gets a framework double-delivery on either arm. + """ + return { + "webhook_target": self._webhook_supervisor or self._webhook_sender, + "webhook_auto_emit": self._auto_emit_completion_webhooks, + } + def _build_ctx( self, tool_ctx: ToolContext, @@ -1891,6 +1920,16 @@ async def _persist_draft_hook(get_products_result: Any) -> None: captured_platform, get_products_result, captured_ctx ) + # Guard (b) pre-launch: a wholesale call that hands off is rejected + # the instant dispatch detects the TaskHandoff — before any registry + # row, persist-draft, background coroutine, or completion webhook. + # Belt-and-braces post-dispatch check stays below. + pre_handoff_reject = ( + (lambda: reject_wholesale_handoff_before_launch("buying_mode")) + if mode == "wholesale" + else None + ) + coro = _invoke_platform_method( self._platform, "get_products", @@ -1899,6 +1938,8 @@ async def _persist_draft_hook(get_products_result: Any) -> None: executor=self._executor, registry=self._registry, on_complete=_persist_draft_hook, + pre_handoff_reject=pre_handoff_reject, + **self._handoff_webhook_kwargs(), ) try: result = await ( @@ -1932,20 +1973,32 @@ async def _persist_draft_hook(get_products_result: Any) -> None: # Post-dispatch discovery guards (run on the raw result before any # success-shape projection). ``result`` is either a typed # GetProductsResponse (sync) or the framework's submitted - # projection dict (handoff). All three reject with + # projection dict (handoff). Both reject with # INVALID_REQUEST / correctable. # (d) hand-rolled submitted: adopter built {'status':'submitted'} # by hand instead of ctx.handoff_to_task — no registry row. - # (b) wholesale + handoff: wholesale MUST be synchronous. - # (c) async + unresolved account: the task_id would be unreachable. + # (b) wholesale + handoff: wholesale MUST be synchronous. The + # pre-launch arm (pre_handoff_reject above) already rejected + # it before any task/draft/webhook side effect; this is the + # belt-and-braces post-dispatch check. + # Guard (c) — async + unresolved account — is NOT re-checked here. + # The push arm is rejected pre-dispatch above (correctable + # field='account'); the no-push handoff arm is owned by + # compose_caller_identity, which fails closed inside _build_ctx + # (terminal INVALID_REQUEST) BEFORE the platform method runs, so no + # task row is ever minted against an unresolved account. A + # post-dispatch (c) re-check would be unreachable: reaching this + # line means _build_ctx succeeded, i.e. the account resolved. reject_hand_rolled_submitted(result) reject_wholesale_handoff(result, mode=mode, mode_field="buying_mode") - assert_account_resolved_for_async(result, account_id=account.id, has_push=_has_push) # Handoff path: the submitted envelope is returned verbatim, the # same as create_media_buy. Success-shape projections (property # list, pagination, fields, draft persist) apply only to the sync - # success arm. The registry-completion webhook for the eventual - # terminal artifact fires from _project_handoff, not here. + # success arm. When the buyer registered push_notification_config, + # the terminal completion / failure webhook is delivered from the + # background completion path in _project_handoff exactly once; with + # no push config the buyer polls tasks/get. Either way nothing fires + # here at submit time. if isinstance(result, dict) and result.get("status") == "submitted": return cast("GetProductsResponse", result) response = cast("GetProductsResponse", result) @@ -2097,6 +2150,7 @@ async def _persist_overlay_hook(create_result: Any) -> None: extra_kwargs=extra, on_complete=on_complete, on_failure=on_failure, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("create_media_buy", params, result) return cast("CreateMediaBuyResponse", result) @@ -2153,6 +2207,7 @@ async def _merge_overlay_hook(_update_result: Any) -> None: registry=self._registry, arg_projector={"media_buy_id": params.media_buy_id, "patch": params}, on_complete=on_complete, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("update_media_buy", params, result) return cast("UpdateMediaBuySuccessResponse", result) @@ -2172,6 +2227,7 @@ async def sync_creatives( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("sync_creatives", params, result) return cast("SyncCreativesSuccessResponse", result) @@ -2502,6 +2558,7 @@ async def get_creative_delivery( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("get_creative_delivery", params, result) return cast("GetCreativeDeliveryResponse", result) @@ -2561,6 +2618,14 @@ async def get_signals( # type: ignore[override] mode_str = None else: mode_str = mode.value if hasattr(mode, "value") else str(mode) + # Guard (b) pre-launch: a wholesale call that hands off is rejected + # before any registry row / background coroutine / completion + # webhook. Belt-and-braces post-dispatch check stays below. + pre_handoff_reject = ( + (lambda: reject_wholesale_handoff_before_launch("discovery_mode")) + if mode_str == "wholesale" + else None + ) result = await _invoke_platform_method( self._platform, "get_signals", @@ -2568,13 +2633,28 @@ async def get_signals( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + pre_handoff_reject=pre_handoff_reject, + **self._handoff_webhook_kwargs(), ) # Post-dispatch discovery guards (mirror get_products): - # (d) hand-rolled submitted, (b) wholesale + handoff, - # (c) async + unresolved account. All INVALID_REQUEST / correctable. + # (d) hand-rolled submitted, (b) wholesale + handoff. Both + # INVALID_REQUEST / correctable. Guard (b) here is the + # belt-and-braces check; the pre-launch arm above already + # rejected a wholesale handoff before any side effect. + # Guard (c) — async + unresolved account — is NOT re-checked here: + # the push arm is rejected pre-dispatch above (correctable + # field='account'), and the no-push handoff arm is owned by + # compose_caller_identity, which fails closed terminally inside + # _build_ctx before the platform method runs. Reaching this line + # means the account resolved, so a post-dispatch (c) check is + # unreachable. reject_hand_rolled_submitted(result) reject_wholesale_handoff(result, mode=mode_str, mode_field="discovery_mode") - assert_account_resolved_for_async(result, account_id=account.id, has_push=_has_push) + # On the handoff arm the terminal completion / failure webhook is + # delivered from _project_handoff's background path when the buyer + # registered push_notification_config; the sync auto-emit gate below + # skips the {task_id, status} submitted projection so there is no + # double-delivery. self._maybe_auto_emit_sync_completion("get_signals", params, result) return cast("GetSignalsResponse", result) @@ -2595,6 +2675,7 @@ async def activate_signal( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("activate_signal", params, result) return cast("ActivateSignalSuccessResponse", result) @@ -2630,6 +2711,7 @@ async def sync_audiences( # type: ignore[override] executor=self._executor, registry=self._registry, arg_projector={"audiences": getattr(params, "audiences", []) or []}, + **self._handoff_webhook_kwargs(), ) projected = _project_sync_audiences(result) self._maybe_auto_emit_sync_completion("sync_audiences", params, projected) @@ -2666,6 +2748,7 @@ async def sync_catalogs( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) projected = _project_sync_catalogs(result) self._maybe_auto_emit_sync_completion("sync_catalogs", params, projected) @@ -2804,6 +2887,7 @@ async def get_brand_identity( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("get_brand_identity", params, result) return cast("GetBrandIdentitySuccessResponse", result) @@ -2829,6 +2913,7 @@ async def get_rights( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("get_rights", params, result) return cast("GetRightsSuccessResponse", result) @@ -2858,6 +2943,7 @@ async def acquire_rights( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("acquire_rights", params, result) return cast("AcquireRightsResponse", result) @@ -3148,6 +3234,7 @@ async def create_property_list( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("create_property_list", params, result) return cast("CreatePropertyListResponse", result) @@ -3167,6 +3254,7 @@ async def update_property_list( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("update_property_list", params, result) return cast("UpdatePropertyListResponse", result) @@ -3186,6 +3274,7 @@ async def get_property_list( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("get_property_list", params, result) return cast("GetPropertyListResponse", result) @@ -3205,6 +3294,7 @@ async def list_property_lists( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("list_property_lists", params, result) return cast("ListPropertyListsResponse", result) @@ -3227,6 +3317,7 @@ async def delete_property_list( # type: ignore[override] ctx, executor=self._executor, registry=self._registry, + **self._handoff_webhook_kwargs(), ) self._maybe_auto_emit_sync_completion("delete_property_list", params, result) return cast("DeletePropertyListResponse", result) diff --git a/src/adcp/decisioning/serve.py b/src/adcp/decisioning/serve.py index 283c1b165..577275717 100644 --- a/src/adcp/decisioning/serve.py +++ b/src/adcp/decisioning/serve.py @@ -138,8 +138,11 @@ def create_adcp_server_from_platform( v6.0 stub (raises ``NotImplementedError`` with a pointer to v6.1). :param webhook_sender: Bring-your-own - :class:`adcp.webhook_sender.WebhookSender` for sync-completion - and HITL-completion webhook delivery. Default ``None``. The + :class:`adcp.webhook_sender.WebhookSender` for completion webhook + delivery — both the sync-success auto-emit and the terminal + completion / failure notification on the async (handoff) path of + any spec-eligible verb when the buyer registered + ``push_notification_config``. Default ``None``. The sender is the *transport* — one HTTP-Signatures POST per call, no retry, no breaker. Production sellers typically wrap the sender in a :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` @@ -488,9 +491,11 @@ def serve( :param state_reader: Custom :class:`StateReader` impl (D15). :param resource_resolver: Custom :class:`ResourceResolver` impl (D15). :param webhook_sender: BYO :class:`adcp.webhook_sender.WebhookSender` - for completion webhook delivery (sync auto-emit + HITL terminal). - Transport only — one attempt, no retry. ``None`` disables - auto-emit silently. + for completion webhook delivery — sync-success auto-emit plus the + terminal completion / failure notification on the async (handoff) + path of any spec-eligible verb when the buyer registered + ``push_notification_config``. Transport only — one attempt, no + retry. ``None`` disables emission silently. :param webhook_supervisor: BYO :class:`~adcp.webhook_supervisor.WebhookDeliverySupervisor` for reliable delivery (retry, circuit breaker, attempt audit). diff --git a/src/adcp/decisioning/specialisms/sales.py b/src/adcp/decisioning/specialisms/sales.py index c0114d0d7..f985447d9 100644 --- a/src/adcp/decisioning/specialisms/sales.py +++ b/src/adcp/decisioning/specialisms/sales.py @@ -120,9 +120,13 @@ def get_products( ``ctx.handoff_to_task(fn)`` when composing the catalog needs background work (custom curation queue, slow proposal generation); the framework projects the handoff to the wire - ``submitted`` envelope and the buyer polls ``tasks/get`` for the - terminal :class:`GetProductsResponse`. ``get_products`` exposes - the ``submitted`` / ``working`` / ``input_required`` async arms. + ``submitted`` envelope. The buyer reaches the terminal + :class:`GetProductsResponse` via ``tasks/get`` polling, and — when + the request carried ``push_notification_config`` — the framework + also delivers the terminal completion / failure webhook to the + buyer's URL from the background completion path (exactly once). + ``get_products`` exposes the ``submitted`` / ``working`` / + ``input_required`` async arms. Wholesale (``buying_mode='wholesale'``) MUST return synchronously — it is a raw rate-card read with no seller-side composition to diff --git a/src/adcp/decisioning/specialisms/signals.py b/src/adcp/decisioning/specialisms/signals.py index 077393fb6..629fad691 100644 --- a/src/adcp/decisioning/specialisms/signals.py +++ b/src/adcp/decisioning/specialisms/signals.py @@ -82,8 +82,11 @@ def get_signals( ``ctx.handoff_to_task(fn)`` when provider discovery needs background work (cross-provider fan-out, identity-graph lookups); the framework projects the handoff to the wire ``submitted`` - envelope and the buyer polls ``tasks/get`` for the terminal - :class:`GetSignalsResponse`. + envelope. The buyer reaches the terminal + :class:`GetSignalsResponse` via ``tasks/get`` polling, and — when + the request carried ``push_notification_config`` — the framework + also delivers the terminal completion / failure webhook to the + buyer's URL from the background completion path (exactly once). Wholesale (``discovery_mode='wholesale'``) MUST return synchronously — a raw catalog read with no seller-side diff --git a/src/adcp/decisioning/webhook_emit.py b/src/adcp/decisioning/webhook_emit.py index 6704d2cb2..cc59109d2 100644 --- a/src/adcp/decisioning/webhook_emit.py +++ b/src/adcp/decisioning/webhook_emit.py @@ -134,6 +134,28 @@ def _extract_push_notification_url_and_token( return (str(url), token) +def _extract_push_operation_id(params: Any) -> str | None: + """Pull the buyer-supplied ``operation_id`` off + ``params.push_notification_config``. + + Per ``schemas/cache/core/push_notification_config.json`` the buyer + registers ``operation_id`` on the push config and the seller MUST + echo it verbatim into every webhook payload's ``operation_id`` + field. The seller MUST NOT recover it from the URL — the wire-level + source of truth is this field. Tolerates Pydantic models and plain + dicts; returns ``None`` when absent. + """ + config = getattr(params, "push_notification_config", None) + if config is None and isinstance(params, dict): + config = params.get("push_notification_config") + if config is None: + return None + operation_id = getattr(config, "operation_id", None) + if operation_id is None and isinstance(config, dict): + operation_id = config.get("operation_id") + return operation_id + + async def _emit_sync_completion_webhook( *, target: DeliveryTarget, @@ -328,6 +350,146 @@ def maybe_emit_sync_completion( ) +async def emit_terminal_completion_webhook( + *, + target: DeliveryTarget | None, + enabled: bool, + method_name: str, + params: Any, + status: str, + task_id: str, + result: Any = None, +) -> None: + """Deliver the terminal completion / failure webhook for an async task. + + Fired from the BACKGROUND completion path of + :func:`adcp.decisioning.dispatch._project_handoff` — once, after the + registry has recorded the terminal state. This is the async-path + counterpart to :func:`maybe_emit_sync_completion`: when a seller + returns a ``Submitted`` envelope (the request handed off to a task) + AND the buyer supplied ``push_notification_config``, the spec + (AdCP, adcp#5389) requires the seller to deliver at least the + terminal completion / failure notification to that webhook. Buyers + who registered a push config get notified without polling + ``tasks/get``. + + Unlike the sync gate, this coroutine is already running inside the + background task — there is no inline buyer response to protect, so + the delivery is awaited directly rather than scheduled fire-and- + forget. The whole body is wrapped in ``try/except Exception`` and + logged-and-swallowed: a webhook delivery failure must never crash + the background task or block the registry's terminal-state record + (which the buyer can still read via ``tasks/get``). + + Skips silently when: + + * ``enabled`` is False (operator opted out via + ``auto_emit_completion_webhooks=False`` — they emit manually). + * The request didn't carry ``push_notification_config.url`` + (polling-only via ``tasks/get`` — the spec permits this). + + Logs a WARNING when: + + * ``target`` is None but the buyer DID register a push config — + their terminal notification is being silently dropped, the same + misconfig the sync gate warns on. + * ``method_name`` isn't in :data:`SPEC_WEBHOOK_TASK_TYPES` (the + adopter extended the tool surface beyond the spec enum). + + :param status: ``'completed'`` on success or ``'failed'`` on a + terminal failure. The wire ``GeneratedTaskStatus`` enum. + :param result: On success, the projected terminal artifact (the + same shape persisted to the registry). On failure, the + structured error wire dict (``error.to_wire()``) so the buyer + sees the failure inline. ``operation_id`` is echoed verbatim + from ``push_notification_config.operation_id`` and ``task_id`` + is the registry-minted id. + """ + try: + if not enabled: + return + + config = getattr(params, "push_notification_config", None) + if config is None and isinstance(params, dict): + config = params.get("push_notification_config") + if config is None: + return # buyer didn't register — polling-only, nothing to do + + if target is None: + # Buyer registered a push config but no sender / supervisor is + # wired. Without this branch the terminal notification quietly + # disappears — surfacing a warning gives the adopter a fast + # path to the misconfig (mirrors the sync gate). + try: + url_for_log = getattr(config, "url", None) + if url_for_log is None and isinstance(config, dict): + url_for_log = config.get("url") + except Exception: + url_for_log = None + logger.warning( + "[adcp.decisioning] buyer registered push_notification_config " + "(url=%s) for async %s (task_id=%s) but neither webhook_sender " + "nor webhook_supervisor is wired — terminal %s webhook silently " + "dropped. Pass one to " + "adcp.decisioning.serve.create_adcp_server_from_platform, or set " + "auto_emit_completion_webhooks=False to silence this warning.", + url_for_log if url_for_log else "", + method_name, + task_id, + status, + ) + return + + extracted = _extract_push_notification_url_and_token(params) + if extracted is None: + return + url, token = extracted + operation_id = _extract_push_operation_id(params) + + # Defense-in-depth: strip credentials from the artifact BEFORE the + # webhook target sees it. The dispatcher already strips before + # persisting to the registry (:func:`_project_handoff`); this is a + # second pass at the delivery boundary. Method-gated — non-account + # tools short-circuit without walking the result. Failure payloads + # (error wire dicts) never carry credentials but pass through the + # same gate harmlessly. + if result is not None: + result = strip_credentials_from_wire_result(method_name, result) + + if method_name not in SPEC_WEBHOOK_TASK_TYPES: + logger.warning( + "[adcp.decisioning] terminal %s webhook for async %s " + "(task_id=%s) skipped — tool not in spec task-type enum " + "(closed set per schemas/cache/enums/task-type.json).", + status, + method_name, + task_id, + ) + return + + await target.send_mcp( + url=url, + task_id=task_id, + status=status, + task_type=method_name, + result=result, + operation_id=operation_id, + token=token, + ) + except Exception: + # Logged-and-swallowed: the background task's terminal state is + # already recorded in the registry; the buyer can read it via + # tasks/get regardless of webhook delivery outcome. + logger.warning( + "[adcp.decisioning] terminal %s webhook for async %s " + "(task_id=%s) failed; registry terminal state already recorded", + status, + method_name, + task_id, + exc_info=True, + ) + + def validate_webhook_sender_for_platform( *, advertised_tools: frozenset[str] | set[str], @@ -567,6 +729,7 @@ def validate_webhook_signing_for_capabilities( __all__ = [ "SPEC_WEBHOOK_TASK_TYPES", + "emit_terminal_completion_webhook", "maybe_emit_sync_completion", "validate_webhook_sender_for_platform", "validate_webhook_signing_for_capabilities", diff --git a/src/adcp/webhook_supervisor.py b/src/adcp/webhook_supervisor.py index c54eaa515..a1335f2f3 100644 --- a/src/adcp/webhook_supervisor.py +++ b/src/adcp/webhook_supervisor.py @@ -216,6 +216,7 @@ async def send_mcp( status: GeneratedTaskStatus | str, task_type: TaskType | str, result: Any = None, + operation_id: str | None = None, token: str | None = None, sequence_key: str | None = None, breaker_key: str | None = None, @@ -403,6 +404,7 @@ async def send_mcp( status: GeneratedTaskStatus | str, task_type: TaskType | str, result: Any = None, + operation_id: str | None = None, token: str | None = None, sequence_key: str | None = None, breaker_key: str | None = None, @@ -425,6 +427,12 @@ async def send_mcp( via :meth:`next_sequence`. Recommend ``f"{media_buy_id}:{url}"`` (per-receiver stream); see :meth:`next_sequence` for the rationale. + :param operation_id: Buyer-supplied correlation id from + ``push_notification_config.operation_id``; echoed verbatim + into the webhook payload's ``operation_id`` field per spec. + Threaded through to the underlying + :meth:`WebhookSender.send_mcp`. On retry (``resend``) the id + is preserved by replaying the exact attempt-1 bytes. :param notification_type: Passthrough to ``DeliveryAttempt`` for delivery-report webhooks (``scheduled`` / ``final`` / ``adjusted`` / ``delayed`` / ``window_update``). F12 @@ -509,6 +517,7 @@ async def send_mcp( status=status, task_type=task_type, result=result, + operation_id=operation_id, token=token, ) response_time_ms = int((time.monotonic() - attempt_started_monotonic) * 1000) diff --git a/src/adcp/webhook_supervisor_pg.py b/src/adcp/webhook_supervisor_pg.py index a792907a8..90e3e2ac6 100644 --- a/src/adcp/webhook_supervisor_pg.py +++ b/src/adcp/webhook_supervisor_pg.py @@ -298,13 +298,13 @@ def __init__( self._sql_enqueue = ( f"INSERT INTO {qt} " # noqa: S608 f"(breaker_key, url, task_id, task_type, status_str, result_json, " - f"token, sequence_key, max_attempts, notification_type) " - f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id" + f"token, sequence_key, max_attempts, notification_type, operation_id) " + f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id" ) self._sql_poll = ( f"SELECT id, breaker_key, url, task_id, task_type, status_str, " # noqa: S608 f"result_json, token, sequence_key, attempt_count, max_attempts, " - f"idempotency_key, sent_body, notification_type " + f"idempotency_key, sent_body, notification_type, operation_id " f"FROM {qt} " f"WHERE status_str IN ('pending', 'retry') AND scheduled_at <= now() " f"ORDER BY scheduled_at LIMIT 1 FOR UPDATE SKIP LOCKED" @@ -371,8 +371,15 @@ async def create_schema(self) -> None: created_at TIMESTAMPTZ NOT NULL DEFAULT now(), idempotency_key TEXT, sent_body BYTEA, - notification_type TEXT + notification_type TEXT, + operation_id TEXT )""", + # Backfill the operation_id column on tables created before the + # async-completion-webhook operation_id echo landed. ADD COLUMN + # IF NOT EXISTS is a no-op on fresh tables (the column is already + # in the CREATE above) and an in-place add on pre-existing ones — + # so a CREATE-only deployment and an upgrading one converge. + f"ALTER TABLE {qt} ADD COLUMN IF NOT EXISTS operation_id TEXT", # Partial index on work-eligible rows; avoids scanning completed/in-flight rows. f"""CREATE INDEX IF NOT EXISTS {qt}_work_idx ON {qt} (status_str, scheduled_at) @@ -415,6 +422,7 @@ async def send_mcp( status: GeneratedTaskStatus | str, task_type: TaskType | str, result: Any = None, + operation_id: str | None = None, token: str | None = None, sequence_key: str | None = None, breaker_key: str | None = None, @@ -434,6 +442,10 @@ async def send_mcp( :param breaker_key: Override the circuit-breaker lookup key (default: ``url``). Multi-tenant sellers whose buyers share a SaaS receiver URL MUST pass a tenant-scoped key (e.g. ``f"{tenant_id}:{url}"``). + :param operation_id: Buyer-supplied correlation id echoed verbatim + into the webhook payload per spec. Persisted on the queue row and + replayed to :meth:`WebhookSender.send_mcp` when the worker + delivers the job. :param notification_type: Passed through to the delivery log for delivery-report webhooks (``scheduled`` / ``final`` / etc.). """ @@ -514,6 +526,7 @@ async def send_mcp( sequence_key, self._retry.max_attempts, notification_type, + operation_id, ), ) enqueue_row = await cur.fetchone() @@ -592,6 +605,7 @@ async def _poll_and_process(self) -> bool: idempotency_key, sent_body, notification_type, + operation_id, ) = row attempt_number = attempt_count + 1 @@ -627,6 +641,7 @@ async def _poll_and_process(self) -> bool: status=status_str, task_type=task_type, result=result_obj, + operation_id=operation_id, token=token, ) except Exception as exc: diff --git a/tests/test_decisioning_async_discovery.py b/tests/test_decisioning_async_discovery.py index 57e23e985..bc2320475 100644 --- a/tests/test_decisioning_async_discovery.py +++ b/tests/test_decisioning_async_discovery.py @@ -310,7 +310,10 @@ async def get_signals(self, req, ctx): class _RecordingStore: - """Minimal ProposalStore that records put_draft calls.""" + """Minimal ProposalStore that records put_draft calls (including the + full persisted proposal_payload + recipes, so a regression test can + pin that pagination / fields projection on the wire response never + strips data from the stored draft).""" def __init__(self) -> None: self.put_calls: list[dict[str, Any]] = [] @@ -323,7 +326,14 @@ async def put_draft( recipes: Any, proposal_payload: Any, ) -> None: - self.put_calls.append({"proposal_id": proposal_id, "account_id": account_id}) + self.put_calls.append( + { + "proposal_id": proposal_id, + "account_id": account_id, + "recipes": recipes, + "proposal_payload": proposal_payload, + } + ) class _StoreBackedAccounts: @@ -661,11 +671,444 @@ def test_task_type_enum_includes_discovery_verbs() -> None: def test_wire_validator_accepts_submitted_for_discovery_verbs() -> None: """Strict response validation selects the submitted variant and passes - the {task_id, status='submitted'} envelope for both discovery verbs.""" + the {task_id, status='submitted'} envelope for both discovery verbs. + + The rc.9 bundle ships the submitted schema for both verbs, so the + validator MUST actually select the ``submitted`` variant (not fall back + to the ``skipped`` no-bundle path) and report it valid. Asserting on the + variant closes the escape hatch where an absent bundle would silently + pass the looser ``outcome.valid`` check. + """ from adcp.validation.schema_validator import validate_response for tool in ("get_products", "get_signals"): outcome = validate_response(tool, {"task_id": "task_x", "status": "submitted"}) - # Either the submitted variant validates clean, or the bundle is - # absent (skipped). It must NOT report a hard schema failure. - assert outcome.valid, f"{tool} submitted envelope failed validation: {outcome.issues}" + assert ( + outcome.valid is True + ), f"{tool} submitted envelope failed validation: {outcome.issues}" + assert outcome.variant == "submitted", ( + f"{tool} validated against the {outcome.variant!r} variant, not the " + "rc.9 submitted schema — the bundle should be present, not skipped" + ) + + +# --------------------------------------------------------------------------- +# SHOULD-FIX 2: persisted draft retains full product recipe / pricing data +# even when fields= / pagination= shape the wire response +# --------------------------------------------------------------------------- + +# A wire-valid Product carrying full pricing — the data the persisted draft +# must retain regardless of the buyer-presentation projection applied to the +# response. +_RICH_PRODUCT: dict[str, Any] = { + "product_id": "p1", + "name": "Premium CTV", + "description": "Premium connected TV inventory", + "publisher_properties": [{"selection_type": "all", "publisher_domain": "pub.example.com"}], + "format_ids": [], + "delivery_type": "non_guaranteed", + "pricing_options": [ + {"pricing_model": "cpm", "pricing_option_id": "po1", "currency": "USD", "rate": 42.5} + ], + "reporting_capabilities": { + "available_reporting_frequencies": ["daily"], + "expected_delay_minutes": 60, + "timezone": "UTC", + "supports_webhooks": False, + "available_metrics": ["impressions"], + "date_range_support": "date_range", + }, +} + + +@pytest.mark.asyncio +async def test_persisted_draft_keeps_full_product_data_under_projection(executor) -> None: + """A sync get_products with proposals + a recording store, where the + request applies fields= AND pagination= projections: the persisted draft + must retain the FULL product recipe / pricing data. The wire-response + projections shape only what the buyer sees, never the stored draft (the + persist-draft on_complete hook fires with the raw adapter result before + any projection). Pins the intended raw-result persistence behavior.""" + from adcp.types import GetProductsRequest, GetProductsResponse, Product + + rich_products = [Product.model_validate(_RICH_PRODUCT)] + proposals = [ + { + "proposal_id": "prop_rich", + "name": "plan", + "allocations": [{"product_id": "p1", "allocation_percentage": 100}], + } + ] + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities(auto_paginate=True) + accounts = _StoreBackedAccounts() + + def proposal_store_for_tenant(self, tenant_id: str): + return store + + async def get_products(self, req, ctx): + return GetProductsResponse(products=rich_products, proposals=proposals) + + store = _RecordingStore() + handler = _make_handler(_Platform(), executor) + # fields= drops product fields from the wire response; pagination= + # paginates it. Neither must touch the stored draft. + resp = await handler.get_products( + GetProductsRequest( + buying_mode="brief", + brief="x", + fields=["product_id"], + pagination={"max_results": 1}, + ), + ToolContext(), + ) + assert isinstance(resp, GetProductsResponse) + # Exactly one draft persisted. + assert [c["proposal_id"] for c in store.put_calls] == ["prop_rich"] + persisted = store.put_calls[0]["proposal_payload"] + # The persisted proposal retains its allocations (recipe linkage). + assert persisted["allocations"][0]["product_id"] == "p1" + # The enriched pricing_option_id is carried onto the allocation (the + # single-option product makes the choice unambiguous) — proves the + # persist hook saw the full product pricing, not a stripped wire shape. + assert persisted["allocations"][0].get("pricing_option_id") == "po1" + + +# --------------------------------------------------------------------------- +# MUST-FIX 2: no-push handoff against an unresolved account is owned by +# compose_caller_identity (fails closed at _build_ctx, before any task row) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_products_handoff_unresolved_account_no_push_rejected(executor) -> None: + """A no-push handoff against an unresolved (sentinel) account is rejected + BEFORE any task row is minted. compose_caller_identity fails closed + inside _build_ctx (terminal INVALID_REQUEST) before the platform method + runs, so the registry never issues a task the buyer could not reach.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + async def _curate(task_ctx): + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = _UnresolvedAccounts() + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + registry = InMemoryTaskRegistry() + handler = _make_handler(_Platform(), executor, registry=registry) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + # NO push_notification_config — the handoff arm of guard (c). + GetProductsRequest(buying_mode="brief", brief="x"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + # No task row was ever issued — the rejection happened before dispatch. + assert registry._records == {} + + +# --------------------------------------------------------------------------- +# SHOULD-FIX 1: a rejected wholesale handoff leaves NO registry side effects +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_wholesale_handoff_rejected_leaves_no_registry_row(executor) -> None: + """An adopter that hands off on a wholesale get_products is rejected the + instant dispatch detects the TaskHandoff — BEFORE a registry row is + minted, the background coroutine launched, or a draft persisted. Asserts + the registry has no row after the rejection.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + ran = {"bg": False} + + async def _curate(task_ctx): + ran["bg"] = True + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + registry = InMemoryTaskRegistry() + handler = _make_handler(_Platform(), executor, registry=registry) + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest(buying_mode="wholesale"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.field == "buying_mode" + # Give any (erroneously launched) bg task a chance to run. + await asyncio.sleep(0.05) + # No registry row minted, and the handoff coroutine never ran. + assert registry._records == {} + assert ran["bg"] is False + + +@pytest.mark.asyncio +async def test_get_signals_wholesale_handoff_rejected_leaves_no_registry_row(executor) -> None: + from adcp.types import GetSignalsRequest, GetSignalsResponse + + ran = {"bg": False} + + async def _discover(task_ctx): + ran["bg"] = True + return GetSignalsResponse(signals=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return ctx.handoff_to_task(_discover) + + registry = InMemoryTaskRegistry() + handler = _make_handler(_Platform(), executor, registry=registry) + with pytest.raises(AdcpError) as exc: + await handler.get_signals( + GetSignalsRequest(discovery_mode="wholesale", signal_spec="x"), + ToolContext(), + ) + assert exc.value.code == "INVALID_REQUEST" + assert exc.value.field == "discovery_mode" + await asyncio.sleep(0.05) + assert registry._records == {} + assert ran["bg"] is False + + +# --------------------------------------------------------------------------- +# Async-completion webhooks: terminal completion / failure delivered exactly +# once from the background path when push_notification_config is present +# --------------------------------------------------------------------------- + + +def _push_handler(platform: DecisioningPlatform, executor: ThreadPoolExecutor): + """Handler wired with a recording AsyncMock webhook sender.""" + from unittest.mock import AsyncMock + + sender = AsyncMock() + handler = PlatformHandler( + platform, + executor=executor, + registry=InMemoryTaskRegistry(), + webhook_sender=sender, + ) + return handler, sender + + +async def _drain_until_webhook(sender, *, attempts: int = 40) -> None: + for _ in range(attempts): + await asyncio.sleep(0.02) + if sender.send_mcp.await_count: + return + + +@pytest.mark.asyncio +async def test_get_products_handoff_push_emits_one_completed_webhook(executor) -> None: + """get_products handoff + push_notification_config → exactly one + 'completed' webhook from the background path, with operation_id echoed, + the registry task_id, and the terminal result in the payload.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + async def _curate(task_ctx): + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + handler, sender = _push_handler(_Platform(), executor) + envelope = await handler.get_products( + GetProductsRequest( + buying_mode="brief", + brief="ctv", + push_notification_config={ + "url": "https://buyer.example.com/wh", + "operation_id": "op-products-123", + "token": "tok-products-abcdefghij", + }, + ), + ToolContext(), + ) + assert envelope["status"] == "submitted" + await _drain_until_webhook(sender) + sender.send_mcp.assert_awaited_once() + kw = sender.send_mcp.await_args.kwargs + assert kw["status"] == "completed" + assert kw["task_type"] == "get_products" + assert kw["task_id"] == envelope["task_id"] + assert kw["operation_id"] == "op-products-123" + assert kw["token"] == "tok-products-abcdefghij" + assert "products" in kw["result"] + + +@pytest.mark.asyncio +async def test_get_signals_handoff_push_emits_one_completed_webhook(executor) -> None: + from adcp.types import GetSignalsRequest, GetSignalsResponse + + async def _discover(task_ctx): + return GetSignalsResponse(signals=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="signals-seller") + + async def get_signals(self, req, ctx): + return ctx.handoff_to_task(_discover) + + handler, sender = _push_handler(_Platform(), executor) + envelope = await handler.get_signals( + GetSignalsRequest( + discovery_mode="brief", + signal_spec="auto intenders", + push_notification_config={ + "url": "https://buyer.example.com/wh", + "operation_id": "op-signals-456", + }, + ), + ToolContext(), + ) + assert envelope["status"] == "submitted" + await _drain_until_webhook(sender) + sender.send_mcp.assert_awaited_once() + kw = sender.send_mcp.await_args.kwargs + assert kw["status"] == "completed" + assert kw["task_type"] == "get_signals" + assert kw["task_id"] == envelope["task_id"] + assert kw["operation_id"] == "op-signals-456" + + +@pytest.mark.asyncio +async def test_create_media_buy_handoff_push_emits_one_completed_webhook(executor) -> None: + """create_media_buy handoff + push → exactly one 'completed' terminal + webhook from the background path (the framework-wide async-completion + webhook now also covers create_media_buy).""" + from adcp.types import CreateMediaBuyRequest, CreateMediaBuySuccessResponse + + async def _review(task_ctx): + return CreateMediaBuySuccessResponse( + media_buy_id="mb_async_1", + confirmed_at="2026-05-01T00:00:00Z", + revision=1, + packages=[], + status="active", + ) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities(specialisms=["sales-non-guaranteed"]) + accounts = SingletonAccounts(account_id="seller") + + def create_media_buy(self, req, ctx): + return ctx.handoff_to_task(_review) + + handler, sender = _push_handler(_Platform(), executor) + envelope = await handler.create_media_buy( + CreateMediaBuyRequest( + account={"account_id": "acct_a"}, + brand={"domain": "example.com"}, + idempotency_key="idem_async_create_1", + start_time="2026-05-01T00:00:00Z", + end_time="2026-05-31T23:59:59Z", + push_notification_config={ + "url": "https://buyer.example.com/wh", + "operation_id": "op-cmb-789", + }, + ), + ToolContext(), + ) + assert isinstance(envelope, dict) and envelope["status"] == "submitted" + await _drain_until_webhook(sender) + sender.send_mcp.assert_awaited_once() + kw = sender.send_mcp.await_args.kwargs + assert kw["status"] == "completed" + assert kw["task_type"] == "create_media_buy" + assert kw["task_id"] == envelope["task_id"] + assert kw["operation_id"] == "op-cmb-789" + assert kw["result"]["media_buy_id"] == "mb_async_1" + + +@pytest.mark.asyncio +async def test_get_products_handoff_failure_emits_one_failed_webhook(executor) -> None: + """When the handoff fn raises, the background path delivers exactly one + 'failed' webhook carrying the structured error, with operation_id echoed + and the registry task_id.""" + from adcp.types import GetProductsRequest + + async def _curate(task_ctx): + raise AdcpError( + "INTERNAL_ERROR", + message="curation backend unavailable", + recovery="transient", + ) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + handler, sender = _push_handler(_Platform(), executor) + envelope = await handler.get_products( + GetProductsRequest( + buying_mode="brief", + brief="ctv", + push_notification_config={ + "url": "https://buyer.example.com/wh", + "operation_id": "op-fail-111", + }, + ), + ToolContext(), + ) + assert envelope["status"] == "submitted" + await _drain_until_webhook(sender) + sender.send_mcp.assert_awaited_once() + kw = sender.send_mcp.await_args.kwargs + assert kw["status"] == "failed" + assert kw["task_type"] == "get_products" + assert kw["task_id"] == envelope["task_id"] + assert kw["operation_id"] == "op-fail-111" + # The structured error wire dict rides on the payload result. + assert kw["result"] is not None + + +@pytest.mark.asyncio +async def test_get_products_handoff_no_push_no_webhook(executor) -> None: + """No push_notification_config → NO webhook fires on the handoff + completion path. The buyer polls tasks/get; the polling-only path is + unchanged.""" + from adcp.types import GetProductsRequest, GetProductsResponse + + completed = asyncio.Event() + + async def _curate(task_ctx): + completed.set() + return GetProductsResponse(products=[]) + + class _Platform(DecisioningPlatform): + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="seller") + + async def get_products(self, req, ctx): + return ctx.handoff_to_task(_curate) + + handler, sender = _push_handler(_Platform(), executor) + await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="ctv"), + ToolContext(), + ) + await asyncio.wait_for(completed.wait(), timeout=2.0) + await asyncio.sleep(0.05) + sender.send_mcp.assert_not_called() diff --git a/tests/test_decisioning_webhook_emit.py b/tests/test_decisioning_webhook_emit.py index 85f2692df..ae88f0a9b 100644 --- a/tests/test_decisioning_webhook_emit.py +++ b/tests/test_decisioning_webhook_emit.py @@ -478,11 +478,13 @@ async def test_handler_fires_auto_emit_on_sync_success(executor) -> None: @pytest.mark.asyncio -async def test_handler_does_not_double_fire_on_handoff_path(executor) -> None: - """TaskHandoff projection returns the Submitted envelope; the - registry completion path emits its own webhook on terminal state. - The auto-emit MUST NOT fire on this arm — buyer would receive - duplicate webhooks.""" +async def test_handler_fires_exactly_one_completion_webhook_on_handoff_path(executor) -> None: + """A create_media_buy that hands off (Submitted envelope) AND carries + push_notification_config delivers EXACTLY ONE terminal completion + webhook from the background completion path (spec MUST, adcp#5389) — + not zero (the buyer would be left polling), and not two (the sync + auto-emit must skip the submitted projection so it never double-fires + against the background path).""" sender = AsyncMock() handler = PlatformHandler( _HandoffPlatform(), @@ -492,15 +494,28 @@ async def test_handler_does_not_double_fire_on_handoff_path(executor) -> None: auto_emit_completion_webhooks=True, ) result = await handler.create_media_buy(_make_request(with_url=True), ToolContext()) - # Drain any background tasks (handoff fn runs in background). - for _ in range(20): - await asyncio.sleep(0.05) - # The auto-emit must NOT have fired — handoff path is responsible - # for its own webhook. - sender.send_mcp.assert_not_called() - # Sanity check: result is the Submitted envelope. + # At submit time, no webhook yet — the bg task hasn't completed. assert isinstance(result, dict) assert result["status"] == "submitted" + submitted_task_id = result["task_id"] + # Drain the background completion task. + for _ in range(40): + await asyncio.sleep(0.02) + if sender.send_mcp.await_count: + break + # Exactly once — from the background completion path, not the sync gate. + sender.send_mcp.assert_awaited_once() + call_kwargs = sender.send_mcp.await_args.kwargs + assert call_kwargs["status"] == "completed" + assert call_kwargs["task_type"] == "create_media_buy" + # The terminal webhook carries the registry-minted task_id (the same + # one the Submitted envelope announced), NOT a synthetic sync-* id. + assert call_kwargs["task_id"] == submitted_task_id + assert not call_kwargs["task_id"].startswith("sync-") + # The terminal artifact rides on the payload result. + assert call_kwargs["result"]["media_buy_id"] == "mb_after_review" + # Buyer-supplied token echoed verbatim. + assert call_kwargs["token"] == "echo-back-xxxxxxxxxxxxx" @pytest.mark.asyncio diff --git a/tests/test_webhook_supervisor.py b/tests/test_webhook_supervisor.py index 4ab560f85..5d647f2cc 100644 --- a/tests/test_webhook_supervisor.py +++ b/tests/test_webhook_supervisor.py @@ -233,6 +233,25 @@ async def test_supervisor_success_first_attempt_records_one_log() -> None: assert sink.calls[0].will_retry is False +@pytest.mark.asyncio +async def test_supervisor_threads_operation_id_to_sender() -> None: + """The buyer-supplied operation_id is forwarded verbatim to the + underlying WebhookSender.send_mcp so it is echoed into the payload.""" + sender = MagicMock() + sender.send_mcp = AsyncMock(return_value=_ok()) + sup = _supervisor(sender) + + await sup.send_mcp( + url="https://buyer.example.com/wh", + task_id="t1", + status="completed", + task_type="create_media_buy", + result={"media_buy_id": "mb_1"}, + operation_id="op-supervisor-123", + ) + assert sender.send_mcp.await_args.kwargs["operation_id"] == "op-supervisor-123" + + # ----- Supervisor: retry path ----- diff --git a/tests/test_webhook_supervisor_pg.py b/tests/test_webhook_supervisor_pg.py index 0d69916ee..9cab6e7d7 100644 --- a/tests/test_webhook_supervisor_pg.py +++ b/tests/test_webhook_supervisor_pg.py @@ -129,7 +129,8 @@ def _make_supervisor(pool: Any, sender: Any, **kwargs: Any) -> Any: None, 0, 3, - # idempotency_key, sent_body, notification_type + # idempotency_key, sent_body, notification_type, operation_id + None, None, None, None, @@ -153,6 +154,7 @@ def _queue_row(**overrides: Any) -> tuple: "idempotency_key", "sent_body", "notification_type", + "operation_id", ] for k, v in overrides.items(): row[_fields.index(k)] = v @@ -222,8 +224,9 @@ async def test_executes_all_ddl_statements_separately(self) -> None: sup = _make_supervisor(pool, _make_sender()) await sup.create_schema() - # 6 statements: 3 tables + 3 indexes (1 partial + 2 standard) - assert conn.execute.call_count == 6 + # 7 statements: 3 tables + 1 ALTER (operation_id backfill on the + # queue table) + 3 indexes (1 partial + 2 standard) + assert conn.execute.call_count == 7 @pytest.mark.asyncio async def test_each_statement_contains_table_name(self) -> None: @@ -424,6 +427,45 @@ async def test_success_calls_sender_send_mcp(self) -> None: sender.send_mcp.assert_awaited_once() sender.resend.assert_not_awaited() + @pytest.mark.asyncio + async def test_operation_id_persisted_on_enqueue(self) -> None: + """The buyer-supplied operation_id is bound as the last enqueue + parameter so it lands on the queue row for the worker to replay.""" + conn_circuit = _make_conn(None) + conn_enqueue = _make_conn((7,)) + pool = _make_pool(conn_circuit, conn_enqueue) + sup = _make_supervisor(pool, _make_sender()) + sup._worker_started = True + + await sup.send_mcp( + url="https://b.example/wh", + task_id="t1", + status="completed", + task_type="get_products", + operation_id="op-xyz-789", + ) + enqueue_params = conn_enqueue.execute.call_args.args[1] + assert enqueue_params[-1] == "op-xyz-789" + + @pytest.mark.asyncio + async def test_worker_replays_operation_id_to_sender(self) -> None: + """The worker reads operation_id off the queue row and echoes it + into the underlying WebhookSender.send_mcp call.""" + sender = _make_sender(_ok()) + conn = _make_conn( + _queue_row(operation_id="op-replay-123"), + None, + ("closed", 0), + None, + ) + pool = _make_pool(conn) + sup = _make_supervisor(pool, sender) + + await sup._poll_and_process() + + sender.send_mcp.assert_awaited_once() + assert sender.send_mcp.await_args.kwargs["operation_id"] == "op-replay-123" + @pytest.mark.asyncio async def test_empty_queue_returns_false(self) -> None: conn = _make_conn(None) # poll returns None