diff --git a/README.md b/README.md
index 61ef8a3be..f1b4f6145 100644
--- a/README.md
+++ b/README.md
@@ -72,7 +72,7 @@ Strix are autonomous AI agents that act just like real hackers - they run your c
**Prerequisites:**
- Docker (running)
-- An LLM API key from any [supported provider](https://docs.strix.ai/llm-providers/overview) (OpenAI, Anthropic, Google, etc.)
+- An LLM API key from any [supported provider](https://docs.strix.ai/llm-providers/overview) (OpenAI, Anthropic, Google, etc.), or a Codex CLI ChatGPT login
### Installation & First Scan
@@ -234,6 +234,14 @@ export PERPLEXITY_API_KEY="your-api-key" # for search capabilities
export STRIX_REASONING_EFFORT="high" # control thinking effort (default: high, quick scan: medium)
```
+To use Codex OAuth instead of an API key, log in with the Codex CLI and select a
+`codex/` model:
+
+```bash
+codex login
+export STRIX_LLM="codex/gpt-5.5"
+```
+
> [!NOTE]
> Strix automatically saves your configuration to `~/.strix/cli-config.json`, so you don't have to re-enter it on every run.
diff --git a/strix/interface/main.py b/strix/interface/main.py
index bc88da673..7f0e586e8 100644
--- a/strix/interface/main.py
+++ b/strix/interface/main.py
@@ -20,6 +20,7 @@
from strix.config import Config, apply_saved_config, save_current_config
from strix.config.config import resolve_llm_config
+from strix.llm.codex_oauth import codex_model_name, complete_codex_oauth
from strix.llm.utils import resolve_strix_model
@@ -57,11 +58,12 @@ def validate_environment() -> None: # noqa: PLR0912, PLR0915
strix_llm = Config.get("strix_llm")
uses_strix_models = strix_llm and strix_llm.startswith("strix/")
+ uses_codex_oauth = strix_llm and strix_llm.startswith("codex/")
if not strix_llm:
missing_required_vars.append("STRIX_LLM")
- has_base_url = uses_strix_models or any(
+ has_base_url = uses_strix_models or uses_codex_oauth or any(
[
Config.get("llm_api_base"),
Config.get("openai_api_base"),
@@ -70,7 +72,7 @@ def validate_environment() -> None: # noqa: PLR0912, PLR0915
]
)
- if not Config.get("llm_api_key"):
+ if not Config.get("llm_api_key") and not uses_codex_oauth:
missing_optional_vars.append("LLM_API_KEY")
if not has_base_url:
@@ -209,6 +211,21 @@ async def warm_up_llm() -> None:
try:
model_name, api_key, api_base = resolve_llm_config()
+
+ if model_name and model_name.startswith("codex/"):
+ test_messages = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ {"role": "user", "content": "Reply with just 'OK'."},
+ ]
+ llm_timeout = int(Config.get("llm_timeout") or "300")
+ complete_codex_oauth(
+ codex_model_name(model_name),
+ test_messages,
+ Config.get("strix_reasoning_effort"),
+ llm_timeout,
+ )
+ return
+
litellm_model, _ = resolve_strix_model(model_name)
litellm_model = litellm_model or model_name
diff --git a/strix/llm/codex_oauth.py b/strix/llm/codex_oauth.py
new file mode 100644
index 000000000..80e24e72c
--- /dev/null
+++ b/strix/llm/codex_oauth.py
@@ -0,0 +1,357 @@
+import json
+import os
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+import requests
+
+
+CODEX_BACKEND_BASE_URL = "https://chatgpt.com/backend-api/codex"
+CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token" # noqa: S105
+CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
+
+
+class CodexOAuthError(Exception):
+ """Raised when Codex OAuth credentials or backend calls fail."""
+
+
+@dataclass(frozen=True)
+class CodexOAuthCredentials:
+ access_token: str
+ account_id: str | None = None
+
+
+def default_codex_home() -> Path:
+ configured = os.getenv("CODEX_HOME")
+ if configured:
+ return Path(configured).expanduser()
+ return Path.home() / ".codex"
+
+
+def load_codex_oauth_credentials(codex_home: Path | None = None) -> CodexOAuthCredentials:
+ home = codex_home or default_codex_home()
+ raw = _read_codex_auth_json(home)
+ tokens = raw.get("tokens")
+ if not isinstance(tokens, dict):
+ raise CodexOAuthError("Codex auth file does not contain OAuth tokens.")
+
+ access_token = tokens.get("access_token")
+ if not isinstance(access_token, str) or not access_token.strip():
+ raise CodexOAuthError("Codex auth file does not contain an access token.")
+
+ account_id = tokens.get("account_id")
+ if not isinstance(account_id, str) or not account_id.strip():
+ account_id = raw.get("account_id")
+ if not isinstance(account_id, str) or not account_id.strip():
+ account_id = None
+
+ return CodexOAuthCredentials(access_token=access_token, account_id=account_id)
+
+
+def refresh_codex_oauth_credentials(
+ codex_home: Path | None = None,
+ post: Any = requests.post,
+) -> CodexOAuthCredentials:
+ home = codex_home or default_codex_home()
+ auth_file = home / "auth.json"
+ raw = _read_codex_auth_json(home)
+
+ tokens = raw.get("tokens")
+ if not isinstance(tokens, dict):
+ raise CodexOAuthError("Codex auth file does not contain OAuth tokens.")
+
+ refresh_token = tokens.get("refresh_token")
+ if not isinstance(refresh_token, str) or not refresh_token.strip():
+ raise CodexOAuthError("Codex auth file does not contain a refresh token.")
+
+ try:
+ response = post(
+ CODEX_OAUTH_TOKEN_URL,
+ headers={"Content-Type": "application/json"},
+ json={
+ "client_id": CODEX_OAUTH_CLIENT_ID,
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ },
+ timeout=30,
+ )
+ except requests.RequestException as exc:
+ raise CodexOAuthError(f"Codex OAuth token refresh failed: {exc}") from exc
+
+ if not response.ok:
+ body = response.text[:1000]
+ raise CodexOAuthError(
+ f"Codex OAuth token refresh failed: HTTP {response.status_code}: {body}"
+ )
+
+ try:
+ refreshed = response.json()
+ except ValueError as exc:
+ raise CodexOAuthError(f"Codex OAuth token refresh returned invalid JSON: {exc}") from exc
+
+ for token_name in ("id_token", "access_token", "refresh_token"):
+ token_value = refreshed.get(token_name)
+ if isinstance(token_value, str) and token_value.strip():
+ tokens[token_name] = token_value
+
+ try:
+ auth_file.write_text(json.dumps(raw, indent=2), encoding="utf-8")
+ except OSError as exc:
+ raise CodexOAuthError(f"Could not persist refreshed Codex OAuth tokens: {exc}") from exc
+
+ return load_codex_oauth_credentials(home)
+
+
+def codex_model_name(model_name: str) -> str:
+ return model_name.removeprefix("codex/")
+
+
+def build_codex_responses_payload(
+ model: str,
+ messages: list[dict[str, Any]],
+ reasoning_effort: str | None,
+) -> dict[str, Any]:
+ instructions: list[str] = []
+ input_items: list[dict[str, Any]] = []
+
+ for message in messages:
+ role = str(message.get("role", "user"))
+ content = message.get("content", "")
+ if role == "system":
+ text = _content_to_text(content)
+ if text:
+ instructions.append(text)
+ continue
+
+ response_role = "assistant" if role == "assistant" else "user"
+ input_items.append(
+ {
+ "type": "message",
+ "role": response_role,
+ "content": _content_to_responses_items(content, response_role),
+ }
+ )
+
+ payload: dict[str, Any] = {
+ "model": model,
+ "instructions": "\n\n".join(instructions),
+ "input": input_items,
+ "tools": [],
+ "tool_choice": "none",
+ "parallel_tool_calls": False,
+ "reasoning": {"effort": reasoning_effort} if reasoning_effort else None,
+ "store": False,
+ "stream": True,
+ "include": [],
+ }
+
+ return payload
+
+
+def complete_codex_oauth(
+ model: str,
+ messages: list[dict[str, Any]],
+ reasoning_effort: str | None,
+ timeout: int,
+ codex_home: Path | None = None,
+ base_url: str = CODEX_BACKEND_BASE_URL,
+) -> tuple[str, dict[str, int]]:
+ credentials = load_codex_oauth_credentials(codex_home)
+ payload = build_codex_responses_payload(model, messages, reasoning_effort)
+ response = _post_codex_responses(base_url, credentials, payload, timeout)
+
+ if response.status_code == 401:
+ credentials = refresh_codex_oauth_credentials(codex_home)
+ response = _post_codex_responses(base_url, credentials, payload, timeout)
+ if response.status_code == 401:
+ raise CodexOAuthError("Codex OAuth request was unauthorized. Run `codex login` again.")
+ if not response.ok:
+ body = response.text[:1000]
+ raise CodexOAuthError(f"Codex OAuth request failed: HTTP {response.status_code}: {body}")
+
+ content_type = response.headers.get("content-type", "")
+ if "application/json" in content_type:
+ try:
+ return parse_responses_json_response(response.json())
+ except ValueError as exc:
+ raise CodexOAuthError(f"Codex OAuth JSON response could not be parsed: {exc}") from exc
+
+ return parse_responses_sse_events(response.iter_lines(decode_unicode=True))
+
+
+def _read_codex_auth_json(codex_home: Path) -> dict[str, Any]:
+ auth_file = codex_home / "auth.json"
+ if not auth_file.exists():
+ raise CodexOAuthError(
+ f"Codex auth file not found at {auth_file}. Run `codex login` first."
+ )
+
+ try:
+ raw = json.loads(auth_file.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError) as exc:
+ raise CodexOAuthError(f"Could not read Codex auth file at {auth_file}: {exc}") from exc
+ if not isinstance(raw, dict):
+ raise CodexOAuthError("Codex auth file is not a JSON object.")
+ return raw
+
+
+def _post_codex_responses(
+ base_url: str,
+ credentials: CodexOAuthCredentials,
+ payload: dict[str, Any],
+ timeout: int,
+) -> requests.Response:
+ headers = {
+ "Authorization": f"Bearer {credentials.access_token}",
+ "Accept": "text/event-stream",
+ "Content-Type": "application/json",
+ "User-Agent": "strix-codex-oauth",
+ }
+ if credentials.account_id:
+ headers["ChatGPT-Account-ID"] = credentials.account_id
+
+ try:
+ return requests.post(
+ f"{base_url.rstrip('/')}/responses",
+ headers=headers,
+ json=payload,
+ stream=True,
+ timeout=timeout,
+ )
+ except requests.RequestException as exc:
+ raise CodexOAuthError(f"Codex OAuth request failed: {exc}") from exc
+
+
+def parse_responses_sse_events(lines: Any) -> tuple[str, dict[str, int]]:
+ deltas: list[str] = []
+ completed_parts: list[str] = []
+ usage: dict[str, int] = {}
+
+ for raw_line in lines:
+ event = _parse_sse_data_line(raw_line)
+ if event is None:
+ continue
+
+ event_type = event.get("type")
+ if event_type == "response.output_text.delta":
+ delta = event.get("delta")
+ if isinstance(delta, str):
+ deltas.append(delta)
+ elif event_type == "response.output_item.done" and not deltas:
+ completed_parts.append(_extract_output_text(event.get("item")))
+ elif event_type == "response.completed":
+ response = event.get("response")
+ if isinstance(response, dict):
+ usage = _extract_usage(response.get("usage"))
+ if not deltas and not completed_parts:
+ completed_parts.append(_extract_output_text(response))
+
+ content = "".join(deltas) if deltas else "".join(completed_parts)
+ return content, usage
+
+
+def parse_responses_json_response(response: dict[str, Any]) -> tuple[str, dict[str, int]]:
+ return _extract_output_text(response), _extract_usage(response.get("usage"))
+
+
+def _parse_sse_data_line(raw_line: Any) -> dict[str, Any] | None:
+ if isinstance(raw_line, bytes):
+ parsed_line = raw_line.decode("utf-8", errors="replace")
+ else:
+ parsed_line = raw_line
+ if not isinstance(parsed_line, str):
+ return None
+
+ line = parsed_line.strip()
+ if not line.startswith("data:"):
+ return None
+
+ data = line[5:].strip()
+ if not data or data == "[DONE]":
+ return None
+
+ try:
+ event = json.loads(data)
+ except json.JSONDecodeError:
+ return None
+ return event if isinstance(event, dict) else None
+
+
+def _content_to_text(content: Any) -> str:
+ if isinstance(content, str):
+ return content
+ if isinstance(content, list):
+ parts = []
+ for item in content:
+ if isinstance(item, dict) and item.get("type") == "text":
+ text = item.get("text")
+ if isinstance(text, str):
+ parts.append(text)
+ return "\n".join(parts)
+ return str(content)
+
+
+def _content_to_responses_items(content: Any, role: str) -> list[dict[str, Any]]:
+ text_type = "output_text" if role == "assistant" else "input_text"
+
+ if isinstance(content, str):
+ return [{"type": text_type, "text": content}]
+
+ if not isinstance(content, list):
+ return [{"type": text_type, "text": str(content)}]
+
+ result: list[dict[str, Any]] = []
+ for item in content:
+ if not isinstance(item, dict):
+ continue
+ if item.get("type") == "text":
+ text = item.get("text")
+ if isinstance(text, str):
+ result.append({"type": text_type, "text": text})
+ elif role == "user" and item.get("type") == "image_url":
+ image_url = item.get("image_url")
+ if isinstance(image_url, dict):
+ image_url = image_url.get("url")
+ if isinstance(image_url, str):
+ result.append({"type": "input_image", "image_url": image_url})
+
+ return result or [{"type": text_type, "text": ""}]
+
+
+def _extract_output_text(value: Any) -> str:
+ if not isinstance(value, dict):
+ return ""
+
+ output_text = value.get("output_text")
+ if isinstance(output_text, str):
+ return output_text
+
+ parts: list[str] = []
+ output = value.get("output")
+ if isinstance(output, list):
+ parts.extend(_extract_output_text(item) for item in output)
+
+ content = value.get("content")
+ if isinstance(content, list):
+ for item in content:
+ if isinstance(item, dict):
+ text = item.get("text")
+ if isinstance(text, str):
+ parts.append(text)
+
+ return "".join(parts)
+
+
+def _extract_usage(value: Any) -> dict[str, int]:
+ if not isinstance(value, dict):
+ return {}
+
+ usage: dict[str, int] = {}
+ input_tokens = value.get("input_tokens") or value.get("prompt_tokens")
+ output_tokens = value.get("output_tokens") or value.get("completion_tokens")
+ if isinstance(input_tokens, int):
+ usage["input_tokens"] = input_tokens
+ if isinstance(output_tokens, int):
+ usage["output_tokens"] = output_tokens
+ return usage
diff --git a/strix/llm/config.py b/strix/llm/config.py
index 017c77662..429c8160e 100644
--- a/strix/llm/config.py
+++ b/strix/llm/config.py
@@ -2,6 +2,7 @@
from strix.config import Config
from strix.config.config import resolve_llm_config
+from strix.llm.codex_oauth import codex_model_name
from strix.llm.utils import resolve_strix_model
@@ -24,6 +25,9 @@ def __init__(
if not self.model_name:
raise ValueError("STRIX_LLM environment variable must be set and not empty")
+ self.uses_codex_oauth = self.model_name.startswith("codex/")
+ self.codex_model = codex_model_name(self.model_name) if self.uses_codex_oauth else None
+
api_model, canonical = resolve_strix_model(self.model_name)
self.litellm_model: str = api_model or self.model_name
self.canonical_model: str = canonical or self.model_name
diff --git a/strix/llm/llm.py b/strix/llm/llm.py
index 6fd727d5f..4f70c0138 100644
--- a/strix/llm/llm.py
+++ b/strix/llm/llm.py
@@ -10,6 +10,7 @@
from litellm.utils import supports_prompt_caching, supports_vision
from strix.config import Config
+from strix.llm.codex_oauth import CodexOAuthError, complete_codex_oauth
from strix.llm.config import LLMConfig
from strix.llm.memory_compressor import MemoryCompressor, get_message_tokens
from strix.llm.utils import (
@@ -187,6 +188,11 @@ async def generate(
await asyncio.sleep(wait)
async def _stream(self, messages: list[dict[str, Any]]) -> AsyncIterator[LLMResponse]:
+ if self.config.uses_codex_oauth:
+ async for response in self._stream_codex_oauth(messages):
+ yield response
+ return
+
accumulated = ""
chunks: list[Any] = []
done_streaming = 0
@@ -291,6 +297,33 @@ def _build_completion_args(self, messages: list[dict[str, Any]]) -> dict[str, An
return args
+ async def _stream_codex_oauth(
+ self, messages: list[dict[str, Any]]
+ ) -> AsyncIterator[LLMResponse]:
+ self._total_stats.requests += 1
+ model = self.config.codex_model or self.config.model_name
+ content, usage = await asyncio.to_thread(
+ complete_codex_oauth,
+ model,
+ messages,
+ self._reasoning_effort,
+ self.config.timeout,
+ )
+
+ if usage:
+ self._total_stats.input_tokens += usage.get("input_tokens", 0)
+ self._total_stats.output_tokens += usage.get("output_tokens", 0)
+
+ content = _THINKING_BLOCK_RE.sub("", content)
+ content = normalize_tool_format(content)
+ content = fix_incomplete_tool_call(_truncate_to_first_function(content))
+
+ yield LLMResponse(
+ content=content,
+ tool_invocations=parse_tool_invocations(content),
+ thinking_blocks=None,
+ )
+
def _get_chunk_content(self, chunk: Any) -> str:
if chunk.choices and hasattr(chunk.choices[0], "delta"):
return getattr(chunk.choices[0].delta, "content", "") or ""
@@ -348,6 +381,8 @@ def _extract_cost(self, response: Any) -> float:
return 0.0
def _should_retry(self, e: Exception) -> bool:
+ if isinstance(e, CodexOAuthError):
+ return False
code = getattr(e, "status_code", None) or getattr(
getattr(e, "response", None), "status_code", None
)
diff --git a/tests/llm/test_codex_oauth.py b/tests/llm/test_codex_oauth.py
new file mode 100644
index 000000000..c848f22b9
--- /dev/null
+++ b/tests/llm/test_codex_oauth.py
@@ -0,0 +1,221 @@
+import json
+from pathlib import Path
+
+import pytest
+
+from strix.llm.codex_oauth import (
+ CodexOAuthCredentials,
+ CodexOAuthError,
+ _post_codex_responses,
+ build_codex_responses_payload,
+ load_codex_oauth_credentials,
+ parse_responses_sse_events,
+ refresh_codex_oauth_credentials,
+)
+from strix.llm.config import LLMConfig
+from strix.llm.llm import LLM
+
+
+def test_llm_config_detects_codex_oauth_provider(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5")
+
+ config = LLMConfig()
+
+ assert config.uses_codex_oauth is True
+ assert config.codex_model == "gpt-5.5"
+
+
+def test_llm_does_not_retry_codex_oauth_errors(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5")
+
+ llm = LLM(LLMConfig())
+
+ assert llm._should_retry(CodexOAuthError("Run `codex login` first.")) is False
+
+
+@pytest.mark.asyncio
+async def test_codex_oauth_stream_yields_single_processed_response(
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ monkeypatch.setenv("STRIX_LLM", "codex/gpt-5.5")
+
+ def fake_complete_codex_oauth(*args, **kwargs):
+ return (
+ "hiddendone",
+ {"input_tokens": 1, "output_tokens": 2},
+ )
+
+ monkeypatch.setattr("strix.llm.llm.complete_codex_oauth", fake_complete_codex_oauth)
+
+ llm = LLM(LLMConfig())
+ responses = [
+ response
+ async for response in llm._stream_codex_oauth([{"role": "user", "content": "Hi"}])
+ ]
+
+ assert len(responses) == 1
+ assert "" not in responses[0].content
+ assert responses[0].tool_invocations
+ assert llm._total_stats.input_tokens == 1
+ assert llm._total_stats.output_tokens == 2
+
+
+def test_load_codex_oauth_credentials_reads_codex_auth_json(tmp_path: Path) -> None:
+ auth_file = tmp_path / "auth.json"
+ auth_file.write_text(
+ json.dumps(
+ {
+ "auth_mode": "chatgpt",
+ "tokens": {
+ "access_token": "access-token",
+ "account_id": "account-id",
+ },
+ }
+ ),
+ encoding="utf-8",
+ )
+
+ credentials = load_codex_oauth_credentials(tmp_path)
+
+ assert credentials.access_token == "access-token" # noqa: S105
+ assert credentials.account_id == "account-id"
+
+
+def test_load_codex_oauth_credentials_errors_when_not_logged_in(tmp_path: Path) -> None:
+ with pytest.raises(CodexOAuthError, match="Codex auth file not found"):
+ load_codex_oauth_credentials(tmp_path)
+
+
+def test_refresh_codex_oauth_credentials_persists_new_tokens(tmp_path: Path) -> None:
+ auth_file = tmp_path / "auth.json"
+ auth_file.write_text(
+ json.dumps(
+ {
+ "auth_mode": "chatgpt",
+ "tokens": {
+ "id_token": "old-id",
+ "access_token": "old-access",
+ "refresh_token": "old-refresh",
+ "account_id": "account-id",
+ },
+ }
+ ),
+ encoding="utf-8",
+ )
+
+ class FakeResponse:
+ status_code = 200
+ ok = True
+ text = ""
+
+ def json(self) -> dict[str, str]:
+ return {
+ "id_token": "new-id",
+ "access_token": "new-access",
+ "refresh_token": "new-refresh",
+ }
+
+ calls = []
+
+ def fake_post(url: str, **kwargs):
+ calls.append((url, kwargs))
+ return FakeResponse()
+
+ credentials = refresh_codex_oauth_credentials(tmp_path, post=fake_post)
+
+ updated = json.loads(auth_file.read_text(encoding="utf-8"))
+ assert credentials.access_token == "new-access" # noqa: S105
+ assert updated["tokens"]["id_token"] == "new-id" # noqa: S105
+ assert updated["tokens"]["access_token"] == "new-access" # noqa: S105
+ assert updated["tokens"]["refresh_token"] == "new-refresh" # noqa: S105
+ assert updated["tokens"]["account_id"] == "account-id"
+ assert calls[0][0] == "https://auth.openai.com/oauth/token"
+ assert calls[0][1]["json"]["grant_type"] == "refresh_token"
+
+
+def test_post_codex_responses_uses_user_agent_header(
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ calls = []
+
+ class FakeResponse:
+ pass
+
+ def fake_post(url: str, **kwargs):
+ calls.append((url, kwargs))
+ return FakeResponse()
+
+ monkeypatch.setattr("strix.llm.codex_oauth.requests.post", fake_post)
+
+ response = _post_codex_responses(
+ "https://example.test/codex",
+ CodexOAuthCredentials(access_token="access-token", account_id="account-id"),
+ {"model": "gpt-5.5"},
+ 60,
+ )
+
+ assert isinstance(response, FakeResponse)
+ headers = calls[0][1]["headers"]
+ assert headers["User-Agent"] == "strix-codex-oauth"
+ assert "version" not in headers
+
+
+def test_build_codex_responses_payload_converts_chat_messages() -> None:
+ payload = build_codex_responses_payload(
+ model="gpt-5.5",
+ messages=[
+ {"role": "system", "content": "System prompt."},
+ {"role": "user", "content": "Find bugs."},
+ {"role": "assistant", "content": "I will inspect the code."},
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": "Tool Results:"},
+ {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
+ ],
+ },
+ ],
+ reasoning_effort="high",
+ )
+
+ assert payload["model"] == "gpt-5.5"
+ assert payload["instructions"] == "System prompt."
+ assert payload["stream"] is True
+ assert payload["reasoning"] == {"effort": "high"}
+ assert payload["input"] == [
+ {
+ "type": "message",
+ "role": "user",
+ "content": [{"type": "input_text", "text": "Find bugs."}],
+ },
+ {
+ "type": "message",
+ "role": "assistant",
+ "content": [{"type": "output_text", "text": "I will inspect the code."}],
+ },
+ {
+ "type": "message",
+ "role": "user",
+ "content": [
+ {"type": "input_text", "text": "Tool Results:"},
+ {"type": "input_image", "image_url": "data:image/png;base64,abc"},
+ ],
+ },
+ ]
+
+
+def test_parse_responses_sse_events_extracts_text_deltas_and_usage() -> None:
+ content, usage = parse_responses_sse_events(
+ [
+ 'data: {"type":"response.output_text.delta","delta":"hel"}',
+ 'data: {"type":"response.output_text.delta","delta":"lo"}',
+ (
+ 'data: {"type":"response.completed","response":'
+ '{"usage":{"input_tokens":3,"output_tokens":2}}}'
+ ),
+ "data: [DONE]",
+ ]
+ )
+
+ assert content == "hello"
+ assert usage == {"input_tokens": 3, "output_tokens": 2}