Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions roboco/api/routes/v1/_role_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,22 @@

def _require_roles(allowed: frozenset[Role]) -> params.Depends:
def _check(
x_agent_id: Annotated[str, Header(alias="X-Agent-ID")],
x_agent_role: Annotated[str, Header(alias="X-Agent-Role")],
x_agent_team: Annotated[str | None, Header(alias="X-Agent-Team")] = None,
x_agent_token: Annotated[str | None, Header(alias="X-Agent-Token")] = None,
) -> None:
# Bind the role header to a verified token BEFORE trusting it. These v1
# flow guards are the sole gate for the /api/v1/flow/* endpoints, but
# previously checked only the role string — unlike get_agent_context,
# which already verifies the token. So a forged X-Agent-Role passed, and
# in strict mode (ROBOCO_AGENT_AUTH_REQUIRED) the token was never
# required here. In header-trust (dev) mode a missing token stays a
# no-op; any presented token is still verified. Deferred import avoids
# an import cycle with routers that import both this module and deps.
from roboco.api.deps import _check_agent_auth_token

_check_agent_auth_token(x_agent_id, x_agent_role, x_agent_team, x_agent_token)
# `Role` is a StrEnum, so the lowercase header string compares equal
# to its matching member.
if x_agent_role.lower() not in allowed:
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/api/test_v1_role_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, MagicMock

from fastapi import FastAPI
from fastapi.testclient import TestClient
from roboco.agents_config import issue_agent_token
from roboco.api.deps import get_choreographer
from roboco.api.routes.v1.flow_dev import router as flow_dev_router

if TYPE_CHECKING:
import pytest

_HTTP_200 = 200
_HTTP_401 = 401
_HTTP_403 = 403
_AGENT_ID = "00000000-0000-0000-0000-000000000001"
_SECRET = "test-secret-for-v1-role-dep"


def _build_app() -> FastAPI:
Expand Down Expand Up @@ -82,3 +90,56 @@ def test_dev_route_rejects_missing_role_header() -> None:
# Missing X-Agent-Role => FastAPI 422 from header validation.
# We just need it not to silently pass as 200.
assert r.status_code != _HTTP_200


def test_dev_route_401_when_auth_required_and_no_token(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# In strict mode the role guard must require the token, not just the header.
monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET)
monkeypatch.setenv("ROBOCO_AGENT_AUTH_REQUIRED", "true")
client = TestClient(_build_app())
r = client.post(
"/api/v1/flow/developer/give_me_work",
json={},
headers={"X-Agent-ID": _AGENT_ID, "X-Agent-Role": "developer"},
)
assert r.status_code == _HTTP_401


def test_dev_route_rejects_invalid_token_even_in_dev(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Even in header-trust mode, a presented-but-forged token is rejected.
monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET)
monkeypatch.delenv("ROBOCO_AGENT_AUTH_REQUIRED", raising=False)
client = TestClient(_build_app())
r = client.post(
"/api/v1/flow/developer/give_me_work",
json={},
headers={
"X-Agent-ID": _AGENT_ID,
"X-Agent-Role": "developer",
"X-Agent-Token": "forged-not-a-real-hmac",
},
)
assert r.status_code == _HTTP_401


def test_dev_route_accepts_valid_token(monkeypatch: pytest.MonkeyPatch) -> None:
# The good path: a valid HMAC token for the allowed role passes the guard.
monkeypatch.setenv("ROBOCO_AGENT_AUTH_SECRET", _SECRET)
monkeypatch.setenv("ROBOCO_AGENT_AUTH_REQUIRED", "true")
token = issue_agent_token(_AGENT_ID, "developer")
client = TestClient(_build_app())
r = client.post(
"/api/v1/flow/developer/give_me_work",
json={},
headers={
"X-Agent-ID": _AGENT_ID,
"X-Agent-Role": "developer",
"X-Agent-Token": token,
},
)
# Verified token + allowed role → guard passes; mocked choreographer 200.
assert r.status_code == _HTTP_200
Loading