From 53224f134bc99ec08b184d420ffb1de84a6ffa19 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Sat, 6 Jun 2026 12:18:21 -0400 Subject: [PATCH] fix(adagents): pin DNS to close rebinding TOCTOU on SSRF gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The three publisher-controlled fetch paths in adagents.py ran the _dns_validate_host resolve-and-gate pre-check, then built a plain httpx.AsyncClient() that re-resolved the host at connect time — leaving the DNS-rebinding window the pre-check docstring admits. Thread build_async_ip_pinned_transport into the SDK-owned AsyncClient construction sites (ads.txt MANAGERDOMAIN fetch, adagents.json stream, AAO directory fetch) via a new _owned_pinned_client helper. The client now pins to the IP the SSRF gate validated, so pre-check and connect observe the same resolution. SSRFValidationError is mapped onto AdagentsValidationError. Injected-client branches keep _dns_validate_host as their only guard since the SDK does not own that transport. Closes #757 Co-Authored-By: Claude Opus 4.8 (1M context) --- src/adcp/adagents.py | 60 +++++++++++++++++++++-- tests/test_adagents.py | 107 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 3 deletions(-) diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index cdbcfac8..214f9f9e 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -269,6 +269,43 @@ async def _dns_validate_host(host: str, port: int) -> None: _check_safe_host(addr, "resolved address") +def _owned_pinned_client(url: str, timeout: float) -> httpx.AsyncClient: + """Build an SDK-owned ``AsyncClient`` pinned to ``url``'s validated IP. + + Resolves the host once via :func:`resolve_and_validate_host` and wires + the resulting IP into an :class:`AsyncIpPinnedTransport`, so httpx + connects to the address the SSRF gate approved instead of re-resolving + at connect time. This is what closes the DNS-rebinding TOCTOU that the + :func:`_dns_validate_host` pre-check alone leaves open: the pre-check + and the connect now observe the *same* resolution. + + ``trust_env=False`` so an ``HTTPS_PROXY`` / ``HTTP_PROXY`` in the + environment can't route the request through a proxy pool that ignores + the pinned backend — that would reopen the same TOCTOU. + + Only call this from branches where the SDK owns transport construction. + When a caller injects their own client the SDK does not control the + transport, so the pre-check remains the only available guard there. + + Raises: + AdagentsValidationError: If the host doesn't resolve or every + resolved address is in a blocked/reserved range. Maps the + transport layer's :class:`SSRFValidationError` onto the + adagents error type so callers see one exception family. + """ + # Lazy import: keeps httpcore (a transport-only dependency) off the + # adagents module-load path and avoids a load-time cycle, matching + # adcp.signing.jwks.default_jwks_fetcher. + from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport + from adcp.signing.jwks import SSRFValidationError + + try: + transport = build_async_ip_pinned_transport(url) + except SSRFValidationError as e: + raise AdagentsValidationError(f"SSRF validation failed for {url!r}: {e}") from e + return httpx.AsyncClient(transport=transport, timeout=timeout, trust_env=False) + + def _validate_publisher_domain(domain: str) -> str: """Validate and sanitize publisher domain for security. @@ -716,7 +753,7 @@ async def _fetch_ads_txt_managerdomains( url, headers=headers, timeout=timeout, follow_redirects=False ) else: - async with httpx.AsyncClient() as new_client: + async with _owned_pinned_client(url, timeout) as new_client: response = await new_client.get( url, headers=headers, timeout=timeout, follow_redirects=False ) @@ -727,6 +764,12 @@ async def _fetch_ads_txt_managerdomains( return _parse_managerdomains(response.text) except (httpx.TimeoutException, httpx.RequestError): return [] + except AdagentsValidationError: + # The pinned-transport build re-resolves the host; if it now points + # at a blocked address (DNS rebinding between the pre-check and the + # connect), fail closed. This fallback is best-effort, so a blocked + # resolution is "no MANAGERDOMAIN found", same as a network error. + return [] def _ensure_safe_manager_domain(manager_domain: str) -> str | None: @@ -1056,13 +1099,20 @@ async def _fetch_adagents_url( parsed.hostname or "", parsed.port or (443 if parsed.scheme == "https" else 80) ) + # When the SDK owns the client, pin it to the validated IP so httpx + # connects to the address the SSRF gate approved rather than re-resolving + # at connect time. A failed resolve/SSRF check surfaces from + # _owned_pinned_client as AdagentsValidationError — not an httpx error, so + # it propagates past the handlers below, which is the correct fail-closed + # outcome for the primary fetch path (unlike the best-effort ads.txt + # fallback, we do NOT swallow it). try: if client is not None: body, status_code, response_headers = await _stream_capped( client, url, headers, timeout, max_bytes ) else: - async with httpx.AsyncClient() as new_client: + async with _owned_pinned_client(url, timeout) as new_client: body, status_code, response_headers = await _stream_capped( new_client, url, headers, timeout, max_bytes ) @@ -2207,13 +2257,17 @@ async def fetch_agent_authorizations_from_directory( headers = {"User-Agent": "AdCP-Client/1.0", "Accept": "application/json"} + # SDK-owned client is pinned to the validated IP (see _fetch_adagents_url). + # A failed resolve/SSRF check raises AdagentsValidationError, which + # propagates past the httpx handlers below — the correct fail-closed + # outcome (we do not convert it into an empty result). try: if client is not None: body, status_code, _ = await _stream_capped( client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES ) else: - async with httpx.AsyncClient() as new_client: + async with _owned_pinned_client(request_url, timeout) as new_client: body, status_code, _ = await _stream_capped( new_client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES ) diff --git a/tests/test_adagents.py b/tests/test_adagents.py index ec1b65a5..09538ed1 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -870,6 +870,113 @@ def _resolve_mixed(host, port, *args, **kwargs): with pytest.raises(AdagentsValidationError, match="private/reserved"): await fetch_adagents("split-horizon.realhost.org") + @pytest.mark.asyncio + async def test_rebinding_after_precheck_connects_to_pinned_public_ip(self, monkeypatch): + # DNS rebinding TOCTOU (issue #757): a hostile resolver returns a + # PUBLIC IP while the SSRF gate runs, then flips to a private/loopback + # IP at connect time. With the SDK-owned client pinned to the IP the + # gate validated, httpx must connect to the pinned PUBLIC address and + # never re-resolve into the private one. + from adcp.adagents import fetch_adagents + + public_ip = "93.184.216.34" + private_ip = "127.0.0.1" + + # First two getaddrinfo calls (the _dns_validate_host pre-check and the + # pinned-transport build) see the public IP; any later call — which + # would only happen if httpx re-resolved at connect — flips to private. + call_count = {"n": 0} + + def _rebinding_resolve(host, port, *args, **kwargs): + call_count["n"] += 1 + ip = public_ip if call_count["n"] <= 2 else private_ip + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (ip, port))] + + monkeypatch.setattr(socket, "getaddrinfo", _rebinding_resolve) + + # Intercept the network backend's actual TCP connect (the layer the + # pinned backend delegates to after rewriting host -> resolved IP) to + # capture the destination IP, then short-circuit before TLS. + import httpcore + from httpcore._backends.anyio import AnyIOBackend + + connected_hosts: list[str] = [] + + async def _capture_connect(self, host, port, *args, **kwargs): + connected_hosts.append(host) + raise httpcore.ConnectError("intercepted before real connect") + + monkeypatch.setattr(AnyIOBackend, "connect_tcp", _capture_connect) + + # The connect is short-circuited, so the fetch fails — but as a + # validation error wrapping a network failure, not by reaching a + # private address. + with pytest.raises(AdagentsValidationError): + await fetch_adagents("rebinding.realhost.org") + + # The pin held: every connect targeted the validated public IP, and + # the loopback address the resolver flipped to was never reached. + assert connected_hosts, "expected the pinned transport to attempt a connect" + assert all(h == public_ip for h in connected_hosts), connected_hosts + assert private_ip not in connected_hosts + + @pytest.mark.asyncio + async def test_public_target_resolves_and_pins_then_serves_body(self, monkeypatch): + # Acceptance criterion: a legitimately public target still works end to + # end through the pinned, SDK-owned client — resolution succeeds, the + # connection pins to the public IP, and the adagents.json body is + # served and parsed. + from adcp.adagents import fetch_adagents + + public_ip = "93.184.216.34" + + def _resolve_public(host, port, *args, **kwargs): + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (public_ip, port))] + + monkeypatch.setattr(socket, "getaddrinfo", _resolve_public) + + body = json.dumps( + { + "$schema": "/schemas/2.6.0/adagents.json", + "authorized_agents": [ + { + "url": "https://agent.example.com", + "authorized_for": "Example inventory", + "authorization_type": "property_ids", + "property_ids": ["site1"], + } + ], + "last_updated": "2025-01-15T10:00:00Z", + } + ).encode("utf-8") + + # Spy on the SDK's owned-client builder: let it construct the REAL + # pinned transport (so resolution + IP selection run for real), record + # the IP it pinned, then serve the response body via a MockTransport so + # the test stays offline. + import adcp.adagents as adagents_mod + from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport + + pinned_ips: list[str] = [] + + def _spy_owned_pinned_client(url, timeout): + transport = build_async_ip_pinned_transport(url) + # AsyncIpPinnedTransport pins to a single resolved IP; surface it + # so the test can assert the validated public IP was chosen. + pinned_ips.append(transport._pool._network_backend._resolved_ip) + + def _handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, content=body) + + return httpx.AsyncClient(transport=httpx.MockTransport(_handler), timeout=timeout) + + monkeypatch.setattr(adagents_mod, "_owned_pinned_client", _spy_owned_pinned_client) + + result = await fetch_adagents("legit.realhost.org") + + assert result["authorized_agents"][0]["url"] == "https://agent.example.com" + assert pinned_ips == [public_ip], pinned_ips + @pytest.mark.asyncio async def test_redirect_uses_fresh_client(self): """Redirect hops should not reuse the caller's client."""