Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,43 @@ async def _dns_validate_host(host: str, port: int) -> None:
_check_safe_host(addr, "resolved address")


def _owned_pinned_client(url: str, timeout: float) -> httpx.AsyncClient:
"""Build an SDK-owned ``AsyncClient`` pinned to ``url``'s validated IP.

Resolves the host once via :func:`resolve_and_validate_host` and wires
the resulting IP into an :class:`AsyncIpPinnedTransport`, so httpx
connects to the address the SSRF gate approved instead of re-resolving
at connect time. This is what closes the DNS-rebinding TOCTOU that the
:func:`_dns_validate_host` pre-check alone leaves open: the pre-check
and the connect now observe the *same* resolution.

``trust_env=False`` so an ``HTTPS_PROXY`` / ``HTTP_PROXY`` in the
environment can't route the request through a proxy pool that ignores
the pinned backend — that would reopen the same TOCTOU.

Only call this from branches where the SDK owns transport construction.
When a caller injects their own client the SDK does not control the
transport, so the pre-check remains the only available guard there.

Raises:
AdagentsValidationError: If the host doesn't resolve or every
resolved address is in a blocked/reserved range. Maps the
transport layer's :class:`SSRFValidationError` onto the
adagents error type so callers see one exception family.
"""
# Lazy import: keeps httpcore (a transport-only dependency) off the
# adagents module-load path and avoids a load-time cycle, matching
# adcp.signing.jwks.default_jwks_fetcher.
from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport
from adcp.signing.jwks import SSRFValidationError

try:
transport = build_async_ip_pinned_transport(url)
except SSRFValidationError as e:
raise AdagentsValidationError(f"SSRF validation failed for {url!r}: {e}") from e
return httpx.AsyncClient(transport=transport, timeout=timeout, trust_env=False)


