From 1adc6934852bf5ce51fc80865173ff0fa1cd64be Mon Sep 17 00:00:00 2001 From: Renn F Date: Fri, 19 Jun 2026 20:48:42 +0200 Subject: [PATCH] fix(api): bind the v1 role guard to a verified agent token MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v1 flow role guards (_require_roles) are the sole gate for /api/v1/flow/* but checked only the X-Agent-Role string — unlike get_agent_context, which verifies the token. So a forged role header passed, and in strict mode (ROBOCO_AGENT_AUTH_REQUIRED) the token was never required on these routes. Call _check_agent_auth_token before the role membership test (deferred import to avoid the cycle): missing token stays a no-op in header-trust mode, any presented token is verified, and strict mode now requires it. Supersedes #222 (antfleet-ops flagged the bypass); re-done as our own change so it lands without the contributor CLA, and adds the positive valid-token test their PR was missing. --- roboco/api/routes/v1/_role_dep.py | 14 +++++++ tests/unit/api/test_v1_role_dep.py | 61 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/roboco/api/routes/v1/_role_dep.py b/roboco/api/routes/v1/_role_dep.py index 39ebeb38..2ff67672 100644 --- a/roboco/api/routes/v1/_role_dep.py +++ b/roboco/api/routes/v1/_role_dep.py @@ -21,8 +21,22 @@ def _require_roles(allowed: frozenset[Role]) -> params.Depends: def _check( + x_agent_id: Annotated[str, Header(alias="X-Agent-ID")], x_agent_role: Annotated[str, Header(alias="X-Agent-Role")], + x_agent_team: Annotated[str | None, Header(alias="X-Agent-Team")] = None, + x_agent_token: Annotated[str | None, Header(alias="X-Agent-Token")] = None, ) -> None: + # Bind the role header to a verified token BEFORE trusting it. These v1 + # flow guards are the sole gate for the /api/v1/flow/* endpoints, but + # previously checked only the role string — unlike get_agent_context, + # which already verifies the token. So a forged X-Agent-Role passed, and + # in strict mode (ROBOCO_AGENT_AUTH_REQUIRED) the token was never + # required here. In header-trust (dev) mode a missing token stays a + # no-op; any presented token is still verified. Deferred import avoids + # an import cycle with routers that import both this module and deps. + from roboco.api.deps import _check_agent_auth_token + + _check_agent_auth_token(x_agent_id, x_agent_role, x_agent_team, x_agent_token) # `Role` is a StrEnum, so the lowercase header string compares equal # to its matching member. if x_agent_role.lower() not in allowed: diff --git a/tests/unit/api/test_v1_role_dep.py b/tests/unit/api/test_v1_role_dep.py index 6197dfcf..b24e877a 100644 --- a/tests/unit/api/test_v1_role_dep.py +++ b/tests/unit/api/test_v1_role_dep.py @@ -9,15 +9,23 @@ from __future__ import annotations +from typing import TYPE_CHECKING from unittest.mock import AsyncMock, MagicMock from fastapi import FastAPI from fastapi.testclient import TestClient +from roboco.agents_config import issue_agent_token from roboco.api.deps import get_choreographer from roboco.api.routes.v1.flow_dev import router as flow_dev_router +if TYPE_CHECKING: + import pytest + _HTTP_200 = 200 +_HTTP_401 = 401 _HTTP_403 = 403 +_AGENT_ID = "00000000-0000-0000-0000-000000000001" +_SECRET = "test-secret-for-v1-role-dep" def _build_app() -> FastAPI: @@ -82,3 +90,56 @@ def test_dev_route_rejects_missing_role_header() -> None: # Missing X-Agent-Role => FastAPI 422 from header validation. # We just need it not to silently pass as 200. assert r.status_code != _HTTP_200 + + +def test_dev_route_401_when_auth_required_and_no_token( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # In strict mode the role guard must require the token, not just the header. + monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET) + monkeypatch.setenv("ROBOCO_AGENT_AUTH_REQUIRED", "true") + client = TestClient(_build_app()) + r = client.post( + "/api/v1/flow/developer/give_me_work", + json={}, + headers={"X-Agent-ID": _AGENT_ID, "X-Agent-Role": "developer"}, + ) + assert r.status_code == _HTTP_401 + + +def test_dev_route_rejects_invalid_token_even_in_dev( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Even in header-trust mode, a presented-but-forged token is rejected. + monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET) + monkeypatch.delenv("ROBOCO_AGENT_AUTH_REQUIRED", raising=False) + client = TestClient(_build_app()) + r = client.post( + "/api/v1/flow/developer/give_me_work", + json={}, + headers={ + "X-Agent-ID": _AGENT_ID, + "X-Agent-Role": "developer", + "X-Agent-Token": "forged-not-a-real-hmac", + }, + ) + assert r.status_code == _HTTP_401 + + +def test_dev_route_accepts_valid_token(monkeypatch: pytest.MonkeyPatch) -> None: + # The good path: a valid HMAC token for the allowed role passes the guard. + monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET) + monkeypatch.setenv("ROBOCO_AGENT_AUTH_REQUIRED", "true") + token = issue_agent_token(_AGENT_ID, "developer") + client = TestClient(_build_app()) + r = client.post( + "/api/v1/flow/developer/give_me_work", + json={}, + headers={ + "X-Agent-ID": _AGENT_ID, + "X-Agent-Role": "developer", + "X-Agent-Token": token, + }, + ) + # Verified token + allowed role → guard passes; mocked choreographer 200. + assert r.status_code == _HTTP_200