From f9b7e0d9d973e922b701ad9bb2912ec4e872986f Mon Sep 17 00:00:00 2001 From: d3vyce Date: Wed, 10 Jun 2026 15:31:43 -0400 Subject: [PATCH 1/3] fix: enforce SecurityScopes instead of silently ignoring them Co-Authored-By: Claude Fable 5 --- docs/module/security.md | 16 ++ src/fastapi_toolsets/security/abc.py | 54 +++- .../security/sources/bearer.py | 27 +- .../security/sources/cookie.py | 22 +- .../security/sources/header.py | 21 +- .../security/sources/multi.py | 6 +- tests/test_security.py | 257 ++++++++++++++++++ 7 files changed, 383 insertions(+), 20 deletions(-) diff --git a/docs/module/security.md b/docs/module/security.md index 10ebd40..36bf38a 100644 --- a/docs/module/security.md +++ b/docs/module/security.md @@ -154,6 +154,22 @@ bearer = BearerTokenAuth(verify_token, role=Role.ADMIN, permission="billing:read Each auth instance is self-contained — create a separate instance per distinct requirement instead of passing requirements through `Security(scopes=[...])`. +### Security scopes + +Scopes declared on a route via `Security(auth, scopes=[...])` are never silently ignored. If the validator declares a `scopes` parameter, it receives the declared scopes (`list[str]`, empty when none are declared) and is responsible for enforcing them: + +```python +async def verify_token(token: str, scopes: list[str]) -> User: + user = await decode_token(token) + if not set(scopes) <= user.scopes: + raise UnauthorizedError() + return user +``` + +If the validator does **not** declare a `scopes` parameter, declaring scopes on a route raises `RuntimeError` at request time — the route advertises an authorization check in OpenAPI that the auth source cannot enforce. Custom `AuthSource` subclasses get the same fail-closed behaviour by default; override `authenticate_scoped(credential, scopes)` to support scopes. + +Because `scopes` is injected automatically, it is reserved: passing it as a validator kwarg at instantiation (or through `.require()`) raises `ValueError` at startup. Use a different keyword name for instance-bound requirements. + ### Using `.require()` inline If declaring a new top-level variable per role feels verbose, use `.require()` to create a configured clone directly in the route decorator. The original instance is not mutated: diff --git a/src/fastapi_toolsets/security/abc.py b/src/fastapi_toolsets/security/abc.py index 9eb8c93..aa25eec 100644 --- a/src/fastapi_toolsets/security/abc.py +++ b/src/fastapi_toolsets/security/abc.py @@ -23,6 +23,45 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper +def _accepts_scopes(fn: Callable[..., Any]) -> bool: + """Return whether *fn* declares a ``scopes`` parameter.""" + try: + parameters = inspect.signature(fn).parameters + except (TypeError, ValueError): + return False + param = parameters.get("scopes") + return param is not None and param.kind in ( + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + ) + + +def _reject_scopes_kwarg(kwargs: dict[str, Any]) -> None: + """Reject ``scopes`` as a validator kwarg.""" + if "scopes" in kwargs: + raise ValueError( + "'scopes' is a reserved validator kwarg: security scopes declared " + "on the route via Security(..., scopes=[...]) are injected " + "automatically. Use a different keyword name." + ) + + +def _scope_kwargs( + owner: object, accepts_scopes: bool, scopes: list[str] +) -> dict[str, Any]: + """Return the ``scopes`` kwarg for a validator, failing closed when unsupported.""" + if accepts_scopes: + return {"scopes": scopes} + if scopes: + raise RuntimeError( + f"{type(owner).__name__} cannot enforce the security scopes " + f"{scopes!r} declared on this route: its validator does not " + "declare a 'scopes' parameter. Add one to the validator or " + "remove scopes=... from Security()." + ) + return {} + + class AuthSource(ABC): """Abstract base class for authentication sources.""" @@ -32,12 +71,12 @@ def __init__(self) -> None: async def _call( request: Request, - security_scopes: SecurityScopes, # noqa: ARG001 + security_scopes: SecurityScopes, ) -> Any: credential = await source.extract(request) if credential is None: raise UnauthorizedError() - return await source.authenticate(credential) + return await source.authenticate_scoped(credential, security_scopes.scopes) self._call_fn: Callable[..., Any] = _call self.__signature__ = inspect.signature(_call) @@ -50,6 +89,17 @@ async def extract(self, request: Request) -> str | None: async def authenticate(self, credential: str) -> Any: """Validate a credential and return the authenticated identity.""" + async def authenticate_scoped(self, credential: str, scopes: list[str]) -> Any: + """Validate a credential, enforcing the scopes declared on the route.""" + if scopes: + raise RuntimeError( + f"{type(self).__name__} cannot enforce the security scopes " + f"{scopes!r} declared on this route. Override " + "authenticate_scoped() to support scopes or remove " + "scopes=... from Security()." + ) + return await self.authenticate(credential) + async def __call__(self, **kwargs: Any) -> Any: """FastAPI dependency dispatch.""" return await self._call_fn(**kwargs) diff --git a/src/fastapi_toolsets/security/sources/bearer.py b/src/fastapi_toolsets/security/sources/bearer.py index b33f432..70ad48e 100644 --- a/src/fastapi_toolsets/security/sources/bearer.py +++ b/src/fastapi_toolsets/security/sources/bearer.py @@ -9,7 +9,13 @@ from fastapi_toolsets.exceptions import UnauthorizedError -from ..abc import AuthSource, _ensure_async +from ..abc import ( + AuthSource, + _accepts_scopes, + _ensure_async, + _reject_scopes_kwarg, + _scope_kwargs, +) class BearerTokenAuth(AuthSource): @@ -42,29 +48,32 @@ def __init__( prefix: str | None = None, **kwargs: Any, ) -> None: + _reject_scopes_kwarg(kwargs) self._validator = _ensure_async(validator) + self._accepts_scopes = _accepts_scopes(validator) self._prefix = prefix self._kwargs = kwargs self._scheme = HTTPBearer(auto_error=False) async def _call( - security_scopes: SecurityScopes, # noqa: ARG001 + security_scopes: SecurityScopes, credentials: Annotated[ HTTPAuthorizationCredentials | None, Depends(self._scheme) ] = None, ) -> Any: if credentials is None: raise UnauthorizedError() - return await self._validate(credentials.credentials) + return await self._validate(credentials.credentials, security_scopes.scopes) self._call_fn = _call self.__signature__ = inspect.signature(_call) - async def _validate(self, token: str) -> Any: - """Check prefix and call the validator.""" + async def _validate(self, token: str, scopes: list[str]) -> Any: + """Check prefix and call the validator, forwarding declared scopes.""" if self._prefix is not None and not token.startswith(self._prefix): raise UnauthorizedError() - return await self._validator(token, **self._kwargs) + extra = _scope_kwargs(self, self._accepts_scopes, scopes) + return await self._validator(token, **extra, **self._kwargs) async def extract(self, request: Request) -> str | None: """Extract the raw credential from the request without validating. @@ -89,7 +98,11 @@ async def authenticate(self, credential: str) -> Any: Calls ``await validator(credential, **kwargs)`` where ``kwargs`` are the extra keyword arguments provided at instantiation. """ - return await self._validate(credential) + return await self._validate(credential, []) + + async def authenticate_scoped(self, credential: str, scopes: list[str]) -> Any: + """Validate a credential, forwarding route-declared scopes to the validator.""" + return await self._validate(credential, scopes) def require(self, **kwargs: Any) -> "BearerTokenAuth": """Return a new instance with additional (or overriding) validator kwargs.""" diff --git a/src/fastapi_toolsets/security/sources/cookie.py b/src/fastapi_toolsets/security/sources/cookie.py index 9fd6904..66d9156 100644 --- a/src/fastapi_toolsets/security/sources/cookie.py +++ b/src/fastapi_toolsets/security/sources/cookie.py @@ -13,7 +13,13 @@ from fastapi_toolsets.exceptions import UnauthorizedError -from ..abc import AuthSource, _ensure_async +from ..abc import ( + AuthSource, + _accepts_scopes, + _ensure_async, + _reject_scopes_kwarg, + _scope_kwargs, +) class CookieAuth(AuthSource): @@ -53,8 +59,10 @@ def __init__( secure: bool = True, **kwargs: Any, ) -> None: + _reject_scopes_kwarg(kwargs) self._name = name self._validator = _ensure_async(validator) + self._accepts_scopes = _accepts_scopes(validator) self._secret_key = secret_key self._ttl = ttl self._secure = secure @@ -62,13 +70,12 @@ def __init__( self._scheme = APIKeyCookie(name=name, auto_error=False) async def _call( - security_scopes: SecurityScopes, # noqa: ARG001 + security_scopes: SecurityScopes, value: Annotated[str | None, Depends(self._scheme)] = None, ) -> Any: if value is None: raise UnauthorizedError() - plain = self._verify(value) - return await self._validator(plain, **self._kwargs) + return await self.authenticate_scoped(value, security_scopes.scopes) self._call_fn = _call self.__signature__ = inspect.signature(_call) @@ -115,8 +122,13 @@ async def extract(self, request: Request) -> str | None: return request.cookies.get(self._name) async def authenticate(self, credential: str) -> Any: + return await self.authenticate_scoped(credential, []) + + async def authenticate_scoped(self, credential: str, scopes: list[str]) -> Any: + """Verify the cookie, forwarding route-declared scopes to the validator.""" plain = self._verify(credential) - return await self._validator(plain, **self._kwargs) + extra = _scope_kwargs(self, self._accepts_scopes, scopes) + return await self._validator(plain, **extra, **self._kwargs) def require(self, **kwargs: Any) -> "CookieAuth": """Return a new instance with additional (or overriding) validator kwargs.""" diff --git a/src/fastapi_toolsets/security/sources/header.py b/src/fastapi_toolsets/security/sources/header.py index ec4834f..cacef6b 100644 --- a/src/fastapi_toolsets/security/sources/header.py +++ b/src/fastapi_toolsets/security/sources/header.py @@ -8,7 +8,13 @@ from fastapi_toolsets.exceptions import UnauthorizedError -from ..abc import AuthSource, _ensure_async +from ..abc import ( + AuthSource, + _accepts_scopes, + _ensure_async, + _reject_scopes_kwarg, + _scope_kwargs, +) class APIKeyHeaderAuth(AuthSource): @@ -34,18 +40,20 @@ def __init__( validator: Callable[..., Any], **kwargs: Any, ) -> None: + _reject_scopes_kwarg(kwargs) self._name = name self._validator = _ensure_async(validator) + self._accepts_scopes = _accepts_scopes(validator) self._kwargs = kwargs self._scheme = APIKeyHeader(name=name, auto_error=False) async def _call( - security_scopes: SecurityScopes, # noqa: ARG001 + security_scopes: SecurityScopes, api_key: Annotated[str | None, Depends(self._scheme)] = None, ) -> Any: if api_key is None: raise UnauthorizedError() - return await self._validator(api_key, **self._kwargs) + return await self.authenticate_scoped(api_key, security_scopes.scopes) self._call_fn = _call self.__signature__ = inspect.signature(_call) @@ -56,7 +64,12 @@ async def extract(self, request: Request) -> str | None: async def authenticate(self, credential: str) -> Any: """Validate a credential and return the identity.""" - return await self._validator(credential, **self._kwargs) + return await self.authenticate_scoped(credential, []) + + async def authenticate_scoped(self, credential: str, scopes: list[str]) -> Any: + """Validate a credential, forwarding route-declared scopes to the validator.""" + extra = _scope_kwargs(self, self._accepts_scopes, scopes) + return await self._validator(credential, **extra, **self._kwargs) def require(self, **kwargs: Any) -> "APIKeyHeaderAuth": """Return a new instance with additional (or overriding) validator kwargs.""" diff --git a/src/fastapi_toolsets/security/sources/multi.py b/src/fastapi_toolsets/security/sources/multi.py index 5180b11..a28d636 100644 --- a/src/fastapi_toolsets/security/sources/multi.py +++ b/src/fastapi_toolsets/security/sources/multi.py @@ -23,13 +23,15 @@ def __init__(self, *sources: AuthSource) -> None: async def _call( request: Request, - security_scopes: SecurityScopes, # noqa: ARG001 + security_scopes: SecurityScopes, **kwargs: Any, # noqa: ARG001 — absorbs scheme values injected by FastAPI ) -> Any: for source in self._sources: credential = await source.extract(request) if credential is not None: - return await source.authenticate(credential) + return await source.authenticate_scoped( + credential, security_scopes.scopes + ) raise UnauthorizedError() self._call_fn = _call diff --git a/tests/test_security.py b/tests/test_security.py index 6887cf4..aa89b52 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -906,6 +906,263 @@ def test_delete_cookie(self): assert "session" in response.headers["set-cookie"] +class TestSecurityScopes: + """Scopes declared via Security(..., scopes=[...]) are enforced, not ignored.""" + + def test_scopes_declared_without_scopes_param_raises(self): + """Fail closed: validator can't receive scopes → RuntimeError, not silent authz skip.""" + bearer = BearerTokenAuth(simple_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(bearer, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + with pytest.raises(RuntimeError, match="security scopes"): + client.get("/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"}) + + def test_scopes_passed_to_bearer_validator(self): + received: list[list[str]] = [] + + async def scoped_validator(credential: str, scopes: list[str]) -> dict: + if credential != VALID_TOKEN: + raise UnauthorizedError() + received.append(scopes) + return {"user": "alice"} + + bearer = BearerTokenAuth(scoped_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(bearer, scopes=["admin", "billing"])): + return user + + client = TestClient(_app(setup)) + response = client.get( + "/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"} + ) + assert response.status_code == 200 + assert received == [["admin", "billing"]] + + def test_no_scopes_declared_passes_empty_list(self): + received: list[list[str]] = [] + + async def scoped_validator(credential: str, *, scopes: list[str]) -> dict: + received.append(scopes) + return {"user": "alice"} + + bearer = BearerTokenAuth(scoped_validator) + + def setup(app: FastAPI): + @app.get("/me") + async def me(user=Security(bearer)): + return user + + client = TestClient(_app(setup)) + response = client.get("/me", headers={"Authorization": f"Bearer {VALID_TOKEN}"}) + assert response.status_code == 200 + assert received == [[]] + + def test_scoped_validator_can_reject(self): + """A validator enforcing scopes turns missing scopes into a 401.""" + + async def scoped_validator(credential: str, scopes: list[str]) -> dict: + if credential != VALID_TOKEN or "admin" in scopes: + raise UnauthorizedError() + return {"user": "alice"} + + bearer = BearerTokenAuth(scoped_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(bearer, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + response = client.get( + "/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"} + ) + assert response.status_code == 401 + + def test_scopes_passed_to_cookie_validator(self): + received: list[list[str]] = [] + + async def scoped_validator(value: str, scopes: list[str]) -> dict: + received.append(scopes) + return {"session": value} + + auth = CookieAuth("session", scoped_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(auth, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + response = client.get("/admin", cookies={"session": VALID_COOKIE}) + assert response.status_code == 200 + assert received == [["admin"]] + + def test_cookie_scopes_declared_without_scopes_param_raises(self): + auth = CookieAuth("session", cookie_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(auth, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + with pytest.raises(RuntimeError, match="security scopes"): + client.get("/admin", cookies={"session": VALID_COOKIE}) + + def test_scopes_passed_to_api_key_validator(self): + received: list[list[str]] = [] + + async def scoped_validator(api_key: str, scopes: list[str]) -> dict: + received.append(scopes) + return {"user": "alice"} + + auth = APIKeyHeaderAuth("X-API-Key", scoped_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(auth, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + response = client.get("/admin", headers={"X-API-Key": VALID_TOKEN}) + assert response.status_code == 200 + assert received == [["admin"]] + + def test_api_key_scopes_declared_without_scopes_param_raises(self): + auth = APIKeyHeaderAuth("X-API-Key", simple_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(auth, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + with pytest.raises(RuntimeError, match="security scopes"): + client.get("/admin", headers={"X-API-Key": VALID_TOKEN}) + + def test_multi_auth_forwards_scopes_to_matched_source(self): + received: list[list[str]] = [] + + async def scoped_validator(credential: str, scopes: list[str]) -> dict: + received.append(scopes) + return {"user": "alice"} + + bearer = BearerTokenAuth(scoped_validator) + cookie = CookieAuth("session", cookie_validator) + multi = MultiAuth(bearer, cookie) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(multi, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + response = client.get( + "/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"} + ) + assert response.status_code == 200 + assert received == [["admin"]] + + def test_multi_auth_scopes_fail_closed_on_unscoped_source(self): + bearer = BearerTokenAuth(simple_validator) + multi = MultiAuth(bearer) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(multi, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + with pytest.raises(RuntimeError, match="security scopes"): + client.get("/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"}) + + def test_custom_source_default_fails_closed(self): + """AuthSource.authenticate_scoped default raises when scopes are declared.""" + auth = _HeaderAuth(secret="s3cr3t") + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(auth, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + with pytest.raises(RuntimeError, match="security scopes"): + client.get("/admin", headers={"X-Token": "s3cr3t"}) + + @pytest.mark.parametrize( + "factory", + [ + lambda: BearerTokenAuth(simple_validator, scopes=["admin"]), + lambda: CookieAuth("session", cookie_validator, scopes=["admin"]), + lambda: APIKeyHeaderAuth("X-API-Key", simple_validator, scopes=["admin"]), + ], + ids=["bearer", "cookie", "api-key"], + ) + def test_scopes_kwarg_rejected_at_init(self, factory): + """'scopes' is reserved for route-declared scopes — fail at startup.""" + with pytest.raises(ValueError, match="reserved"): + factory() + + def test_scopes_kwarg_rejected_via_require(self): + """require() goes through __init__, so the same guard applies.""" + bearer = BearerTokenAuth(simple_validator) + with pytest.raises(ValueError, match="reserved"): + bearer.require(scopes=["admin"]) + + def test_accepts_scopes_non_introspectable_callable(self): + """Callables without an inspectable signature are treated as scope-less.""" + from typing import Any, Callable, cast + + from fastapi_toolsets.security.abc import _accepts_scopes + + not_introspectable = cast(Callable[..., Any], object()) + assert _accepts_scopes(not_introspectable) is False + + @pytest.mark.anyio + async def test_bearer_authenticate_passes_no_scopes(self): + """authenticate() remains the scope-less entry point.""" + bearer = BearerTokenAuth(simple_validator) + assert await bearer.authenticate(VALID_TOKEN) == {"user": "alice"} + + @pytest.mark.anyio + async def test_cookie_authenticate_passes_no_scopes(self): + auth = CookieAuth("session", cookie_validator) + assert await auth.authenticate(VALID_COOKIE) == {"session": VALID_COOKIE} + + @pytest.mark.anyio + async def test_api_key_authenticate_passes_no_scopes(self): + auth = APIKeyHeaderAuth("X-API-Key", simple_validator) + assert await auth.authenticate(VALID_TOKEN) == {"user": "alice"} + + def test_sync_validator_with_scopes(self): + received: list[list[str]] = [] + + def sync_scoped_validator(credential: str, scopes: list[str]) -> dict: + received.append(scopes) + return {"user": "alice"} + + bearer = BearerTokenAuth(sync_scoped_validator) + + def setup(app: FastAPI): + @app.get("/admin") + async def admin(user=Security(bearer, scopes=["admin"])): + return user + + client = TestClient(_app(setup)) + response = client.get( + "/admin", headers={"Authorization": f"Bearer {VALID_TOKEN}"} + ) + assert response.status_code == 200 + assert received == [["admin"]] + + # Minimal concrete subclass used only in tests below. class _HeaderAuth(AuthSource): """Reads a custom X-Token header — no FastAPI security scheme.""" From 1298656a5042bd8e94fc12fd9e0647d27ba700dd Mon Sep 17 00:00:00 2001 From: d3vyce Date: Wed, 10 Jun 2026 15:48:19 -0400 Subject: [PATCH 2/3] fix: bind signed cookie signatures to the cookie name Co-Authored-By: Claude Fable 5 --- docs/module/security.md | 2 + .../security/sources/cookie.py | 11 ++-- tests/test_security.py | 53 ++++++++++++++----- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/docs/module/security.md b/docs/module/security.md index 36bf38a..19090f1 100644 --- a/docs/module/security.md +++ b/docs/module/security.md @@ -96,6 +96,8 @@ async def me(user: User = Security(cookie_auth)): Pass `secret_key` to enable HMAC-SHA256 signed, tamper-proof cookies. The cookie payload includes an expiry timestamp (`ttl`, default 24 h). No database entry is required — the signature is self-contained. +The signing key is derived from `secret_key` *and* the cookie name, so several `CookieAuth` instances can safely share one `secret_key`: a cookie issued as `session` is never accepted as `admin_session`. + Use `set_cookie()` to issue the signed cookie on login and `delete_cookie()` to clear it on logout: ```python diff --git a/src/fastapi_toolsets/security/sources/cookie.py b/src/fastapi_toolsets/security/sources/cookie.py index 66d9156..fd2c919 100644 --- a/src/fastapi_toolsets/security/sources/cookie.py +++ b/src/fastapi_toolsets/security/sources/cookie.py @@ -64,6 +64,11 @@ def __init__( self._validator = _ensure_async(validator) self._accepts_scopes = _accepts_scopes(validator) self._secret_key = secret_key + self._signing_key = ( + hmac.new(secret_key.encode(), name.encode(), hashlib.sha256).digest() + if secret_key is not None + else None + ) self._ttl = ttl self._secure = secure self._kwargs = kwargs @@ -81,11 +86,9 @@ async def _call( self.__signature__ = inspect.signature(_call) def _hmac(self, data: str) -> str: - if self._secret_key is None: + if self._signing_key is None: raise RuntimeError("_hmac called without secret_key configured") - return hmac.new( - self._secret_key.encode(), data.encode(), hashlib.sha256 - ).hexdigest() + return hmac.new(self._signing_key, data.encode(), hashlib.sha256).hexdigest() def _sign(self, value: str) -> str: data = base64.urlsafe_b64encode( diff --git a/tests/test_security.py b/tests/test_security.py index aa89b52..bf40719 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -757,6 +757,14 @@ class TestCookieAuthSigned: SECRET = "test-hmac-secret" + def _sig(self, data: str, name: str = "session") -> str: + """Signature as CookieAuth computes it: key derived from secret + name.""" + import hashlib as _hashlib + import hmac as _hmac + + key = _hmac.new(self.SECRET.encode(), name.encode(), _hashlib.sha256).digest() + return _hmac.new(key, data.encode(), _hashlib.sha256).hexdigest() + def test_valid_signed_cookie_via_set_cookie(self): """set_cookie signs the value; the signed cookie is verified on read.""" from fastapi import Response @@ -825,8 +833,6 @@ async def me(user=Security(auth)): def test_expired_signed_cookie_returns_401(self): """A signed cookie past its expiry timestamp is rejected.""" import base64 as _b64 - import hashlib as _hashlib - import hmac as _hmac import json as _json import time as _time @@ -841,17 +847,12 @@ async def me(user=Security(auth)): data = _b64.urlsafe_b64encode( _json.dumps({"v": VALID_COOKIE, "exp": int(_time.time()) - 1}).encode() ).decode() - sig = _hmac.new( - self.SECRET.encode(), data.encode(), _hashlib.sha256 - ).hexdigest() - response = client.get("/me", cookies={"session": f"{data}.{sig}"}) + response = client.get("/me", cookies={"session": f"{data}.{self._sig(data)}"}) assert response.status_code == 401 def test_invalid_json_payload_returns_401(self): """A signed cookie whose payload is not valid JSON is rejected.""" import base64 as _b64 - import hashlib as _hashlib - import hmac as _hmac auth = CookieAuth("session", cookie_validator, secret_key=self.SECRET) @@ -862,12 +863,40 @@ async def me(user=Security(auth)): client = TestClient(_app(setup)) data = _b64.urlsafe_b64encode(b"not-valid-json").decode() - sig = _hmac.new( - self.SECRET.encode(), data.encode(), _hashlib.sha256 - ).hexdigest() - response = client.get("/me", cookies={"session": f"{data}.{sig}"}) + response = client.get("/me", cookies={"session": f"{data}.{self._sig(data)}"}) + assert response.status_code == 401 + + def test_cookie_signed_for_other_name_returns_401(self): + """Same secret_key, different cookie name → signature must not transfer.""" + session_auth = CookieAuth("session", cookie_validator, secret_key=self.SECRET) + admin_auth = CookieAuth( + "admin_session", cookie_validator, secret_key=self.SECRET + ) + + def setup(app: FastAPI): + @app.get("/me") + async def me(user=Security(session_auth)): + return user + + client = TestClient(_app(setup)) + # A cookie minted by the admin_session instance, replayed under the + # session cookie name, must be rejected despite the shared secret. + forged = admin_auth._sign(VALID_COOKIE) + response = client.get("/me", cookies={"session": forged}) assert response.status_code == 401 + # Control: the same value signed by the session instance is accepted. + response = client.get( + "/me", cookies={"session": session_auth._sign(VALID_COOKIE)} + ) + assert response.status_code == 200 + + def test_signing_key_is_name_bound(self): + """The derived signing key differs per cookie name for the same secret.""" + a = CookieAuth("session", cookie_validator, secret_key=self.SECRET) + b = CookieAuth("admin_session", cookie_validator, secret_key=self.SECRET) + assert a._signing_key != b._signing_key + def test_malformed_cookie_no_dot_returns_401(self): """A signed cookie without the dot separator is rejected.""" auth = CookieAuth("session", cookie_validator, secret_key=self.SECRET) From 5c8bbb44412c7e9c95eac3a6ab15ec7541f92cae Mon Sep 17 00:00:00 2001 From: d3vyce Date: Wed, 10 Jun 2026 15:58:20 -0400 Subject: [PATCH 3/3] fix: require HTTPS and a matching issuer for OAuth/OIDC endpoints Co-Authored-By: Claude Fable 5 --- docs/module/security.md | 4 + src/fastapi_toolsets/security/oauth.py | 50 +++++- tests/test_security.py | 214 +++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 6 deletions(-) diff --git a/docs/module/security.md b/docs/module/security.md index 19090f1..03bc04b 100644 --- a/docs/module/security.md +++ b/docs/module/security.md @@ -292,6 +292,10 @@ auth_url, token_url, userinfo_url = await oauth_resolve_provider_urls( Returns a `(authorization_url, token_url, userinfo_url)` tuple. `userinfo_url` is `None` when the provider does not advertise one. +The discovery URL and every endpoint it returns must use `https://` — `oauth_resolve_provider_urls()` and `oauth_fetch_userinfo()` raise `ValueError` otherwise, so a tampered discovery document cannot redirect the token exchange (and your `client_secret`) to a plaintext endpoint. Plain `http://` is accepted for loopback hosts only (`localhost`, `127.0.0.1`, `::1`), so local development IdPs keep working. + +The document's `issuer` is also verified against the discovery URL ([OIDC Discovery 1.0 §4.3](https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationValidation)): for a standard `/.well-known/openid-configuration` URL, the `issuer` field must match `` or a `ValueError` is raised. Endpoints are *not* required to share the issuer's origin — providers like Google legitimately host their token endpoint on a different domain. + ### Authorization redirect [`oauth_build_authorization_redirect()`](../reference/security.md#fastapi_toolsets.security.oauth_build_authorization_redirect) constructs the redirect to the provider's authorization page. It requires a `state_token` — a random CSRF token generated by [`oauth_generate_state_token()`](../reference/security.md#fastapi_toolsets.security.oauth_generate_state_token) — that must be stored server-side (e.g. in the session) and verified on the callback to prevent login-CSRF attacks ([RFC 6749 §10.12](https://datatracker.ietf.org/doc/html/rfc6749#section-10.12)): diff --git a/src/fastapi_toolsets/security/oauth.py b/src/fastapi_toolsets/security/oauth.py index 4625ae1..d0c07dc 100644 --- a/src/fastapi_toolsets/security/oauth.py +++ b/src/fastapi_toolsets/security/oauth.py @@ -6,12 +6,37 @@ import json import secrets from typing import Any -from urllib.parse import urlencode +from urllib.parse import urlencode, urlsplit import httpx from async_lru import alru_cache from fastapi.responses import RedirectResponse +_LOOPBACK_HOSTS = frozenset({"localhost", "127.0.0.1", "::1"}) +_DISCOVERY_SUFFIX = "/.well-known/openid-configuration" + + +def _validate_issuer(discovery_url: str, issuer: Any) -> None: + """Check the discovery document claims the issuer it was fetched from.""" + if not discovery_url.endswith(_DISCOVERY_SUFFIX): + return + expected = discovery_url.removesuffix(_DISCOVERY_SUFFIX).rstrip("/") + if not isinstance(issuer, str) or issuer.rstrip("/") != expected: + raise ValueError( + f"discovery document issuer {issuer!r} does not match the " + f"expected issuer {expected!r} derived from the discovery URL" + ) + + +def _require_https(url: str, description: str) -> str: + """Reject OAuth URLs that would send credentials over plaintext HTTP.""" + parsed = urlsplit(url) + if parsed.scheme == "https": + return url + if parsed.scheme == "http" and parsed.hostname in _LOOPBACK_HOSTS: + return url + raise ValueError(f"{description} must use https:// (got {url!r})") + @alru_cache(maxsize=32) async def oauth_resolve_provider_urls( @@ -25,15 +50,25 @@ async def oauth_resolve_provider_urls( Returns: A ``(authorization_url, token_url, userinfo_url)`` tuple. *userinfo_url* is ``None`` when the provider does not advertise one. + + Raises: + ValueError: If *discovery_url* or any endpoint in the discovery + document is not HTTPS (loopback hosts excepted), or if the + document's ``issuer`` does not match the discovery URL. """ + _require_https(discovery_url, "OIDC discovery URL") async with httpx.AsyncClient() as client: resp = await client.get(discovery_url) resp.raise_for_status() cfg = resp.json() + _validate_issuer(discovery_url, cfg.get("issuer")) + userinfo_url = cfg.get("userinfo_endpoint") + if userinfo_url is not None: + _require_https(userinfo_url, "OIDC userinfo_endpoint") return ( - cfg["authorization_endpoint"], - cfg["token_endpoint"], - cfg.get("userinfo_endpoint"), + _require_https(cfg["authorization_endpoint"], "OIDC authorization_endpoint"), + _require_https(cfg["token_endpoint"], "OIDC token_endpoint"), + userinfo_url, ) @@ -64,9 +99,12 @@ async def oauth_fetch_userinfo( The JSON payload returned by the userinfo endpoint as a plain ``dict``. Raises: - ValueError: If the provider granted a different token type than ``bearer`` - or did not grant all ``required_scopes``. + ValueError: If *token_url* or *userinfo_url* is not HTTPS (loopback + hosts excepted), if the provider granted a different token type + than ``bearer``, or if it did not grant all ``required_scopes``. """ + _require_https(token_url, "OAuth token_url") + _require_https(userinfo_url, "OAuth userinfo_url") async with httpx.AsyncClient() as client: token_resp = await client.post( token_url, diff --git a/tests/test_security.py b/tests/test_security.py index bf40719..a656532 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -1405,6 +1405,7 @@ def test_redirect_location_contains_all_params(self): class TestResolveProviderUrls: def _discovery(self, *, userinfo=True): doc = { + "issuer": "https://auth.example.com", "authorization_endpoint": "https://auth.example.com/authorize", "token_endpoint": "https://auth.example.com/token", } @@ -1460,6 +1461,219 @@ async def test_caches_discovery_document(self): assert mock_client.get.call_count == 1 +class TestOAuthHttpsEnforcement: + """client_secret / codes / tokens must never travel over plaintext HTTP.""" + + def _discovery(self, **overrides): + doc = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "userinfo_endpoint": "https://auth.example.com/userinfo", + } + doc.update(overrides) + return doc + + @pytest.mark.anyio + async def test_http_discovery_url_rejected_before_fetch(self): + cm, mock_client = _make_async_client_mock() + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match="discovery URL must use https"): + await oauth_resolve_provider_urls( + "http://auth.example.com/.well-known/openid-configuration" + ) + + mock_client.get.assert_not_called() + + @pytest.mark.anyio + @pytest.mark.parametrize( + "endpoint", + ["authorization_endpoint", "token_endpoint", "userinfo_endpoint"], + ) + async def test_http_endpoint_in_discovery_document_rejected(self, endpoint): + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery( + **{endpoint: "http://attacker.example.com/endpoint"} + ) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match=f"{endpoint} must use https"): + await oauth_resolve_provider_urls( + "https://auth.example.com/.well-known/openid-configuration" + ) + + @pytest.mark.anyio + @pytest.mark.parametrize("host", ["localhost", "127.0.0.1"]) + async def test_http_loopback_discovery_allowed(self, host): + """Local development IdPs over plain HTTP keep working.""" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery( + issuer=f"http://{host}:8080", + authorization_endpoint=f"http://{host}:8080/authorize", + token_endpoint=f"http://{host}:8080/token", + userinfo_endpoint=f"http://{host}:8080/userinfo", + ) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + auth_url, token_url, userinfo_url = await oauth_resolve_provider_urls( + f"http://{host}:8080/.well-known/openid-configuration" + ) + + assert auth_url == f"http://{host}:8080/authorize" + assert token_url == f"http://{host}:8080/token" + assert userinfo_url == f"http://{host}:8080/userinfo" + + @pytest.mark.anyio + async def test_issuer_mismatch_rejected(self): + """OIDC Discovery §4.3: the document must claim the expected issuer.""" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery( + issuer="https://attacker.example.com" + ) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match="issuer"): + await oauth_resolve_provider_urls( + "https://auth.example.com/.well-known/openid-configuration" + ) + + @pytest.mark.anyio + async def test_missing_issuer_rejected(self): + doc = self._discovery() + del doc["issuer"] + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = doc + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match="issuer"): + await oauth_resolve_provider_urls( + "https://auth.example.com/.well-known/openid-configuration" + ) + + @pytest.mark.anyio + async def test_issuer_trailing_slash_accepted(self): + """Auth0-style issuers carry a trailing slash — still the same issuer.""" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery( + issuer="https://auth.example.com/" + ) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + _, token_url, _ = await oauth_resolve_provider_urls( + "https://auth.example.com/.well-known/openid-configuration" + ) + assert token_url == "https://auth.example.com/token" + + @pytest.mark.anyio + async def test_issuer_with_path_accepted(self): + """Keycloak/Microsoft-style issuers include a path component.""" + issuer = "https://auth.example.com/realms/myrealm" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery(issuer=issuer) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + _, token_url, _ = await oauth_resolve_provider_urls( + f"{issuer}/.well-known/openid-configuration" + ) + assert token_url == "https://auth.example.com/token" + + @pytest.mark.anyio + async def test_nonstandard_discovery_url_skips_issuer_check(self): + """No expected issuer can be derived from a non-standard layout.""" + mock_resp = MagicMock() + mock_resp.raise_for_status = MagicMock() + mock_resp.json.return_value = self._discovery( + issuer="https://unrelated.example.com" + ) + cm, _ = _make_async_client_mock(get_return=mock_resp) + + oauth_resolve_provider_urls.cache_clear() + with patch("httpx.AsyncClient", return_value=cm): + _, token_url, _ = await oauth_resolve_provider_urls( + "https://auth.example.com/custom/discovery.json" + ) + assert token_url == "https://auth.example.com/token" + + @pytest.mark.anyio + async def test_fetch_userinfo_http_token_url_rejected_before_request(self): + cm, mock_client = _make_async_client_mock() + + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match="token_url must use https"): + await oauth_fetch_userinfo( + token_url="http://auth.example.com/token", + userinfo_url="https://auth.example.com/userinfo", + code="authcode123", + client_id="client-id", + client_secret="client-secret", + redirect_uri="https://app.example.com/callback", + ) + + mock_client.post.assert_not_called() + + @pytest.mark.anyio + async def test_fetch_userinfo_http_userinfo_url_rejected_before_request(self): + cm, mock_client = _make_async_client_mock() + + with patch("httpx.AsyncClient", return_value=cm): + with pytest.raises(ValueError, match="userinfo_url must use https"): + await oauth_fetch_userinfo( + token_url="https://auth.example.com/token", + userinfo_url="http://auth.example.com/userinfo", + code="authcode123", + client_id="client-id", + client_secret="client-secret", + redirect_uri="https://app.example.com/callback", + ) + + mock_client.post.assert_not_called() + + @pytest.mark.anyio + async def test_fetch_userinfo_http_loopback_allowed(self): + token_resp = MagicMock() + token_resp.raise_for_status = MagicMock() + token_resp.json.return_value = {"access_token": "tok123"} + + userinfo_resp = MagicMock() + userinfo_resp.raise_for_status = MagicMock() + userinfo_resp.json.return_value = {"sub": "user-1"} + + cm, _ = _make_async_client_mock( + post_return=token_resp, get_return=userinfo_resp + ) + + with patch("httpx.AsyncClient", return_value=cm): + result = await oauth_fetch_userinfo( + token_url="http://localhost:8080/token", + userinfo_url="http://127.0.0.1:8080/userinfo", + code="authcode123", + client_id="client-id", + client_secret="client-secret", + redirect_uri="http://localhost:8000/callback", + ) + assert result == {"sub": "user-1"} + + class TestFetchUserinfo: @pytest.mark.anyio async def test_returns_userinfo_payload(self):