def _validate_publisher_domain(domain: str) -> str:
"""Validate and sanitize publisher domain for security.

Expand Down Expand Up @@ -716,7 +753,7 @@ async def _fetch_ads_txt_managerdomains(
url, headers=headers, timeout=timeout, follow_redirects=False
)
else:
async with httpx.AsyncClient() as new_client:
async with _owned_pinned_client(url, timeout) as new_client:
response = await new_client.get(
url, headers=headers, timeout=timeout, follow_redirects=False
)
Expand All @@ -727,6 +764,12 @@ async def _fetch_ads_txt_managerdomains(
return _parse_managerdomains(response.text)
except (httpx.TimeoutException, httpx.RequestError):
return []
except AdagentsValidationError:
# The pinned-transport build re-resolves the host; if it now points
# at a blocked address (DNS rebinding between the pre-check and the
# connect), fail closed. This fallback is best-effort, so a blocked
# resolution is "no MANAGERDOMAIN found", same as a network error.
return []


def _ensure_safe_manager_domain(manager_domain: str) -> str | None:
Expand Down Expand Up @@ -1056,13 +1099,20 @@ async def _fetch_adagents_url(
parsed.hostname or "", parsed.port or (443 if parsed.scheme == "https" else 80)
)

# When the SDK owns the client, pin it to the validated IP so httpx
# connects to the address the SSRF gate approved rather than re-resolving
# at connect time. A failed resolve/SSRF check surfaces from
# _owned_pinned_client as AdagentsValidationError — not an httpx error, so
# it propagates past the handlers below, which is the correct fail-closed
# outcome for the primary fetch path (unlike the best-effort ads.txt
# fallback, we do NOT swallow it).
try:
if client is not None:
body, status_code, response_headers = await _stream_capped(
client, url, headers, timeout, max_bytes
)
else:
async with httpx.AsyncClient() as new_client:
async with _owned_pinned_client(url, timeout) as new_client:
body, status_code, response_headers = await _stream_capped(
new_client, url, headers, timeout, max_bytes
)
Expand Down Expand Up @@ -2207,13 +2257,17 @@ async def fetch_agent_authorizations_from_directory(

headers = {"User-Agent": "AdCP-Client/1.0", "Accept": "application/json"}

# SDK-owned client is pinned to the validated IP (see _fetch_adagents_url).
# A failed resolve/SSRF check raises AdagentsValidationError, which
# propagates past the httpx handlers below — the correct fail-closed
# outcome (we do not convert it into an empty result).
try:
if client is not None:
body, status_code, _ = await _stream_capped(
client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES
)
else:
async with httpx.AsyncClient() as new_client:
async with _owned_pinned_client(request_url, timeout) as new_client:
body, status_code, _ = await _stream_capped(
new_client, request_url, headers, timeout, MAX_DIRECTORY_PAGE_BYTES
)
Expand Down
107 changes: 107 additions & 0 deletions tests/test_adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,113 @@ def _resolve_mixed(host, port, *args, **kwargs):
with pytest.raises(AdagentsValidationError, match="private/reserved"):
await fetch_adagents("split-horizon.realhost.org")

@pytest.mark.asyncio
async def test_rebinding_after_precheck_connects_to_pinned_public_ip(self, monkeypatch):
# DNS rebinding TOCTOU (issue #757): a hostile resolver returns a
# PUBLIC IP while the SSRF gate runs, then flips to a private/loopback
# IP at connect time. With the SDK-owned client pinned to the IP the
# gate validated, httpx must connect to the pinned PUBLIC address and
# never re-resolve into the private one.
from adcp.adagents import fetch_adagents

public_ip = "93.184.216.34"
private_ip = "127.0.0.1"

# First two getaddrinfo calls (the _dns_validate_host pre-check and the
# pinned-transport build) see the public IP; any later call — which
# would only happen if httpx re-resolved at connect — flips to private.
call_count = {"n": 0}

def _rebinding_resolve(host, port, *args, **kwargs):
call_count["n"] += 1
ip = public_ip if call_count["n"] <= 2 else private_ip
return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (ip, port))]

monkeypatch.setattr(socket, "getaddrinfo", _rebinding_resolve)

# Intercept the network backend's actual TCP connect (the layer the
# pinned backend delegates to after rewriting host -> resolved IP) to
# capture the destination IP, then short-circuit before TLS.
import httpcore
from httpcore._backends.anyio import AnyIOBackend

connected_hosts: list[str] = []

async def _capture_connect(self, host, port, *args, **kwargs):
connected_hosts.append(host)
raise httpcore.ConnectError("intercepted before real connect")

monkeypatch.setattr(AnyIOBackend, "connect_tcp", _capture_connect)

# The connect is short-circuited, so the fetch fails — but as a
# validation error wrapping a network failure, not by reaching a
# private address.
with pytest.raises(AdagentsValidationError):
await fetch_adagents("rebinding.realhost.org")

# The pin held: every connect targeted the validated public IP, and
# the loopback address the resolver flipped to was never reached.
assert connected_hosts, "expected the pinned transport to attempt a connect"
assert all(h == public_ip for h in connected_hosts), connected_hosts
assert private_ip not in connected_hosts

@pytest.mark.asyncio
async def test_public_target_resolves_and_pins_then_serves_body(self, monkeypatch):
# Acceptance criterion: a legitimately public target still works end to
# end through the pinned, SDK-owned client — resolution succeeds, the
# connection pins to the public IP, and the adagents.json body is
# served and parsed.
from adcp.adagents import fetch_adagents

public_ip = "93.184.216.34"

def _resolve_public(host, port, *args, **kwargs):
return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (public_ip, port))]

monkeypatch.setattr(socket, "getaddrinfo", _resolve_public)

body = json.dumps(
{
"$schema": "/schemas/2.6.0/adagents.json",
"authorized_agents": [
{
"url": "https://agent.example.com",
"authorized_for": "Example inventory",
"authorization_type": "property_ids",
"property_ids": ["site1"],
}
],
"last_updated": "2025-01-15T10:00:00Z",
}
).encode("utf-8")

# Spy on the SDK's owned-client builder: let it construct the REAL
# pinned transport (so resolution + IP selection run for real), record
# the IP it pinned, then serve the response body via a MockTransport so
# the test stays offline.
import adcp.adagents as adagents_mod
from adcp.signing.ip_pinned_transport import build_async_ip_pinned_transport

pinned_ips: list[str] = []

def _spy_owned_pinned_client(url, timeout):
transport = build_async_ip_pinned_transport(url)
# AsyncIpPinnedTransport pins to a single resolved IP; surface it
# so the test can assert the validated public IP was chosen.
pinned_ips.append(transport._pool._network_backend._resolved_ip)

def _handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, content=body)

return httpx.AsyncClient(transport=httpx.MockTransport(_handler), timeout=timeout)

monkeypatch.setattr(adagents_mod, "_owned_pinned_client", _spy_owned_pinned_client)

result = await fetch_adagents("legit.realhost.org")

assert result["authorized_agents"][0]["url"] == "https://agent.example.com"
assert pinned_ips == [public_ip], pinned_ips

@pytest.mark.asyncio
async def test_redirect_uses_fresh_client(self):
"""Redirect hops should not reuse the caller's client."""
Expand Down
Loading