From 975b2ccd1851a5a1e3f1ad7f329eabf6c4ac961c Mon Sep 17 00:00:00 2001 From: JOBOYA Date: Tue, 12 May 2026 10:38:52 +0200 Subject: [PATCH 1/2] Add Codex OAuth provider --- README.md | 10 +- strix/interface/main.py | 21 +- strix/llm/codex_oauth.py | 357 ++++++++++++++++++++++++++++++++++ strix/llm/config.py | 4 + strix/llm/llm.py | 36 ++++ tests/llm/test_codex_oauth.py | 156 +++++++++++++++ 6 files changed, 581 insertions(+), 3 deletions(-) create mode 100644 strix/llm/codex_oauth.py create mode 100644 tests/llm/test_codex_oauth.py diff --git a/README.md b/README.md index 61ef8a3be..f1b4f6145 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ Strix are autonomous AI agents that act just like real hackers - they run your c **Prerequisites:** - Docker (running) -- An LLM API key from any [supported provider](https://docs.strix.ai/llm-providers/overview) (OpenAI, Anthropic, Google, etc.) +- An LLM API key from any [supported provider](https://docs.strix.ai/llm-providers/overview) (OpenAI, Anthropic, Google, etc.), or a Codex CLI ChatGPT login ### Installation & First Scan @@ -234,6 +234,14 @@ export PERPLEXITY_API_KEY="your-api-key" # for search capabilities export STRIX_REASONING_EFFORT="high" # control thinking effort (default: high, quick scan: medium) ``` +To use Codex OAuth instead of an API key, log in with the Codex CLI and select a +`codex/` model: + +```bash +codex login +export STRIX_LLM="codex/gpt-5.5" +``` + > [!NOTE] > Strix automatically saves your configuration to `~/.strix/cli-config.json`, so you don't have to re-enter it on every run. diff --git a/strix/interface/main.py b/strix/interface/main.py index bc88da673..7f0e586e8 100644 --- a/strix/interface/main.py +++ b/strix/interface/main.py @@ -20,6 +20,7 @@ from strix.config import Config, apply_saved_config, save_current_config from strix.config.config import resolve_llm_config +from strix.llm.codex_oauth import codex_model_name, complete_codex_oauth from strix.llm.utils import resolve_strix_model @@ -57,11 +58,12 @@ def validate_environment() -> None: # noqa: PLR0912, PLR0915 strix_llm = Config.get("strix_llm") uses_strix_models = strix_llm and strix_llm.startswith("strix/") + uses_codex_oauth = strix_llm and strix_llm.startswith("codex/") if not strix_llm: missing_required_vars.append("STRIX_LLM") - has_base_url = uses_strix_models or any( + has_base_url = uses_strix_models or uses_codex_oauth or any( [ Config.get("llm_api_base"), Config.get("openai_api_base"), @@ -70,7 +72,7 @@ def validate_environment() -> None: # noqa: PLR0912, PLR0915 ] ) - if not Config.get("llm_api_key"): + if not Config.get("llm_api_key") and not uses_codex_oauth: missing_optional_vars.append("LLM_API_KEY") if not has_base_url: @@ -209,6 +211,21 @@ async def warm_up_llm() -> None: try: model_name, api_key, api_base = resolve_llm_config() + + if model_name and model_name.startswith("codex/"): + test_messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Reply with just 'OK'."}, + ] + llm_timeout = int(Config.get("llm_timeout") or "300") + complete_codex_oauth( + codex_model_name(model_name), + test_messages, + Config.get("strix_reasoning_effort"), + llm_timeout, + ) + return + litellm_model, _ = resolve_strix_model(model_name) litellm_model = litellm_model or model_name diff --git a/strix/llm/codex_oauth.py b/strix/llm/codex_oauth.py new file mode 100644 index 000000000..2ecb05287 --- /dev/null +++ b/strix/llm/codex_oauth.py @@ -0,0 +1,357 @@ +import json +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import requests + + +CODEX_BACKEND_BASE_URL = "https://chatgpt.com/backend-api/codex" +CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token" # noqa: S105 +CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" + + +class CodexOAuthError(Exception): + """Raised when Codex OAuth credentials or backend calls fail.""" + + +@dataclass(frozen=True) +class CodexOAuthCredentials: + access_token: str + account_id: str | None = None + + +def default_codex_home() -> Path: + configured = os.getenv("CODEX_HOME") + if configured: + return Path(configured).expanduser() + return Path.home() / ".codex" + + +def load_codex_oauth_credentials(codex_home: Path | None = None) -> CodexOAuthCredentials: + home = codex_home or default_codex_home() + raw = _read_codex_auth_json(home) + tokens = raw.get("tokens") + if not isinstance(tokens, dict): + raise CodexOAuthError("Codex auth file does not contain OAuth tokens.") + + access_token = tokens.get("access_token") + if not isinstance(access_token, str) or not access_token.strip(): + raise CodexOAuthError("Codex auth file does not contain an access token.") + + account_id = tokens.get("account_id") + if not isinstance(account_id, str) or not account_id.strip(): + account_id = raw.get("account_id") + if not isinstance(account_id, str) or not account_id.strip(): + account_id = None + + return CodexOAuthCredentials(access_token=access_token, account_id=account_id) + + +def refresh_codex_oauth_credentials( + codex_home: Path | None = None, + post: Any = requests.post, +) -> CodexOAuthCredentials: + home = codex_home or default_codex_home() + auth_file = home / "auth.json" + raw = _read_codex_auth_json(home) + + tokens = raw.get("tokens") + if not isinstance(tokens, dict): + raise CodexOAuthError("Codex auth file does not contain OAuth tokens.") + + refresh_token = tokens.get("refresh_token") + if not isinstance(refresh_token, str) or not refresh_token.strip(): + raise CodexOAuthError("Codex auth file does not contain a refresh token.") + + try: + response = post( + CODEX_OAUTH_TOKEN_URL, + headers={"Content-Type": "application/json"}, + json={ + "client_id": CODEX_OAUTH_CLIENT_ID, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + }, + timeout=30, + ) + except requests.RequestException as exc: + raise CodexOAuthError(f"Codex OAuth token refresh failed: {exc}") from exc + + if not response.ok: + body = response.text[:1000] + raise CodexOAuthError( + f"Codex OAuth token refresh failed: HTTP {response.status_code}: {body}" + ) + + try: + refreshed = response.json() + except ValueError as exc: + raise CodexOAuthError(f"Codex OAuth token refresh returned invalid JSON: {exc}") from exc + + for token_name in ("id_token", "access_token", "refresh_token"): + token_value = refreshed.get(token_name) + if isinstance(token_value, str) and token_value.strip(): + tokens[token_name] = token_value + + try: + auth_file.write_text(json.dumps(raw, indent=2), encoding="utf-8") + except OSError as exc: + raise CodexOAuthError(f"Could not persist refreshed Codex OAuth tokens: {exc}") from exc + + return load_codex_oauth_credentials(home) + + +def codex_model_name(model_name: str) -> str: + return model_name.removeprefix("codex/") + + +def build_codex_responses_payload( + model: str, + messages: list[dict[str, Any]], + reasoning_effort: str | None, +) -> dict[str, Any]: + instructions: list[str] = [] + input_items: list[dict[str, Any]] = [] + + for message in messages: + role = str(message.get("role", "user")) + content = message.get("content", "") + if role == "system": + text = _content_to_text(content) + if text: + instructions.append(text) + continue + + response_role = "assistant" if role == "assistant" else "user" + input_items.append( + { + "type": "message", + "role": response_role, + "content": _content_to_responses_items(content, response_role), + } + ) + + payload: dict[str, Any] = { + "model": model, + "instructions": "\n\n".join(instructions), + "input": input_items, + "tools": [], + "tool_choice": "none", + "parallel_tool_calls": False, + "reasoning": {"effort": reasoning_effort} if reasoning_effort else None, + "store": False, + "stream": True, + "include": [], + } + + return payload + + +def complete_codex_oauth( + model: str, + messages: list[dict[str, Any]], + reasoning_effort: str | None, + timeout: int, + codex_home: Path | None = None, + base_url: str = CODEX_BACKEND_BASE_URL, +) -> tuple[str, dict[str, int]]: + credentials = load_codex_oauth_credentials(codex_home) + payload = build_codex_responses_payload(model, messages, reasoning_effort) + response = _post_codex_responses(base_url, credentials, payload, timeout) + + if response.status_code == 401: + credentials = refresh_codex_oauth_credentials(codex_home) + response = _post_codex_responses(base_url, credentials, payload, timeout) + if response.status_code == 401: + raise CodexOAuthError("Codex OAuth request was unauthorized. Run `codex login` again.") + if not response.ok: + body = response.text[:1000] + raise CodexOAuthError(f"Codex OAuth request failed: HTTP {response.status_code}: {body}") + + content_type = response.headers.get("content-type", "") + if "application/json" in content_type: + try: + return parse_responses_json_response(response.json()) + except ValueError as exc: + raise CodexOAuthError(f"Codex OAuth JSON response could not be parsed: {exc}") from exc + + return parse_responses_sse_events(response.iter_lines(decode_unicode=True)) + + +def _read_codex_auth_json(codex_home: Path) -> dict[str, Any]: + auth_file = codex_home / "auth.json" + if not auth_file.exists(): + raise CodexOAuthError( + f"Codex auth file not found at {auth_file}. Run `codex login` first." + ) + + try: + raw = json.loads(auth_file.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as exc: + raise CodexOAuthError(f"Could not read Codex auth file at {auth_file}: {exc}") from exc + if not isinstance(raw, dict): + raise CodexOAuthError("Codex auth file is not a JSON object.") + return raw + + +def _post_codex_responses( + base_url: str, + credentials: CodexOAuthCredentials, + payload: dict[str, Any], + timeout: int, +) -> requests.Response: + headers = { + "Authorization": f"Bearer {credentials.access_token}", + "Accept": "text/event-stream", + "Content-Type": "application/json", + "version": "strix-codex-oauth", + } + if credentials.account_id: + headers["ChatGPT-Account-ID"] = credentials.account_id + + try: + return requests.post( + f"{base_url.rstrip('/')}/responses", + headers=headers, + json=payload, + stream=True, + timeout=timeout, + ) + except requests.RequestException as exc: + raise CodexOAuthError(f"Codex OAuth request failed: {exc}") from exc + + +def parse_responses_sse_events(lines: Any) -> tuple[str, dict[str, int]]: + deltas: list[str] = [] + completed_parts: list[str] = [] + usage: dict[str, int] = {} + + for raw_line in lines: + event = _parse_sse_data_line(raw_line) + if event is None: + continue + + event_type = event.get("type") + if event_type == "response.output_text.delta": + delta = event.get("delta") + if isinstance(delta, str): + deltas.append(delta) + elif event_type == "response.output_item.done" and not deltas: + completed_parts.append(_extract_output_text(event.get("item"))) + elif event_type == "response.completed": + response = event.get("response") + if isinstance(response, dict): + usage = _extract_usage(response.get("usage")) + if not deltas and not completed_parts: + completed_parts.append(_extract_output_text(response)) + + content = "".join(deltas) if deltas else "".join(completed_parts) + return content, usage + + +def parse_responses_json_response(response: dict[str, Any]) -> tuple[str, dict[str, int]]: + return _extract_output_text(response), _extract_usage(response.get("usage")) + + +def _parse_sse_data_line(raw_line: Any) -> dict[str, Any] | None: + if isinstance(raw_line, bytes): + parsed_line = raw_line.decode("utf-8", errors="replace") + else: + parsed_line = raw_line + if not isinstance(parsed_line, str): + return None + + line = parsed_line.strip() + if not line.startswith("data:"): + return None + + data = line[5:].strip() + if not data or data == "[DONE]": + return None + + try: + event = json.loads(data) + except json.JSONDecodeError: + return None + return event if isinstance(event, dict) else None + + +def _content_to_text(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text = item.get("text") + if isinstance(text, str): + parts.append(text) + return "\n".join(parts) + return str(content) + + +def _content_to_responses_items(content: Any, role: str) -> list[dict[str, Any]]: + text_type = "output_text" if role == "assistant" else "input_text" + + if isinstance(content, str): + return [{"type": text_type, "text": content}] + + if not isinstance(content, list): + return [{"type": text_type, "text": str(content)}] + + result: list[dict[str, Any]] = [] + for item in content: + if not isinstance(item, dict): + continue + if item.get("type") == "text": + text = item.get("text") + if isinstance(text, str): + result.append({"type": text_type, "text": text}) + elif role == "user" and item.get("type") == "image_url": + image_url = item.get("image_url") + if isinstance(image_url, dict): + image_url = image_url.get("url") + if isinstance(image_url, str): + result.append({"type": "input_image", "image_url": image_url}) + + return result or [{"type": text_type, "text": ""}] + + +def _extract_output_text(value: Any) -> str: + if not isinstance(value, dict): + return "" + + output_text = value.get("output_text") + if isinstance(output_text, str): + return output_text + + parts: list[str] = [] + output = value.get("output") + if isinstance(output, list): + parts.extend(_extract_output_text(item) for item in output) + + content = value.get("content") + if isinstance(content, list): + for item in content: + if isinstance(item, dict): + text = item.get("text") + if isinstance(text, str): + parts.append(text) + + return "".join(parts) + + +def _extract_usage(value: Any) -> dict[str, int]: + if not isinstance(value, dict): + return {} + + usage: dict[str, int] = {} + input_tokens = value.get("input_tokens") or value.get("prompt_tokens") + output_tokens = value.get("output_tokens") or value.get("completion_tokens") + if isinstance(input_tokens, int): + usage["input_tokens"] = input_tokens + if isinstance(output_tokens, int): + usage["output_tokens"] = output_tokens + return usage diff --git a/strix/llm/config.py b/strix/llm/config.py index 017c77662..429c8160e 100644 --- a/strix/llm/config.py +++ b/strix/llm/config.py @@ -2,6 +2,7 @@ from strix.config import Config from strix.config.config import resolve_llm_config +from strix.llm.codex_oauth import codex_model_name from strix.llm.utils import resolve_strix_model @@ -24,6 +25,9 @@ def __init__( if not self.model_name: raise ValueError("STRIX_LLM environment variable must be set and not empty") + self.uses_codex_oauth = self.model_name.startswith("codex/") + self.codex_model = codex_model_name(self.model_name) if self.uses_codex_oauth else None + api_model, canonical = resolve_strix_model(self.model_name) self.litellm_model: str = api_model or self.model_name self.canonical_model: str = canonical or self.model_name diff --git a/strix/llm/llm.py b/strix/llm/llm.py index 6fd727d5f..ec35621e0 100644 --- a/strix/llm/llm.py +++ b/strix/llm/llm.py @@ -10,6 +10,7 @@ from litellm.utils import supports_prompt_caching, supports_vision from strix.config import Config +from strix.llm.codex_oauth import complete_codex_oauth from strix.llm.config import LLMConfig from strix.llm.memory_compressor import MemoryCompressor, get_message_tokens from strix.llm.utils import ( @@ -187,6 +188,11 @@ async def generate( await asyncio.sleep(wait) async def _stream(self, messages: list[dict[str, Any]]) -> AsyncIterator[LLMResponse]: + if self.config.uses_codex_oauth: + async for response in self._stream_codex_oauth(messages): + yield response + return + accumulated = "" chunks: list[Any] = [] done_streaming = 0 @@ -291,6 +297,36 @@ def _build_completion_args(self, messages: list[dict[str, Any]]) -> dict[str, An return args + async def _stream_codex_oauth( + self, messages: list[dict[str, Any]] + ) -> AsyncIterator[LLMResponse]: + self._total_stats.requests += 1 + model = self.config.codex_model or self.config.model_name + content, usage = await asyncio.to_thread( + complete_codex_oauth, + model, + messages, + self._reasoning_effort, + self.config.timeout, + ) + + if usage: + self._total_stats.input_tokens += usage.get("input_tokens", 0) + self._total_stats.output_tokens += usage.get("output_tokens", 0) + + if content: + yield LLMResponse(content=content) + + content = _THINKING_BLOCK_RE.sub("", content) + content = normalize_tool_format(content) + content = fix_incomplete_tool_call(_truncate_to_first_function(content)) + + yield LLMResponse( + content=content, + tool_invocations=parse_tool_invocations(content), + thinking_blocks=None, + ) + def _get_chunk_content(self, chunk: Any) -> str: if chunk.choices and hasattr(chunk.choices[0], "delta"): return getattr(chunk.choices[0].delta, "content", "") or "" diff --git a/tests/llm/test_codex_oauth.py b/tests/llm/test_codex_oauth.py new file mode 100644 index 000000000..234247cb0 --- /dev/null +++ b/tests/llm/test_codex_oauth.py @@ -0,0 +1,156 @@ +import json +from pathlib import Path + +import pytest + +from strix.llm.codex_oauth import ( + CodexOAuthError, + build_codex_responses_payload, + load_codex_oauth_credentials, + parse_responses_sse_events, + refresh_codex_oauth_credentials, +) +from strix.llm.config import LLMConfig + + +def test_llm_config_detects_codex_oauth_provider(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5") + + config = LLMConfig() + + assert config.uses_codex_oauth is True + assert config.codex_model == "gpt-5.5" + + +def test_load_codex_oauth_credentials_reads_codex_auth_json(tmp_path: Path) -> None: + auth_file = tmp_path / "auth.json" + auth_file.write_text( + json.dumps( + { + "auth_mode": "chatgpt", + "tokens": { + "access_token": "access-token", + "account_id": "account-id", + }, + } + ), + encoding="utf-8", + ) + + credentials = load_codex_oauth_credentials(tmp_path) + + assert credentials.access_token == "access-token" # noqa: S105 + assert credentials.account_id == "account-id" + + +def test_load_codex_oauth_credentials_errors_when_not_logged_in(tmp_path: Path) -> None: + with pytest.raises(CodexOAuthError, match="Codex auth file not found"): + load_codex_oauth_credentials(tmp_path) + + +def test_refresh_codex_oauth_credentials_persists_new_tokens(tmp_path: Path) -> None: + auth_file = tmp_path / "auth.json" + auth_file.write_text( + json.dumps( + { + "auth_mode": "chatgpt", + "tokens": { + "id_token": "old-id", + "access_token": "old-access", + "refresh_token": "old-refresh", + "account_id": "account-id", + }, + } + ), + encoding="utf-8", + ) + + class FakeResponse: + status_code = 200 + ok = True + text = "" + + def json(self) -> dict[str, str]: + return { + "id_token": "new-id", + "access_token": "new-access", + "refresh_token": "new-refresh", + } + + calls = [] + + def fake_post(url: str, **kwargs): + calls.append((url, kwargs)) + return FakeResponse() + + credentials = refresh_codex_oauth_credentials(tmp_path, post=fake_post) + + updated = json.loads(auth_file.read_text(encoding="utf-8")) + assert credentials.access_token == "new-access" # noqa: S105 + assert updated["tokens"]["id_token"] == "new-id" # noqa: S105 + assert updated["tokens"]["access_token"] == "new-access" # noqa: S105 + assert updated["tokens"]["refresh_token"] == "new-refresh" # noqa: S105 + assert updated["tokens"]["account_id"] == "account-id" + assert calls[0][0] == "https://auth.openai.com/oauth/token" + assert calls[0][1]["json"]["grant_type"] == "refresh_token" + + +def test_build_codex_responses_payload_converts_chat_messages() -> None: + payload = build_codex_responses_payload( + model="gpt-5.5", + messages=[ + {"role": "system", "content": "System prompt."}, + {"role": "user", "content": "Find bugs."}, + {"role": "assistant", "content": "I will inspect the code."}, + { + "role": "user", + "content": [ + {"type": "text", "text": "Tool Results:"}, + {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}}, + ], + }, + ], + reasoning_effort="high", + ) + + assert payload["model"] == "gpt-5.5" + assert payload["instructions"] == "System prompt." + assert payload["stream"] is True + assert payload["reasoning"] == {"effort": "high"} + assert payload["input"] == [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Find bugs."}], + }, + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "I will inspect the code."}], + }, + { + "type": "message", + "role": "user", + "content": [ + {"type": "input_text", "text": "Tool Results:"}, + {"type": "input_image", "image_url": "data:image/png;base64,abc"}, + ], + }, + ] + + +def test_parse_responses_sse_events_extracts_text_deltas_and_usage() -> None: + content, usage = parse_responses_sse_events( + [ + 'data: {"type":"response.output_text.delta","delta":"hel"}', + 'data: {"type":"response.output_text.delta","delta":"lo"}', + ( + 'data: {"type":"response.completed","response":' + '{"usage":{"input_tokens":3,"output_tokens":2}}}' + ), + "data: [DONE]", + ] + ) + + assert content == "hello" + assert usage == {"input_tokens": 3, "output_tokens": 2} From 8022f48d1a6d69c718598f08977ad2dddd41da69 Mon Sep 17 00:00:00 2001 From: JOBOYA Date: Tue, 12 May 2026 10:56:42 +0200 Subject: [PATCH 2/2] Address Codex OAuth review feedback --- strix/llm/codex_oauth.py | 2 +- strix/llm/llm.py | 7 ++-- tests/llm/test_codex_oauth.py | 65 +++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/strix/llm/codex_oauth.py b/strix/llm/codex_oauth.py index 2ecb05287..80e24e72c 100644 --- a/strix/llm/codex_oauth.py +++ b/strix/llm/codex_oauth.py @@ -206,7 +206,7 @@ def _post_codex_responses( "Authorization": f"Bearer {credentials.access_token}", "Accept": "text/event-stream", "Content-Type": "application/json", - "version": "strix-codex-oauth", + "User-Agent": "strix-codex-oauth", } if credentials.account_id: headers["ChatGPT-Account-ID"] = credentials.account_id diff --git a/strix/llm/llm.py b/strix/llm/llm.py index ec35621e0..4f70c0138 100644 --- a/strix/llm/llm.py +++ b/strix/llm/llm.py @@ -10,7 +10,7 @@ from litellm.utils import supports_prompt_caching, supports_vision from strix.config import Config -from strix.llm.codex_oauth import complete_codex_oauth +from strix.llm.codex_oauth import CodexOAuthError, complete_codex_oauth from strix.llm.config import LLMConfig from strix.llm.memory_compressor import MemoryCompressor, get_message_tokens from strix.llm.utils import ( @@ -314,9 +314,6 @@ async def _stream_codex_oauth( self._total_stats.input_tokens += usage.get("input_tokens", 0) self._total_stats.output_tokens += usage.get("output_tokens", 0) - if content: - yield LLMResponse(content=content) - content = _THINKING_BLOCK_RE.sub("", content) content = normalize_tool_format(content) content = fix_incomplete_tool_call(_truncate_to_first_function(content)) @@ -384,6 +381,8 @@ def _extract_cost(self, response: Any) -> float: return 0.0 def _should_retry(self, e: Exception) -> bool: + if isinstance(e, CodexOAuthError): + return False code = getattr(e, "status_code", None) or getattr( getattr(e, "response", None), "status_code", None ) diff --git a/tests/llm/test_codex_oauth.py b/tests/llm/test_codex_oauth.py index 234247cb0..c848f22b9 100644 --- a/tests/llm/test_codex_oauth.py +++ b/tests/llm/test_codex_oauth.py @@ -4,13 +4,16 @@ import pytest from strix.llm.codex_oauth import ( + CodexOAuthCredentials, CodexOAuthError, + _post_codex_responses, build_codex_responses_payload, load_codex_oauth_credentials, parse_responses_sse_events, refresh_codex_oauth_credentials, ) from strix.llm.config import LLMConfig +from strix.llm.llm import LLM def test_llm_config_detects_codex_oauth_provider(monkeypatch: pytest.MonkeyPatch) -> None: @@ -22,6 +25,41 @@ def test_llm_config_detects_codex_oauth_provider(monkeypatch: pytest.MonkeyPatch assert config.codex_model == "gpt-5.5" +def test_llm_does_not_retry_codex_oauth_errors(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5") + + llm = LLM(LLMConfig()) + + assert llm._should_retry(CodexOAuthError("Run `codex login` first.")) is False + + +@pytest.mark.asyncio +async def test_codex_oauth_stream_yields_single_processed_response( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5") + + def fake_complete_codex_oauth(*args, **kwargs): + return ( + "hiddendone", + {"input_tokens": 1, "output_tokens": 2}, + ) + + monkeypatch.setattr("strix.llm.llm.complete_codex_oauth", fake_complete_codex_oauth) + + llm = LLM(LLMConfig()) + responses = [ + response + async for response in llm._stream_codex_oauth([{"role": "user", "content": "Hi"}]) + ] + + assert len(responses) == 1 + assert "" not in responses[0].content + assert responses[0].tool_invocations + assert llm._total_stats.input_tokens == 1 + assert llm._total_stats.output_tokens == 2 + + def test_load_codex_oauth_credentials_reads_codex_auth_json(tmp_path: Path) -> None: auth_file = tmp_path / "auth.json" auth_file.write_text( @@ -95,6 +133,33 @@ def fake_post(url: str, **kwargs): assert calls[0][1]["json"]["grant_type"] == "refresh_token" +def test_post_codex_responses_uses_user_agent_header( + monkeypatch: pytest.MonkeyPatch, +) -> None: + calls = [] + + class FakeResponse: + pass + + def fake_post(url: str, **kwargs): + calls.append((url, kwargs)) + return FakeResponse() + + monkeypatch.setattr("strix.llm.codex_oauth.requests.post", fake_post) + + response = _post_codex_responses( + "https://example.test/codex", + CodexOAuthCredentials(access_token="access-token", account_id="account-id"), + {"model": "gpt-5.5"}, + 60, + ) + + assert isinstance(response, FakeResponse) + headers = calls[0][1]["headers"] + assert headers["User-Agent"] == "strix-codex-oauth" + assert "version" not in headers + + def test_build_codex_responses_payload_converts_chat_messages() -> None: payload = build_codex_responses_payload( model="gpt-5.5",