From 6969dc266e3427cd542452763d41ba135168e6b4 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 6 Jun 2026 15:28:17 -0400 Subject: [PATCH 1/2] feat(server): add lazy idempotency backend wrapper (JS #2136 parity) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add LazyBackend / create_lazy_backend, mirroring the JS SDK createLazyBackend. Defers idempotency backend construction until first use for adopters whose Pg/Redis pool resolves asynchronously from app infrastructure. - Resolve-once, memoized; concurrent first-use shares a single factory invocation via an asyncio.Lock. - A failed factory attempt is not memoized — a later call retries. - Accepts sync or async factories; validates the resolved value is an IdempotencyBackend. - clear_all is opt-in (allow_clear_all=True); method presence is the reset-safety contract, so it is genuinely absent otherwise. - Delegates get / put / delete_expired to the resolved instance. Exported from adcp.server.idempotency (PgBackend precedent: not all backends are re-exported at the top-level adcp.server namespace). Closes #927 Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/server/idempotency/__init__.py | 8 + src/adcp/server/idempotency/lazy.py | 157 ++++++++++++++++++ tests/test_server_idempotency.py | 205 ++++++++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 src/adcp/server/idempotency/lazy.py diff --git a/src/adcp/server/idempotency/__init__.py b/src/adcp/server/idempotency/__init__.py index 9bd77cb43..45eac478d 100644 --- a/src/adcp/server/idempotency/__init__.py +++ b/src/adcp/server/idempotency/__init__.py @@ -65,6 +65,11 @@ async def get_adcp_capabilities(self, params, context=None): canonical_json_sha256, strip_excluded_fields, ) +from adcp.server.idempotency.lazy import ( + LazyBackend, + LazyBackendFactory, + create_lazy_backend, +) from adcp.server.idempotency.store import IdempotencyStore, is_wrapped from adcp.server.idempotency.webhook_dedup import WebhookDedupStore @@ -73,10 +78,13 @@ async def get_adcp_capabilities(self, params, context=None): "EXCLUDED_FIELDS", "IdempotencyBackend", "IdempotencyStore", + "LazyBackend", + "LazyBackendFactory", "MemoryBackend", "PgBackend", "WebhookDedupStore", "canonical_json_sha256", + "create_lazy_backend", "is_wrapped", "strip_excluded_fields", ] diff --git a/src/adcp/server/idempotency/lazy.py b/src/adcp/server/idempotency/lazy.py new file mode 100644 index 000000000..5aaa0e758 --- /dev/null +++ b/src/adcp/server/idempotency/lazy.py @@ -0,0 +1,157 @@ +"""Deferred-construction wrapper for :class:`IdempotencyBackend`. + +Mirrors the JS SDK ``createLazyBackend`` (adcp-client#2136). Use this when the +real backend depends on application infrastructure that resolves asynchronously +*after* the SDK server is constructed — for example a Postgres pool or Redis +client produced by an async bootstrap:: + + from adcp.server.idempotency import ( + IdempotencyStore, + LazyBackend, + PgBackend, + ) + + async def _resolve() -> IdempotencyBackend: + pool = await app.get_pg_pool() + backend = PgBackend(pool=pool) + await backend.create_schema() + return backend + + store = IdempotencyStore(backend=LazyBackend(_resolve), ttl_seconds=86400) + +The underlying backend is resolved on first use and memoized (resolve-once). +Concurrent first calls share a single factory invocation. If the factory +raises, the wrapper forgets that failed attempt so a later call can retry. + +``clear_all`` is **not** exposed by default: per the JS contract, the presence +of a bulk-clear method is treated as the backend's explicit "safe to flush" +signal, so it must be opted into via ``allow_clear_all=True``. Enable it only +when every backend the factory can return safely permits bulk clearing (for +example a dedicated test/dev :class:`MemoryBackend`, never a shared Redis). +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable + +from adcp.server.idempotency.backends import CachedResponse, IdempotencyBackend + +# A factory may be sync (returns a backend) or async (returns an awaitable that +# resolves to a backend) — both are normalized through ``_resolve`` below. +LazyBackendFactory = Callable[[], "IdempotencyBackend | Awaitable[IdempotencyBackend]"] + + +class LazyBackend(IdempotencyBackend): + """Resolve an :class:`IdempotencyBackend` lazily on first use. + + :param factory: A zero-arg callable returning an :class:`IdempotencyBackend` + or an awaitable that resolves to one. Invoked at most once across the + wrapper's lifetime once it succeeds; re-invoked only if a prior attempt + raised. + :param allow_clear_all: When ``True``, expose :meth:`clear_all`, delegating + to the resolved backend's ``clear_all`` or ``clear`` method. Defaults to + ``False`` because bulk clearing is dangerous on shared production stores + and the SDK uses method presence as the reset-safety contract. + + Concurrency: the first ``get``/``put``/``delete_expired`` triggers + resolution. Multiple concurrent first operations share a single factory + invocation via an :class:`asyncio.Lock`; later callers reuse the cached + instance without locking on the hot path. + """ + + def __init__( + self, + factory: LazyBackendFactory, + *, + allow_clear_all: bool = False, + ) -> None: + self._factory = factory + self._allow_clear_all = allow_clear_all + self._backend: IdempotencyBackend | None = None + self._lock = asyncio.Lock() + + async def _resolve(self) -> IdempotencyBackend: + """Return the resolved backend, invoking the factory once on first use. + + Resolve-once + concurrency-safe: the fast path returns the memoized + instance without locking. The slow path holds ``_lock`` so concurrent + first callers share a single factory invocation; the double-check inside + the lock means a caller that waited on the lock sees the instance the + winner resolved. A factory that raises is not memoized — the next call + retries. + """ + cached = self._backend + if cached is not None: + return cached + async with self._lock: + # Re-read under the lock: a task that lost the race to acquire it + # must observe the winner's resolved instance, not re-run the + # factory. (Read into a local so the narrowing is on the local, + # not the instance attribute another task may have mutated.) + cached = self._backend + if cached is not None: + return cached + result = self._factory() + resolved = await result if isinstance(result, Awaitable) else result + if not isinstance(resolved, IdempotencyBackend): + raise TypeError( + "LazyBackend factory must resolve to an IdempotencyBackend, " + f"got {type(resolved).__name__}" + ) + self._backend = resolved + return resolved + + async def get(self, scope_key: str, key: str) -> CachedResponse | None: + return await (await self._resolve()).get(scope_key, key) + + async def put(self, scope_key: str, key: str, entry: CachedResponse) -> None: + await (await self._resolve()).put(scope_key, key, entry) + + async def delete_expired(self, now_epoch: float | None = None) -> int: + return await (await self._resolve()).delete_expired(now_epoch) + + async def _clear_all(self) -> None: + """Delegate a bulk clear to the resolved backend. + + Resolves the backend (so the factory runs if it hasn't yet) and + delegates to its ``clear_all`` or ``clear`` method, raising if the + resolved backend supports neither. Exposed as ``clear_all`` only when + the wrapper is constructed with ``allow_clear_all=True`` (see + :meth:`__getattr__`). + """ + backend = await self._resolve() + clear = getattr(backend, "clear_all", None) or getattr(backend, "clear", None) + if clear is None: + raise NotImplementedError( + f"Resolved backend {type(backend).__name__} does not support " + "clear_all() or clear()." + ) + await clear() + + def __getattr__(self, name: str) -> object: + """Expose ``clear_all`` only when opted in. + + Mirrors the JS wrapper, which attaches ``clearAll`` to the returned + object solely when ``{ clearAll: true }`` — reset-safety code uses + ``hasattr``/method presence as the "safe to flush" contract, so the + attribute must genuinely be absent otherwise. ``__getattr__`` is only + consulted for names not found normally, so this never shadows the + delegating methods above. + """ + if name == "clear_all" and self.__dict__.get("_allow_clear_all"): + return self._clear_all + raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") + + +def create_lazy_backend( + factory: LazyBackendFactory, + *, + allow_clear_all: bool = False, +) -> LazyBackend: + """Construct a :class:`LazyBackend` — functional alias mirroring the JS + ``createLazyBackend`` factory shape. + + See :class:`LazyBackend` for semantics and parameter documentation. + """ + return LazyBackend(factory, allow_clear_all=allow_clear_all) diff --git a/tests/test_server_idempotency.py b/tests/test_server_idempotency.py index f4832d887..81013bafa 100644 --- a/tests/test_server_idempotency.py +++ b/tests/test_server_idempotency.py @@ -15,10 +15,13 @@ from adcp.server.idempotency import ( EXCLUDED_FIELDS, CachedResponse, + IdempotencyBackend, IdempotencyStore, + LazyBackend, MemoryBackend, PgBackend, canonical_json_sha256, + create_lazy_backend, strip_excluded_fields, ) @@ -187,6 +190,208 @@ async def writer(i: int) -> None: assert all(h.response["i"] == i for i, h in enumerate(hits)) # type: ignore[union-attr] +class TestLazyBackend: + """Deferred-construction wrapper (JS adcp-client#2136 parity). + + The factory must not run at construction; it runs on first use, is + memoized (resolve-once) even under concurrent first calls, and every + :class:`IdempotencyBackend` method delegates to the resolved instance. + ``clear_all`` is opt-in. Prefer a real :class:`MemoryBackend` behind the + factory over mocking so delegation exercises the actual contract. + """ + + @pytest.mark.asyncio + async def test_factory_not_called_at_construction(self) -> None: + calls = 0 + + def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + return MemoryBackend() + + LazyBackend(factory) + assert calls == 0 + + @pytest.mark.asyncio + async def test_factory_called_on_first_use(self) -> None: + calls = 0 + inner = MemoryBackend() + + def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + return inner + + backend = LazyBackend(factory) + assert calls == 0 + await backend.put("p", "k", CachedResponse("h", {"ok": True}, time.time() + 60)) + assert calls == 1 + got = await backend.get("p", "k") + assert got is not None and got.response == {"ok": True} + # Delegated to the same underlying instance. + assert await inner.get("p", "k") is not None + + @pytest.mark.asyncio + async def test_async_factory_resolved(self) -> None: + inner = MemoryBackend() + + async def factory() -> IdempotencyBackend: + return inner + + backend = create_lazy_backend(factory) + await backend.put("p", "k", CachedResponse("h", {"v": 1}, time.time() + 60)) + assert await inner.get("p", "k") is not None + + @pytest.mark.asyncio + async def test_resolved_once_across_operations(self) -> None: + calls = 0 + inner = MemoryBackend() + + async def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + return inner + + backend = LazyBackend(factory) + await backend.put("p", "k1", CachedResponse("h", {}, time.time() + 60)) + await backend.get("p", "k1") + await backend.delete_expired() + assert calls == 1 + + @pytest.mark.asyncio + async def test_concurrent_first_use_shares_one_factory_invocation(self) -> None: + calls = 0 + release = asyncio.Event() + inner = MemoryBackend() + + async def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + # Hold every concurrent first-caller inside the factory so they all + # land before any one resolves — proves the lock serializes them + # onto a single invocation, not just fast back-to-back calls. + await release.wait() + return inner + + backend = LazyBackend(factory) + ops = [ + backend.put("p", f"k-{i}", CachedResponse("h", {"i": i}, time.time() + 60)) + for i in range(20) + ] + gathered = asyncio.gather(*ops) + await asyncio.sleep(0) # let the tasks reach the factory await + release.set() + await gathered + assert calls == 1 + for i in range(20): + got = await backend.get("p", f"k-{i}") + assert got is not None and got.response == {"i": i} + + @pytest.mark.asyncio + async def test_failed_factory_is_retried(self) -> None: + calls = 0 + inner = MemoryBackend() + + async def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + if calls == 1: + raise RuntimeError("transient pool bootstrap failure") + return inner + + backend = LazyBackend(factory) + with pytest.raises(RuntimeError, match="transient pool bootstrap"): + await backend.get("p", "k") + # Failed attempt not memoized — a later call retries and succeeds. + await backend.put("p", "k", CachedResponse("h", {"ok": True}, time.time() + 60)) + assert calls == 2 + got = await backend.get("p", "k") + assert got is not None and got.response == {"ok": True} + + @pytest.mark.asyncio + async def test_factory_resolving_to_non_backend_raises(self) -> None: + backend = LazyBackend(lambda: object()) # type: ignore[arg-type, return-value] + with pytest.raises(TypeError, match="must resolve to an IdempotencyBackend"): + await backend.get("p", "k") + + @pytest.mark.asyncio + async def test_clear_all_not_exposed_by_default(self) -> None: + backend = LazyBackend(lambda: MemoryBackend()) + # Method presence is the reset-safety contract (JS parity). + assert not hasattr(backend, "clear_all") + + @pytest.mark.asyncio + async def test_clear_all_delegates_when_enabled(self) -> None: + inner = MemoryBackend() + backend = LazyBackend(lambda: inner, allow_clear_all=True) + assert hasattr(backend, "clear_all") + await backend.put("p", "k", CachedResponse("h", {"ok": True}, time.time() + 60)) + assert await inner.get("p", "k") is not None + await backend.clear_all() + # Cleared on the resolved backend. + assert await inner.get("p", "k") is None + assert await inner._size() == 0 + + @pytest.mark.asyncio + async def test_clear_all_resolves_factory_if_unused(self) -> None: + calls = 0 + inner = MemoryBackend() + + def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + return inner + + backend = LazyBackend(factory, allow_clear_all=True) + assert calls == 0 + await backend.clear_all() # first use is clear_all itself + assert calls == 1 + + @pytest.mark.asyncio + async def test_clear_all_raises_when_backend_has_no_clear(self) -> None: + class NoClearBackend(IdempotencyBackend): + async def get(self, scope_key: str, key: str) -> CachedResponse | None: + return None + + async def put(self, scope_key: str, key: str, entry: CachedResponse) -> None: + return None + + async def delete_expired(self, now_epoch: float | None = None) -> int: + return 0 + + backend = LazyBackend(lambda: NoClearBackend(), allow_clear_all=True) + with pytest.raises(NotImplementedError, match="does not support"): + await backend.clear_all() + + @pytest.mark.asyncio + async def test_store_drives_lazy_backend_end_to_end(self) -> None: + """The store treats LazyBackend like any backend: first wrapped call + resolves the factory, a replay hits the resolved backend.""" + calls = 0 + + async def factory() -> IdempotencyBackend: + nonlocal calls + calls += 1 + return MemoryBackend() + + store = IdempotencyStore(backend=LazyBackend(factory), ttl_seconds=86400) + handler = _FakeHandler() + wrapped = store.wrap(_FakeHandler.create_media_buy) + ctx = ToolContext(caller_identity="buyer-acme") + params = {"idempotency_key": str(uuid.uuid4()), "brand": "acme"} + + assert calls == 0 + first = await wrapped(handler, params, ctx) + assert calls == 1 # factory resolved on first wrapped call + assert handler.call_count == 1 + assert "replayed" not in first + + replay = await wrapped(handler, params, ctx) + assert handler.call_count == 1 # replayed, handler not re-run + assert replay["replayed"] is True + assert calls == 1 # backend resolved exactly once + + class TestPgBackendImportGuard: def test_construction_without_pg_extra_raises_import_error(self) -> None: """PgBackend requires the ``adcp[pg]`` extra. Without psycopg From 9ca4d12686df2724f9c5a42166b93a7468576eb2 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 6 Jun 2026 20:17:43 -0400 Subject: [PATCH 2/2] test(server): address idempotency test static-analysis findings Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_server_idempotency.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_server_idempotency.py b/tests/test_server_idempotency.py index 81013bafa..3a7250f37 100644 --- a/tests/test_server_idempotency.py +++ b/tests/test_server_idempotency.py @@ -281,7 +281,7 @@ async def factory() -> IdempotencyBackend: gathered = asyncio.gather(*ops) await asyncio.sleep(0) # let the tasks reach the factory await release.set() - await gathered + _ = await gathered # drain the put()s; their None results are unused assert calls == 1 for i in range(20): got = await backend.get("p", f"k-{i}") @@ -310,13 +310,13 @@ async def factory() -> IdempotencyBackend: @pytest.mark.asyncio async def test_factory_resolving_to_non_backend_raises(self) -> None: - backend = LazyBackend(lambda: object()) # type: ignore[arg-type, return-value] + backend = LazyBackend(object) # type: ignore[arg-type] with pytest.raises(TypeError, match="must resolve to an IdempotencyBackend"): await backend.get("p", "k") @pytest.mark.asyncio async def test_clear_all_not_exposed_by_default(self) -> None: - backend = LazyBackend(lambda: MemoryBackend()) + backend = LazyBackend(MemoryBackend) # Method presence is the reset-safety contract (JS parity). assert not hasattr(backend, "clear_all") @@ -389,7 +389,10 @@ async def factory() -> IdempotencyBackend: replay = await wrapped(handler, params, ctx) assert handler.call_count == 1 # replayed, handler not re-run assert replay["replayed"] is True - assert calls == 1 # backend resolved exactly once + # Replay must reuse the memoized backend, not re-resolve the factory. + # The factory call happens inside LazyBackend on the replay's backend + # read, so this counter can still increment here — it MUST stay 1. + assert calls == 1 # backend resolved exactly once, even on replay class TestPgBackendImportGuard: