From 7e21c87a8869a249ef3bb0b58e21a90a3f99469d Mon Sep 17 00:00:00 2001 From: monkut Date: Thu, 30 Apr 2026 20:43:17 +0900 Subject: [PATCH 1/2] :sparkles: Add Claude OAuth token discovery chain (closes #102) Resolve CLAUDE_CODE_OAUTH_TOKEN via an explicit discovery chain in ClaudeRunner.run() instead of relying solely on the parent process env. Stops at the first non-empty source and logs which source won, so a 401 from the spawned `claude` subprocess is replaced with a clear "no Claude credentials found in any of: ..." error before the subprocess is invoked. Sources, in order: 1. CLAUDE_CODE_OAUTH_TOKEN env (no log when used) 2. CLAUDE_OAUTH_TOKEN_FILE env -> file path 3. ~/.tokens/.claude-oauth-token (conventional headless) 4. ${XDG_CONFIG_HOME:-~/.config}/claude/oauth-token 5. ~/.claude/.credentials.json (logs WARNING about staleness) PermissionError on token files and JSON parse/schema errors on credentials.json log a WARNING and continue. File contents are .strip()-ed. --- README.md | 25 ++++ askcc/cli.py | 26 ++-- askcc/runners.py | 120 ++++++++++++++++- tests/test_askcc.py | 311 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 468 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 29a8da1..57ef31c 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,31 @@ Supporting commands can be used at any point: | `ASKCC_LANGUAGE` | Default output language for agent comments (`english`, `japanese`) | `english` | | `DECISION_ISSUE_LABEL` | GitHub label applied when an agent flags a decision is needed | `needs:decision` | | `ENABLE_ISSUE_LABEL_PREFIX_VALIDATION` | Enable/disable issue label prefix validation before agent execution | `true` | +| `CLAUDE_CODE_OAUTH_TOKEN` | Claude Code OAuth token used by the spawned `claude` subprocess | (none) | +| `CLAUDE_OAUTH_TOKEN_FILE` | Path to a file containing the OAuth token. Read when `CLAUDE_CODE_OAUTH_TOKEN` is unset/empty | (none) | + +### Authentication + +Before invoking the `claude` subprocess, askcc resolves the OAuth token via the +following discovery chain. The first source that produces a non-empty token wins; +the chosen source is logged at `INFO` level so failures are diagnosable from the +run log alone. + +1. `CLAUDE_CODE_OAUTH_TOKEN` environment variable (no log when used). +2. `CLAUDE_OAUTH_TOKEN_FILE` environment variable — read the file at that path. +3. `~/.tokens/.claude-oauth-token` — conventional headless token file used by + `~/.bashrc` to populate the env var for interactive shells. +4. `${XDG_CONFIG_HOME:-~/.config}/claude/oauth-token` — XDG-compliant fallback. +5. `~/.claude/.credentials.json` — last-resort parse of `claudeAiOauth.accessToken`. + A `WARNING` is logged when this source is used because Claude Code refreshes + its access token in RAM and may not write back, so the file can be stale. + +If none of the sources produce a non-empty token, askcc exits non-zero with an +error listing every location that was checked, **without** invoking `claude`. + +Whitespace around file contents is stripped. Unreadable token files +(`PermissionError`) and malformed `.credentials.json` files emit a `WARNING` and +the chain continues to the next source. ### User Configuration File diff --git a/askcc/cli.py b/askcc/cli.py index 32b1ef9..ed4e07f 100644 --- a/askcc/cli.py +++ b/askcc/cli.py @@ -26,7 +26,7 @@ validate_issue_readiness, write_prompt_content, ) -from .runners import DEFAULT_RUNNER, RUNNER_REGISTRY, get_runner +from .runners import DEFAULT_RUNNER, RUNNER_REGISTRY, OAuthTokenNotFoundError, get_runner from .settings import VALID_EFFORT_LEVELS, SupportedLanguage, configure_logging logger = logging.getLogger(__name__) @@ -286,16 +286,20 @@ def main() -> None: # noqa: PLR0912, PLR0915, C901 effort_level = _resolve_effort(args.effort, config.effort) max_thinking_tokens = _resolve_max_thinking_tokens(args.max_thinking_tokens, config.max_thinking_tokens) try: - return_code, usage = runner.run( - prompt, - config=config, - issue_url=issue_url, - cwd=cwd, - effort_level=effort_level, - max_thinking_tokens=max_thinking_tokens, - disable_thinking=args.disable_thinking, - disable_adaptive_thinking=args.disable_adaptive_thinking, - ) + try: + return_code, usage = runner.run( + prompt, + config=config, + issue_url=issue_url, + cwd=cwd, + effort_level=effort_level, + max_thinking_tokens=max_thinking_tokens, + disable_thinking=args.disable_thinking, + disable_adaptive_thinking=args.disable_adaptive_thinking, + ) + except OAuthTokenNotFoundError as e: + logger.error("[%s] %s", issue_url, e) # noqa: TRY400 + sys.exit(1) finally: # Clean up /tmp files created by _build_prompt (not user templates) for f in prompt_tempfiles: diff --git a/askcc/runners.py b/askcc/runners.py index d435cb8..d95660c 100644 --- a/askcc/runners.py +++ b/askcc/runners.py @@ -5,19 +5,124 @@ import os import subprocess from abc import ABC, abstractmethod +from pathlib import Path from typing import TYPE_CHECKING from .settings import CLAUDE_ENV_DISABLE_ADAPTIVE_THINKING, CLAUDE_ENV_DISABLE_THINKING, CLAUDE_ENV_MAX_THINKING_TOKENS if TYPE_CHECKING: - from pathlib import Path - from .definitions import AgentConfig logger = logging.getLogger(__name__) DEBUG_OUTPUT_MAX_CHARS = 2000 +OAUTH_TOKEN_ENV = "CLAUDE_CODE_OAUTH_TOKEN" # noqa: S105 +OAUTH_TOKEN_FILE_ENV = "CLAUDE_OAUTH_TOKEN_FILE" # noqa: S105 +CONVENTIONAL_TOKEN_FILE: Path = Path.home() / ".tokens" / ".claude-oauth-token" +CREDENTIALS_JSON_FILE: Path = Path.home() / ".claude" / ".credentials.json" + + +class OAuthTokenNotFoundError(RuntimeError): + """Raised when no Claude OAuth token can be resolved from any source.""" + + +def _read_token_file(path: Path) -> str | None: + """Read and strip a token file. Returns None on FileNotFoundError or empty content. + + Logs a WARNING and returns None on PermissionError. + """ + try: + content = path.read_text() + except FileNotFoundError: + return None + except PermissionError as exc: + logger.warning("auth: cannot read token file %s: %s", path, exc) + return None + token = content.strip() + return token or None + + +def _read_credentials_json(path: Path) -> str | None: + """Read ``claudeAiOauth.accessToken`` from the Claude Code credentials file. + + Schema observed: ``{"claudeAiOauth": {"accessToken": "..."}}``. + Returns None if missing/empty; logs a WARNING and returns None on parse or schema errors. + """ + try: + content = path.read_text() + except FileNotFoundError: + return None + except PermissionError as exc: + logger.warning("auth: cannot read credentials file %s: %s", path, exc) + return None + try: + data = json.loads(content) + except json.JSONDecodeError as exc: + logger.warning("auth: failed to parse %s as JSON: %s", path, exc) + return None + try: + token = data["claudeAiOauth"]["accessToken"] + except (KeyError, TypeError) as exc: + logger.warning("auth: %s missing claudeAiOauth.accessToken: %s", path, exc) + return None + if not isinstance(token, str): + logger.warning("auth: %s claudeAiOauth.accessToken is not a string", path) + return None + token = token.strip() + return token or None + + +def _xdg_token_path() -> Path: + """Compute the XDG-compliant Claude OAuth token path at call time.""" + xdg_config_home = os.environ.get("XDG_CONFIG_HOME", "").strip() + base = Path(xdg_config_home) if xdg_config_home else Path.home() / ".config" + return base / "claude" / "oauth-token" + + +def _resolve_oauth_token() -> tuple[str, str]: + """Resolve a Claude OAuth token from the discovery chain. + + Returns ``(token, source_label)`` where ``source_label`` names the source that + produced the token. Raises :class:`OAuthTokenNotFoundError` when no source + yields a non-empty token. + """ + env_token = os.environ.get(OAUTH_TOKEN_ENV, "").strip() + if env_token: + return env_token, f"env {OAUTH_TOKEN_ENV}" + + checked: list[str] = [f"env {OAUTH_TOKEN_ENV}"] + + custom_path_str = os.environ.get(OAUTH_TOKEN_FILE_ENV, "").strip() + if custom_path_str: + custom_path = Path(custom_path_str).expanduser() + checked.append(f"env {OAUTH_TOKEN_FILE_ENV}={custom_path}") + token = _read_token_file(custom_path) + if token: + return token, f"file {custom_path} (via {OAUTH_TOKEN_FILE_ENV})" + else: + checked.append(f"env {OAUTH_TOKEN_FILE_ENV} (unset)") + + checked.append(f"file {CONVENTIONAL_TOKEN_FILE}") + conventional = _read_token_file(CONVENTIONAL_TOKEN_FILE) + if conventional: + return conventional, f"file {CONVENTIONAL_TOKEN_FILE}" + + xdg_path = _xdg_token_path() + checked.append(f"file {xdg_path}") + xdg_token = _read_token_file(xdg_path) + if xdg_token: + return xdg_token, f"file {xdg_path}" + + checked.append(f"file {CREDENTIALS_JSON_FILE}") + credentials_token = _read_credentials_json(CREDENTIALS_JSON_FILE) + if credentials_token: + return credentials_token, f"file {CREDENTIALS_JSON_FILE}" + + locations = ", ".join(checked) + msg = f"no Claude credentials found in any of: {locations}" + raise OAuthTokenNotFoundError(msg) + class Runner(ABC): """Base class for task runners.""" @@ -88,6 +193,17 @@ def run( env = os.environ.copy() env.pop("CLAUDECODE", None) + token, source = _resolve_oauth_token() + env[OAUTH_TOKEN_ENV] = token + if source != f"env {OAUTH_TOKEN_ENV}": + logger.info("[%s] auth: loaded %s from %s", issue_url, OAUTH_TOKEN_ENV, source) + if str(CREDENTIALS_JSON_FILE) in source: + logger.warning( + "[%s] auth: %s can be stale (Claude Code refreshes in RAM and may not write back)", + issue_url, + CREDENTIALS_JSON_FILE, + ) + if max_thinking_tokens is not None: env[CLAUDE_ENV_MAX_THINKING_TOKENS] = str(max_thinking_tokens) if disable_thinking: diff --git a/tests/test_askcc.py b/tests/test_askcc.py index 274d5e8..2e3c612 100644 --- a/tests/test_askcc.py +++ b/tests/test_askcc.py @@ -45,7 +45,16 @@ validate_template, write_prompt_content, ) -from askcc.runners import ClaudeRunner, get_runner +from askcc.runners import ( + OAUTH_TOKEN_ENV, + OAUTH_TOKEN_FILE_ENV, + ClaudeRunner, + OAuthTokenNotFoundError, + _read_credentials_json, + _read_token_file, + _resolve_oauth_token, + get_runner, +) from askcc.settings import ( ASKCC_HOME, CLAUDE_ENV_DISABLE_ADAPTIVE_THINKING, @@ -563,6 +572,10 @@ def test_file_path_substitution(self): class TestClaudeRunner: ISSUE_URL = "https://github.com/test/repo/issues/1" + @pytest.fixture(autouse=True) + def _set_oauth_token(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(OAUTH_TOKEN_ENV, "test-oauth-token") # noqa: S105 + @pytest.fixture def agent_config(self) -> AgentConfig: return AgentConfig( @@ -2039,6 +2052,10 @@ class TestClaudeRunnerThinkingOptions: ISSUE_URL = "https://github.com/test/repo/issues/1" + @pytest.fixture(autouse=True) + def _set_oauth_token(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(OAUTH_TOKEN_ENV, "test-oauth-token") # noqa: S105 + @pytest.fixture def agent_config(self) -> AgentConfig: return AgentConfig( @@ -2155,6 +2172,294 @@ def test_disable_thinking_false_not_set(self, runner: ClaudeRunner, agent_config assert CLAUDE_ENV_DISABLE_THINKING not in env +class TestResolveOAuthToken: + """Tests for the OAuth token discovery chain.""" + + @pytest.fixture(autouse=True) + def _isolate_env(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + monkeypatch.delenv(OAUTH_TOKEN_ENV, raising=False) + monkeypatch.delenv(OAUTH_TOKEN_FILE_ENV, raising=False) + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "xdg-empty")) + monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", tmp_path / "missing-conventional") + monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", tmp_path / "missing-credentials.json") + + def test_env_var_present_no_fallback_used(self, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv(OAUTH_TOKEN_ENV, "env-token") # noqa: S105 + with patch("askcc.runners._read_token_file") as mock_read: + token, source = _resolve_oauth_token() + assert token == "env-token" # noqa: S105 + assert source == f"env {OAUTH_TOKEN_ENV}" + mock_read.assert_not_called() + + def test_env_var_empty_falls_through(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + monkeypatch.setenv(OAUTH_TOKEN_ENV, " ") + token_file = tmp_path / "token" + token_file.write_text("file-token") + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) + token, source = _resolve_oauth_token() + assert token == "file-token" # noqa: S105 + assert OAUTH_TOKEN_FILE_ENV in source + + def test_token_file_env_var_used_when_main_env_missing(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + token_file = tmp_path / "custom-token" + token_file.write_text("custom-token-value") + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) + token, source = _resolve_oauth_token() + assert token == "custom-token-value" # noqa: S105 + assert str(token_file) in source + assert OAUTH_TOKEN_FILE_ENV in source + + def test_conventional_path_used_when_envs_missing(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + conventional = tmp_path / "conventional" + conventional.write_text("conventional-token") + monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", conventional) + token, source = _resolve_oauth_token() + assert token == "conventional-token" # noqa: S105 + assert str(conventional) in source + + def test_xdg_path_used(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + xdg_dir = tmp_path / "xdg" + (xdg_dir / "claude").mkdir(parents=True) + (xdg_dir / "claude" / "oauth-token").write_text("xdg-token") + monkeypatch.setenv("XDG_CONFIG_HOME", str(xdg_dir)) + token, source = _resolve_oauth_token() + assert token == "xdg-token" # noqa: S105 + assert "oauth-token" in source + + def test_xdg_path_default_when_xdg_unset(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + fake_home = tmp_path / "home" + (fake_home / ".config" / "claude").mkdir(parents=True) + (fake_home / ".config" / "claude" / "oauth-token").write_text("home-xdg-token") + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + monkeypatch.setattr("askcc.runners.Path.home", lambda: fake_home) + token, source = _resolve_oauth_token() + assert token == "home-xdg-token" # noqa: S105 + assert ".config/claude/oauth-token" in source + + def test_credentials_json_used_with_warning( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ): + creds = tmp_path / "credentials.json" + creds.write_text(json.dumps({"claudeAiOauth": {"accessToken": "creds-token"}})) + monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) + with caplog.at_level("INFO", logger="askcc.runners"): + token, source = _resolve_oauth_token() + assert token == "creds-token" # noqa: S105 + assert str(creds) in source + + def test_trailing_newline_stripped(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + token_file = tmp_path / "token" + token_file.write_text("abc\n") + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) + token, _ = _resolve_oauth_token() + assert token == "abc" # noqa: S105 + + def test_unreadable_file_warns_and_continues( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ): + unreadable = tmp_path / "unreadable" + unreadable.write_text("ignored") + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(unreadable)) + + conventional = tmp_path / "conventional" + conventional.write_text("from-conventional") + monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", conventional) + + original_read_text = Path.read_text + + def fake_read_text(self: Path, *args, **kwargs) -> str: + if self == unreadable: + raise PermissionError("denied") + return original_read_text(self, *args, **kwargs) + + monkeypatch.setattr(Path, "read_text", fake_read_text) + with caplog.at_level("WARNING", logger="askcc.runners"): + token, source = _resolve_oauth_token() + assert token == "from-conventional" # noqa: S105 + assert str(conventional) in source + assert "cannot read token file" in caplog.text + assert str(unreadable) in caplog.text + + def test_malformed_credentials_json_warns_and_continues( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ): + creds = tmp_path / "credentials.json" + creds.write_text("{not json") + monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) + with caplog.at_level("WARNING", logger="askcc.runners"), pytest.raises(OAuthTokenNotFoundError): + _resolve_oauth_token() + assert "failed to parse" in caplog.text + assert str(creds) in caplog.text + + def test_credentials_json_missing_keys_warns_and_continues( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ): + creds = tmp_path / "credentials.json" + creds.write_text(json.dumps({"otherSchema": {"foo": "bar"}})) + monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) + with caplog.at_level("WARNING", logger="askcc.runners"), pytest.raises(OAuthTokenNotFoundError): + _resolve_oauth_token() + assert "claudeAiOauth.accessToken" in caplog.text + + def test_empty_token_file_falls_through( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + ): + empty_file = tmp_path / "empty" + empty_file.write_text(" \n \n") + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(empty_file)) + with pytest.raises(OAuthTokenNotFoundError): + _resolve_oauth_token() + + def test_all_sources_empty_raises_with_paths(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(tmp_path / "missing-custom")) + monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", tmp_path / "missing-conventional") + monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", tmp_path / "missing-credentials.json") + with pytest.raises(OAuthTokenNotFoundError) as exc_info: + _resolve_oauth_token() + message = str(exc_info.value) + assert OAUTH_TOKEN_ENV in message + assert OAUTH_TOKEN_FILE_ENV in message + assert "missing-custom" in message + assert "missing-conventional" in message + assert "missing-credentials.json" in message + + def test_read_credentials_json_helper_returns_none_when_missing(self, tmp_path: Path): + result = _read_credentials_json(tmp_path / "does-not-exist") + assert result is None + + def test_read_token_file_helper_returns_none_when_missing(self, tmp_path: Path): + result = _read_token_file(tmp_path / "does-not-exist") + assert result is None + + +class TestClaudeRunnerOAuthIntegration: + """Tests that ClaudeRunner injects the resolved OAuth token into the subprocess env.""" + + ISSUE_URL = "https://github.com/test/repo/issues/1" + + @pytest.fixture + def agent_config(self) -> AgentConfig: + return AgentConfig( + action_name="test-agent", + description="A test agent", + system_prompt="You are a test agent.", + user_prompt_template="$issue_content", + system_prompt_file="TEST_SYSTEM_PROMPT.md", + user_prompt_file="TEST_USER_PROMPT.md", + ) + + @pytest.fixture + def runner(self) -> ClaudeRunner: + return ClaudeRunner() + + def test_runner_injects_resolved_token_into_subprocess_env(self, runner: ClaudeRunner, agent_config: AgentConfig): + mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="") + with ( + patch( + "askcc.runners._resolve_oauth_token", + return_value=("xyz", f"file /tmp/example-{OAUTH_TOKEN_ENV}"), + ), + patch("askcc.runners.subprocess.run", return_value=mock_result) as mock_run, + ): + runner.run("prompt", config=agent_config, issue_url=self.ISSUE_URL, cwd=Path.cwd()) + env = mock_run.call_args[1]["env"] + assert env[OAUTH_TOKEN_ENV] == "xyz" + + def test_runner_logs_source_for_non_env_resolution( + self, + runner: ClaudeRunner, + agent_config: AgentConfig, + caplog: pytest.LogCaptureFixture, + ): + mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="") + with ( + caplog.at_level("INFO", logger="askcc.runners"), + patch( + "askcc.runners._resolve_oauth_token", + return_value=("token", "file /home/user/.tokens/.claude-oauth-token"), + ), + patch("askcc.runners.subprocess.run", return_value=mock_result), + ): + runner.run("prompt", config=agent_config, issue_url=self.ISSUE_URL, cwd=Path.cwd()) + assert "auth: loaded" in caplog.text + assert ".claude-oauth-token" in caplog.text + + def test_runner_no_log_when_env_var_source( + self, + runner: ClaudeRunner, + agent_config: AgentConfig, + caplog: pytest.LogCaptureFixture, + ): + mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="") + with ( + caplog.at_level("INFO", logger="askcc.runners"), + patch( + "askcc.runners._resolve_oauth_token", + return_value=("token", f"env {OAUTH_TOKEN_ENV}"), + ), + patch("askcc.runners.subprocess.run", return_value=mock_result), + ): + runner.run("prompt", config=agent_config, issue_url=self.ISSUE_URL, cwd=Path.cwd()) + assert "auth: loaded" not in caplog.text + + def test_runner_logs_warning_when_credentials_json_used( + self, + runner: ClaudeRunner, + agent_config: AgentConfig, + caplog: pytest.LogCaptureFixture, + ): + mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="") + creds_path = Path.home() / ".claude" / ".credentials.json" + with ( + caplog.at_level("WARNING", logger="askcc.runners"), + patch( + "askcc.runners._resolve_oauth_token", + return_value=("token", f"file {creds_path}"), + ), + patch("askcc.runners.subprocess.run", return_value=mock_result), + ): + runner.run("prompt", config=agent_config, issue_url=self.ISSUE_URL, cwd=Path.cwd()) + assert "can be stale" in caplog.text + + +class TestMainOAuthExitPath: + """Tests CLI exit behavior when OAuth token resolution fails.""" + + ISSUE_URL = "https://github.com/monkut/askcc-cli/issues/1" + + def test_main_exits_nonzero_when_oauth_resolution_fails(self, caplog: pytest.LogCaptureFixture): + failing_runner = MagicMock() + failing_runner.run.side_effect = OAuthTokenNotFoundError( + "no Claude credentials found in any of: env CLAUDE_CODE_OAUTH_TOKEN, ..." + ) + with ( + caplog.at_level("ERROR", logger="askcc.cli"), + patch("askcc.cli.bootstrap_templates"), + patch("askcc.cli.validate_issue_labels", return_value=[]), + patch("askcc.cli.fetch_github_issue", return_value="issue body"), + patch("askcc.cli.get_runner", return_value=failing_runner), + patch("sys.argv", ["askcc", "plan", "-g", self.ISSUE_URL]), + pytest.raises(SystemExit) as exc_info, + ): + main() + assert exc_info.value.code == 1 + assert "no Claude credentials found" in caplog.text + + class TestParseFrontmatter: """Tests for the YAML-style subagent frontmatter parser.""" @@ -2334,6 +2639,10 @@ class TestRunnerFrontmatterFlags: ISSUE_URL = "https://github.com/test/repo/issues/1" + @pytest.fixture(autouse=True) + def _set_oauth_token(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(OAUTH_TOKEN_ENV, "test-oauth-token") # noqa: S105 + def _config(self, **overrides) -> AgentConfig: base = { "action_name": "test", From 9c638ac52503bb5ec402d023bae29105e1db452e Mon Sep 17 00:00:00 2001 From: monkut Date: Fri, 1 May 2026 07:48:01 +0900 Subject: [PATCH 2/2] :art: Move OAuth helpers to ClaudeRunner methods Refactor _read_token_file, _read_credentials_json, _xdg_token_path, and _resolve_oauth_token from module-level functions to ClaudeRunner instance methods, per PR review feedback. Tests updated to use a ClaudeRunner fixture and patch.object for method-level isolation. --- askcc/runners.py | 192 ++++++++++++++++++++++---------------------- tests/test_askcc.py | 80 ++++++++++-------- 2 files changed, 141 insertions(+), 131 deletions(-) diff --git a/askcc/runners.py b/askcc/runners.py index d95660c..124164d 100644 --- a/askcc/runners.py +++ b/askcc/runners.py @@ -27,103 +27,6 @@ class OAuthTokenNotFoundError(RuntimeError): """Raised when no Claude OAuth token can be resolved from any source.""" -def _read_token_file(path: Path) -> str | None: - """Read and strip a token file. Returns None on FileNotFoundError or empty content. - - Logs a WARNING and returns None on PermissionError. - """ - try: - content = path.read_text() - except FileNotFoundError: - return None - except PermissionError as exc: - logger.warning("auth: cannot read token file %s: %s", path, exc) - return None - token = content.strip() - return token or None - - -def _read_credentials_json(path: Path) -> str | None: - """Read ``claudeAiOauth.accessToken`` from the Claude Code credentials file. - - Schema observed: ``{"claudeAiOauth": {"accessToken": "..."}}``. - Returns None if missing/empty; logs a WARNING and returns None on parse or schema errors. - """ - try: - content = path.read_text() - except FileNotFoundError: - return None - except PermissionError as exc: - logger.warning("auth: cannot read credentials file %s: %s", path, exc) - return None - try: - data = json.loads(content) - except json.JSONDecodeError as exc: - logger.warning("auth: failed to parse %s as JSON: %s", path, exc) - return None - try: - token = data["claudeAiOauth"]["accessToken"] - except (KeyError, TypeError) as exc: - logger.warning("auth: %s missing claudeAiOauth.accessToken: %s", path, exc) - return None - if not isinstance(token, str): - logger.warning("auth: %s claudeAiOauth.accessToken is not a string", path) - return None - token = token.strip() - return token or None - - -def _xdg_token_path() -> Path: - """Compute the XDG-compliant Claude OAuth token path at call time.""" - xdg_config_home = os.environ.get("XDG_CONFIG_HOME", "").strip() - base = Path(xdg_config_home) if xdg_config_home else Path.home() / ".config" - return base / "claude" / "oauth-token" - - -def _resolve_oauth_token() -> tuple[str, str]: - """Resolve a Claude OAuth token from the discovery chain. - - Returns ``(token, source_label)`` where ``source_label`` names the source that - produced the token. Raises :class:`OAuthTokenNotFoundError` when no source - yields a non-empty token. - """ - env_token = os.environ.get(OAUTH_TOKEN_ENV, "").strip() - if env_token: - return env_token, f"env {OAUTH_TOKEN_ENV}" - - checked: list[str] = [f"env {OAUTH_TOKEN_ENV}"] - - custom_path_str = os.environ.get(OAUTH_TOKEN_FILE_ENV, "").strip() - if custom_path_str: - custom_path = Path(custom_path_str).expanduser() - checked.append(f"env {OAUTH_TOKEN_FILE_ENV}={custom_path}") - token = _read_token_file(custom_path) - if token: - return token, f"file {custom_path} (via {OAUTH_TOKEN_FILE_ENV})" - else: - checked.append(f"env {OAUTH_TOKEN_FILE_ENV} (unset)") - - checked.append(f"file {CONVENTIONAL_TOKEN_FILE}") - conventional = _read_token_file(CONVENTIONAL_TOKEN_FILE) - if conventional: - return conventional, f"file {CONVENTIONAL_TOKEN_FILE}" - - xdg_path = _xdg_token_path() - checked.append(f"file {xdg_path}") - xdg_token = _read_token_file(xdg_path) - if xdg_token: - return xdg_token, f"file {xdg_path}" - - checked.append(f"file {CREDENTIALS_JSON_FILE}") - credentials_token = _read_credentials_json(CREDENTIALS_JSON_FILE) - if credentials_token: - return credentials_token, f"file {CREDENTIALS_JSON_FILE}" - - locations = ", ".join(checked) - msg = f"no Claude credentials found in any of: {locations}" - raise OAuthTokenNotFoundError(msg) - - class Runner(ABC): """Base class for task runners.""" @@ -160,6 +63,99 @@ def _frontmatter_cli_flags(config: AgentConfig) -> list[str]: class ClaudeRunner(Runner): """Runs tasks via the Claude Code CLI.""" + def _read_token_file(self, path: Path) -> str | None: + """Read and strip a token file. Returns None on FileNotFoundError or empty content. + + Logs a WARNING and returns None on PermissionError. + """ + try: + content = path.read_text() + except FileNotFoundError: + return None + except PermissionError as exc: + logger.warning("auth: cannot read token file %s: %s", path, exc) + return None + token = content.strip() + return token or None + + def _read_credentials_json(self, path: Path) -> str | None: + """Read ``claudeAiOauth.accessToken`` from the Claude Code credentials file. + + Schema observed: ``{"claudeAiOauth": {"accessToken": "..."}}``. + Returns None if missing/empty; logs a WARNING and returns None on parse or schema errors. + """ + try: + content = path.read_text() + except FileNotFoundError: + return None + except PermissionError as exc: + logger.warning("auth: cannot read credentials file %s: %s", path, exc) + return None + try: + data = json.loads(content) + except json.JSONDecodeError as exc: + logger.warning("auth: failed to parse %s as JSON: %s", path, exc) + return None + try: + token = data["claudeAiOauth"]["accessToken"] + except (KeyError, TypeError) as exc: + logger.warning("auth: %s missing claudeAiOauth.accessToken: %s", path, exc) + return None + if not isinstance(token, str): + logger.warning("auth: %s claudeAiOauth.accessToken is not a string", path) + return None + token = token.strip() + return token or None + + def _xdg_token_path(self) -> Path: + """Compute the XDG-compliant Claude OAuth token path at call time.""" + xdg_config_home = os.environ.get("XDG_CONFIG_HOME", "").strip() + base = Path(xdg_config_home) if xdg_config_home else Path.home() / ".config" + return base / "claude" / "oauth-token" + + def _resolve_oauth_token(self) -> tuple[str, str]: + """Resolve a Claude OAuth token from the discovery chain. + + Returns ``(token, source_label)`` where ``source_label`` names the source that + produced the token. Raises :class:`OAuthTokenNotFoundError` when no source + yields a non-empty token. + """ + env_token = os.environ.get(OAUTH_TOKEN_ENV, "").strip() + if env_token: + return env_token, f"env {OAUTH_TOKEN_ENV}" + + checked: list[str] = [f"env {OAUTH_TOKEN_ENV}"] + + custom_path_str = os.environ.get(OAUTH_TOKEN_FILE_ENV, "").strip() + if custom_path_str: + custom_path = Path(custom_path_str).expanduser() + checked.append(f"env {OAUTH_TOKEN_FILE_ENV}={custom_path}") + token = self._read_token_file(custom_path) + if token: + return token, f"file {custom_path} (via {OAUTH_TOKEN_FILE_ENV})" + else: + checked.append(f"env {OAUTH_TOKEN_FILE_ENV} (unset)") + + checked.append(f"file {CONVENTIONAL_TOKEN_FILE}") + conventional = self._read_token_file(CONVENTIONAL_TOKEN_FILE) + if conventional: + return conventional, f"file {CONVENTIONAL_TOKEN_FILE}" + + xdg_path = self._xdg_token_path() + checked.append(f"file {xdg_path}") + xdg_token = self._read_token_file(xdg_path) + if xdg_token: + return xdg_token, f"file {xdg_path}" + + checked.append(f"file {CREDENTIALS_JSON_FILE}") + credentials_token = self._read_credentials_json(CREDENTIALS_JSON_FILE) + if credentials_token: + return credentials_token, f"file {CREDENTIALS_JSON_FILE}" + + locations = ", ".join(checked) + msg = f"no Claude credentials found in any of: {locations}" + raise OAuthTokenNotFoundError(msg) + def run( self, prompt: str, @@ -193,7 +189,7 @@ def run( env = os.environ.copy() env.pop("CLAUDECODE", None) - token, source = _resolve_oauth_token() + token, source = self._resolve_oauth_token() env[OAUTH_TOKEN_ENV] = token if source != f"env {OAUTH_TOKEN_ENV}": logger.info("[%s] auth: loaded %s from %s", issue_url, OAUTH_TOKEN_ENV, source) diff --git a/tests/test_askcc.py b/tests/test_askcc.py index 2e3c612..398b26f 100644 --- a/tests/test_askcc.py +++ b/tests/test_askcc.py @@ -50,9 +50,6 @@ OAUTH_TOKEN_FILE_ENV, ClaudeRunner, OAuthTokenNotFoundError, - _read_credentials_json, - _read_token_file, - _resolve_oauth_token, get_runner, ) from askcc.settings import ( @@ -2175,6 +2172,10 @@ def test_disable_thinking_false_not_set(self, runner: ClaudeRunner, agent_config class TestResolveOAuthToken: """Tests for the OAuth token discovery chain.""" + @pytest.fixture + def runner(self) -> ClaudeRunner: + return ClaudeRunner() + @pytest.fixture(autouse=True) def _isolate_env(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: monkeypatch.delenv(OAUTH_TOKEN_ENV, raising=False) @@ -2183,61 +2184,68 @@ def _isolate_env(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", tmp_path / "missing-conventional") monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", tmp_path / "missing-credentials.json") - def test_env_var_present_no_fallback_used(self, monkeypatch: pytest.MonkeyPatch): + def test_env_var_present_no_fallback_used(self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv(OAUTH_TOKEN_ENV, "env-token") # noqa: S105 - with patch("askcc.runners._read_token_file") as mock_read: - token, source = _resolve_oauth_token() + with patch.object(runner, "_read_token_file") as mock_read: + token, source = runner._resolve_oauth_token() assert token == "env-token" # noqa: S105 assert source == f"env {OAUTH_TOKEN_ENV}" mock_read.assert_not_called() - def test_env_var_empty_falls_through(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_env_var_empty_falls_through(self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): monkeypatch.setenv(OAUTH_TOKEN_ENV, " ") token_file = tmp_path / "token" token_file.write_text("file-token") monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "file-token" # noqa: S105 assert OAUTH_TOKEN_FILE_ENV in source - def test_token_file_env_var_used_when_main_env_missing(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_token_file_env_var_used_when_main_env_missing( + self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path + ): token_file = tmp_path / "custom-token" token_file.write_text("custom-token-value") monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "custom-token-value" # noqa: S105 assert str(token_file) in source assert OAUTH_TOKEN_FILE_ENV in source - def test_conventional_path_used_when_envs_missing(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_conventional_path_used_when_envs_missing( + self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path + ): conventional = tmp_path / "conventional" conventional.write_text("conventional-token") monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", conventional) - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "conventional-token" # noqa: S105 assert str(conventional) in source - def test_xdg_path_used(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_xdg_path_used(self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): xdg_dir = tmp_path / "xdg" (xdg_dir / "claude").mkdir(parents=True) (xdg_dir / "claude" / "oauth-token").write_text("xdg-token") monkeypatch.setenv("XDG_CONFIG_HOME", str(xdg_dir)) - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "xdg-token" # noqa: S105 assert "oauth-token" in source - def test_xdg_path_default_when_xdg_unset(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_xdg_path_default_when_xdg_unset( + self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path + ): fake_home = tmp_path / "home" (fake_home / ".config" / "claude").mkdir(parents=True) (fake_home / ".config" / "claude" / "oauth-token").write_text("home-xdg-token") monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) monkeypatch.setattr("askcc.runners.Path.home", lambda: fake_home) - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "home-xdg-token" # noqa: S105 assert ".config/claude/oauth-token" in source def test_credentials_json_used_with_warning( self, + runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path, caplog: pytest.LogCaptureFixture, @@ -2246,19 +2254,20 @@ def test_credentials_json_used_with_warning( creds.write_text(json.dumps({"claudeAiOauth": {"accessToken": "creds-token"}})) monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) with caplog.at_level("INFO", logger="askcc.runners"): - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "creds-token" # noqa: S105 assert str(creds) in source - def test_trailing_newline_stripped(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_trailing_newline_stripped(self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): token_file = tmp_path / "token" token_file.write_text("abc\n") monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(token_file)) - token, _ = _resolve_oauth_token() + token, _ = runner._resolve_oauth_token() assert token == "abc" # noqa: S105 def test_unreadable_file_warns_and_continues( self, + runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path, caplog: pytest.LogCaptureFixture, @@ -2280,7 +2289,7 @@ def fake_read_text(self: Path, *args, **kwargs) -> str: monkeypatch.setattr(Path, "read_text", fake_read_text) with caplog.at_level("WARNING", logger="askcc.runners"): - token, source = _resolve_oauth_token() + token, source = runner._resolve_oauth_token() assert token == "from-conventional" # noqa: S105 assert str(conventional) in source assert "cannot read token file" in caplog.text @@ -2288,6 +2297,7 @@ def fake_read_text(self: Path, *args, **kwargs) -> str: def test_malformed_credentials_json_warns_and_continues( self, + runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path, caplog: pytest.LogCaptureFixture, @@ -2296,12 +2306,13 @@ def test_malformed_credentials_json_warns_and_continues( creds.write_text("{not json") monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) with caplog.at_level("WARNING", logger="askcc.runners"), pytest.raises(OAuthTokenNotFoundError): - _resolve_oauth_token() + runner._resolve_oauth_token() assert "failed to parse" in caplog.text assert str(creds) in caplog.text def test_credentials_json_missing_keys_warns_and_continues( self, + runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path, caplog: pytest.LogCaptureFixture, @@ -2310,11 +2321,12 @@ def test_credentials_json_missing_keys_warns_and_continues( creds.write_text(json.dumps({"otherSchema": {"foo": "bar"}})) monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", creds) with caplog.at_level("WARNING", logger="askcc.runners"), pytest.raises(OAuthTokenNotFoundError): - _resolve_oauth_token() + runner._resolve_oauth_token() assert "claudeAiOauth.accessToken" in caplog.text def test_empty_token_file_falls_through( self, + runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path, ): @@ -2322,14 +2334,16 @@ def test_empty_token_file_falls_through( empty_file.write_text(" \n \n") monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(empty_file)) with pytest.raises(OAuthTokenNotFoundError): - _resolve_oauth_token() + runner._resolve_oauth_token() - def test_all_sources_empty_raises_with_paths(self, monkeypatch: pytest.MonkeyPatch, tmp_path: Path): + def test_all_sources_empty_raises_with_paths( + self, runner: ClaudeRunner, monkeypatch: pytest.MonkeyPatch, tmp_path: Path + ): monkeypatch.setenv(OAUTH_TOKEN_FILE_ENV, str(tmp_path / "missing-custom")) monkeypatch.setattr("askcc.runners.CONVENTIONAL_TOKEN_FILE", tmp_path / "missing-conventional") monkeypatch.setattr("askcc.runners.CREDENTIALS_JSON_FILE", tmp_path / "missing-credentials.json") with pytest.raises(OAuthTokenNotFoundError) as exc_info: - _resolve_oauth_token() + runner._resolve_oauth_token() message = str(exc_info.value) assert OAUTH_TOKEN_ENV in message assert OAUTH_TOKEN_FILE_ENV in message @@ -2337,12 +2351,12 @@ def test_all_sources_empty_raises_with_paths(self, monkeypatch: pytest.MonkeyPat assert "missing-conventional" in message assert "missing-credentials.json" in message - def test_read_credentials_json_helper_returns_none_when_missing(self, tmp_path: Path): - result = _read_credentials_json(tmp_path / "does-not-exist") + def test_read_credentials_json_helper_returns_none_when_missing(self, runner: ClaudeRunner, tmp_path: Path): + result = runner._read_credentials_json(tmp_path / "does-not-exist") assert result is None - def test_read_token_file_helper_returns_none_when_missing(self, tmp_path: Path): - result = _read_token_file(tmp_path / "does-not-exist") + def test_read_token_file_helper_returns_none_when_missing(self, runner: ClaudeRunner, tmp_path: Path): + result = runner._read_token_file(tmp_path / "does-not-exist") assert result is None @@ -2370,7 +2384,7 @@ def test_runner_injects_resolved_token_into_subprocess_env(self, runner: ClaudeR mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="{}", stderr="") with ( patch( - "askcc.runners._resolve_oauth_token", + "askcc.runners.ClaudeRunner._resolve_oauth_token", return_value=("xyz", f"file /tmp/example-{OAUTH_TOKEN_ENV}"), ), patch("askcc.runners.subprocess.run", return_value=mock_result) as mock_run, @@ -2389,7 +2403,7 @@ def test_runner_logs_source_for_non_env_resolution( with ( caplog.at_level("INFO", logger="askcc.runners"), patch( - "askcc.runners._resolve_oauth_token", + "askcc.runners.ClaudeRunner._resolve_oauth_token", return_value=("token", "file /home/user/.tokens/.claude-oauth-token"), ), patch("askcc.runners.subprocess.run", return_value=mock_result), @@ -2408,7 +2422,7 @@ def test_runner_no_log_when_env_var_source( with ( caplog.at_level("INFO", logger="askcc.runners"), patch( - "askcc.runners._resolve_oauth_token", + "askcc.runners.ClaudeRunner._resolve_oauth_token", return_value=("token", f"env {OAUTH_TOKEN_ENV}"), ), patch("askcc.runners.subprocess.run", return_value=mock_result), @@ -2427,7 +2441,7 @@ def test_runner_logs_warning_when_credentials_json_used( with ( caplog.at_level("WARNING", logger="askcc.runners"), patch( - "askcc.runners._resolve_oauth_token", + "askcc.runners.ClaudeRunner._resolve_oauth_token", return_value=("token", f"file {creds_path}"), ), patch("askcc.runners.subprocess.run", return_value=mock_result),