From c874af4dde621eca0b7b63365a137c2f5bba001d Mon Sep 17 00:00:00 2001 From: samsamtrum Date: Fri, 22 May 2026 16:21:02 +0700 Subject: [PATCH] Wire cloud review caller --- README.md | 2 +- assistant_core/config.py | 1 + assistant_core/execution/cloud_review.py | 121 +++++++++++++++++++++++ assistant_core/execution/steps.py | 27 ++--- assistant_core/llm/litellm_client.py | 7 +- assistant_core/schemas.py | 1 + config/assistant.yaml | 4 + scripts/pdf_builders/system_guide.py | 4 +- tests/test_cloud_review.py | 95 ++++++++++++++++++ tests/test_execution_steps.py | 37 +++++++ 10 files changed, 284 insertions(+), 15 deletions(-) create mode 100644 assistant_core/execution/cloud_review.py create mode 100644 tests/test_cloud_review.py diff --git a/README.md b/README.md index ab3318d..e251eec 100644 --- a/README.md +++ b/README.md @@ -465,6 +465,7 @@ Implemented: - basic memory scaffolding - basic control panel - tests and validation scripts +- Claude review path after policy gates Scaffolded: @@ -473,7 +474,6 @@ Scaffolded: - quality-gated local retry loop - real pause/resume UI - semantic memory indexing workflow -- Claude review path after policy gates ## Contributing diff --git a/assistant_core/config.py b/assistant_core/config.py index 5d418e4..df475fc 100644 --- a/assistant_core/config.py +++ b/assistant_core/config.py @@ -37,6 +37,7 @@ class AssistantConfig(BaseModel): cloud_fallback_enabled: bool = False cloud_fallback_policy: str = "explicit_only" daily_cloud_budget_usd: float = 5.0 + cloud_model_pricing: dict[str, Any] = Field(default_factory=dict) archive_processed_files: bool = True no_original_file_modification: bool = True no_email_sending: bool = True diff --git a/assistant_core/execution/cloud_review.py b/assistant_core/execution/cloud_review.py new file mode 100644 index 0000000..891d829 --- /dev/null +++ b/assistant_core/execution/cloud_review.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from assistant_core.config import AssistantConfig +from assistant_core.execution.output_writer import dated_output_dir +from assistant_core.llm import litellm_client +from assistant_core.schemas import CloudCandidate, FileExecutionRecord + +CLOUD_MODEL_GROUP = "cloud-claude-opus" +DEFAULT_CLOUD_INPUT_RATE_PER_1K = 0.015 +DEFAULT_CLOUD_OUTPUT_RATE_PER_1K = 0.075 + + +def estimate_cloud_cost_usd( + *, + prompt_tokens: int, + completion_tokens: int, + pricing: dict[str, Any] | None, + model_group: str = CLOUD_MODEL_GROUP, +) -> float: + """Estimate cloud call cost from token counts and per-model pricing.""" + model_pricing = (pricing or {}).get(model_group, {}) + input_rate = float( + model_pricing.get("input_per_1k_usd", DEFAULT_CLOUD_INPUT_RATE_PER_1K) + ) + output_rate = float( + model_pricing.get("output_per_1k_usd", DEFAULT_CLOUD_OUTPUT_RATE_PER_1K) + ) + return (prompt_tokens / 1000.0 * input_rate) + ( + completion_tokens / 1000.0 * output_rate + ) + + +def run_cloud_review_call( + *, + cfg: AssistantConfig, + record: FileExecutionRecord, + candidate: CloudCandidate, + day, + spent_so_far_usd: float, + mode: str, + manual_override: bool, +) -> tuple[CloudCandidate, float]: + """Call LiteLLM once for a gated cloud-review candidate and persist output.""" + prompt = _build_cloud_review_prompt(record) + estimated_prompt_tokens = _estimate_tokens(prompt) + preflight_cost = estimate_cloud_cost_usd( + prompt_tokens=estimated_prompt_tokens, + completion_tokens=0, + pricing=cfg.cloud_model_pricing, + ) + if spent_so_far_usd + preflight_cost > cfg.daily_cloud_budget_usd: + candidate.gate_passed = False + candidate.gate_block_reason = "budget exceeded" + candidate.escalated = False + candidate.estimated_cost_usd = None + return candidate, spent_so_far_usd + + response = litellm_client.chat_completion( + [{"role": "user", "content": prompt}], + model_group=CLOUD_MODEL_GROUP, + base_url=cfg.litellm_base_url, + mode=mode, + manual_override=manual_override, + ) + completion_tokens = _estimate_tokens(response) + actual_cost = estimate_cloud_cost_usd( + prompt_tokens=estimated_prompt_tokens, + completion_tokens=completion_tokens, + pricing=cfg.cloud_model_pricing, + ) + if spent_so_far_usd + actual_cost > cfg.daily_cloud_budget_usd: + candidate.gate_passed = False + candidate.gate_block_reason = "budget exceeded" + candidate.escalated = False + candidate.estimated_cost_usd = None + return candidate, spent_so_far_usd + + response_path = _write_cloud_response(cfg, day, record.file_path, response) + candidate.escalated = True + candidate.cloud_response_chars = len(response) + candidate.estimated_cost_usd = actual_cost + candidate.cloud_response_path = str(response_path) + record.escalated_to_cloud = True + record.cloud_response_chars = len(response) + return candidate, spent_so_far_usd + actual_cost + + +def _build_cloud_review_prompt(record: FileExecutionRecord) -> str: + return "\n".join( + [ + "Review this file after local extraction failed quality gates.", + "Return concise findings, risks, and actionable next steps.", + "Do not modify files or perform external actions.", + "", + f"File: {record.file_path}", + f"Primary model group: {record.primary_model_group}", + f"Secondary model group: {record.secondary_model_group or '(none)'}", + f"Local error: {record.error or '(none)'}", + ] + ) + + +def _estimate_tokens(text: str) -> int: + return max(1, (len(text) + 3) // 4) + + +def _write_cloud_response( + cfg: AssistantConfig, + day, + file_path: str, + response: str, +) -> Path: + cloud_dir = dated_output_dir(cfg, day=day) / "_cloud" + cloud_dir.mkdir(parents=True, exist_ok=True) + safe_name = Path(file_path).name.replace("/", "_").replace(":", "_") + target = cloud_dir / f"{safe_name}.md" + target.write_text(response.strip() + "\n", encoding="utf-8") + return target diff --git a/assistant_core/execution/steps.py b/assistant_core/execution/steps.py index 91c3476..192c2be 100644 --- a/assistant_core/execution/steps.py +++ b/assistant_core/execution/steps.py @@ -31,6 +31,7 @@ from assistant_core.execution.context import StepContext from assistant_core.execution.evaluators import deterministic_completeness_evaluator +from assistant_core.execution.cloud_review import run_cloud_review_call from assistant_core.execution.file_utils import ( UnsupportedFileType, chunk_text, @@ -214,10 +215,8 @@ def cloud_review_gate(ctx: StepContext) -> StepContext: 1. Run it through ``assert_cloud_allowed`` with the current config. 2. Record a CloudCandidate on ``ctx.result.cloud_candidates`` capturing the gate's verdict (allowed / blocked-with-reason). - 3. When the gate allows it AND a cloud caller is wired, escalate. - Today no cloud caller is wired (deliberate default-off), so we - set ``escalated=False`` and 0 cost, and let the user wire the - actual call later by setting cloud_fallback_enabled + key + budget. + 3. When the gate allows it, call LiteLLM once, save the response, and + stop before any call that would exceed the daily cloud budget. The 08_CLOUD_REVIEW_CANDIDATES.md output is written from the candidates list during ``write_outputs`` so the user can see what would have @@ -230,6 +229,8 @@ def cloud_review_gate(ctx: StepContext) -> StepContext: cfg = ctx.cfg cloud_review_dir = cfg.inbox_dir / "cloud_review" + spent_so_far_usd = 0.0 + for record in flagged: candidate = CloudCandidate( file_path=record.file_path, @@ -258,14 +259,16 @@ def cloud_review_gate(ctx: StepContext) -> StepContext: candidate.gate_passed = False candidate.gate_block_reason = block_reason - # When the gate passes, escalation is policy-allowed. The actual HTTP - # call to Claude is left for a follow-up commit (no caller wired yet) - # so this default-off path never spends a token. if candidate.gate_passed: - # Caller wiring lives here later. For now: mark we'd have called. - candidate.escalated = False - candidate.cloud_response_chars = 0 - candidate.estimated_cost_usd = None + candidate, spent_so_far_usd = run_cloud_review_call( + cfg=cfg, + record=record, + candidate=candidate, + day=ctx.day, + spent_so_far_usd=spent_so_far_usd, + mode=ctx.mode, + manual_override=ctx.manual_override, + ) ctx.result.cloud_candidates.append(candidate) @@ -436,6 +439,8 @@ def _write_cloud_candidates_file(ctx: StepContext, output_dir: Path) -> None: lines.append( f"- Estimated cost: ${candidate.estimated_cost_usd:.4f}" ) + if candidate.cloud_response_path: + lines.append(f"- Cloud response: {candidate.cloud_response_path}") lines.append("") target = output_dir / "08_CLOUD_REVIEW_CANDIDATES.md" diff --git a/assistant_core/llm/litellm_client.py b/assistant_core/llm/litellm_client.py index dd72d0a..d09b307 100644 --- a/assistant_core/llm/litellm_client.py +++ b/assistant_core/llm/litellm_client.py @@ -4,6 +4,8 @@ from assistant_core.llm.model_policy import assert_model_group_allowed +CLOUD_MODEL_GROUP = "cloud-claude-opus" + class LiteLLMClientError(RuntimeError): """Raised when LiteLLM proxy calls fail.""" @@ -18,7 +20,10 @@ def chat_completion( manual_override: bool = False, timeout: int = 180, ) -> str: - assert_model_group_allowed(model_group, mode, manual_override=manual_override) + # Cloud calls are authorized by assert_cloud_allowed before this function. + # Local groups still use the model-group safety gate here. + if model_group != CLOUD_MODEL_GROUP: + assert_model_group_allowed(model_group, mode, manual_override=manual_override) response = requests.post( f"{base_url.rstrip('/')}/v1/chat/completions", json={"model": model_group, "messages": messages}, diff --git a/assistant_core/schemas.py b/assistant_core/schemas.py index d6b554b..dbde6f2 100644 --- a/assistant_core/schemas.py +++ b/assistant_core/schemas.py @@ -205,6 +205,7 @@ class CloudCandidate(BaseModel): escalated: bool = False cloud_response_chars: int = 0 estimated_cost_usd: float | None = None + cloud_response_path: str | None = None class ExecutionResult(BaseModel): diff --git a/config/assistant.yaml b/config/assistant.yaml index 177081b..151d5be 100644 --- a/config/assistant.yaml +++ b/config/assistant.yaml @@ -29,6 +29,10 @@ block_execution_if_output_unclear: true cloud_fallback_enabled: false cloud_fallback_policy: explicit_only daily_cloud_budget_usd: 5.00 +cloud_model_pricing: + cloud-claude-opus: + input_per_1k_usd: 0.015 + output_per_1k_usd: 0.075 archive_processed_files: true no_original_file_modification: true diff --git a/scripts/pdf_builders/system_guide.py b/scripts/pdf_builders/system_guide.py index 4527dbe..e069c1d 100644 --- a/scripts/pdf_builders/system_guide.py +++ b/scripts/pdf_builders/system_guide.py @@ -1156,8 +1156,8 @@ def build_system_guide(styles) -> list: ["Citation / grounding enforcement", "Implemented", "WorkPacket.grounding_required=True makes the evaluator require >=3 'Source:' lines per extraction. Auto-set when request mentions 'cite source', 'every claim', 'regulated', 'auditable'."], ["Audit export (one packet -> single JSON)", "Implemented", "scripts/export_audit.sh dumps the full Postgres audit trail for one packet, reviewer-ready."], ["Indie-dev one-shot code review", "Implemented", "scripts/review_code.sh: copy file -> packet with task_type=code_review -> synchronous execution -> review markdown in ~/LocalAI/output and ~/Obsidian vault."], - ["Cloud-review gate (catalog mode)", "Implemented", "Files where both local attempts fail are catalogued in 08_CLOUD_REVIEW_CANDIDATES.md with the gate verdict. Default-off: no cloud spend until cloud_fallback_enabled + ANTHROPIC_API_KEY are set."], - ["Claude cloud-review caller (actual HTTP)", "Scaffolded", "Gate runs and catalogs candidates; the actual HTTP call to Claude isn't wired yet."], + ["Cloud-review gate", "Implemented", "Files where both local attempts fail are catalogued in 08_CLOUD_REVIEW_CANDIDATES.md with the gate verdict. Default-off: no cloud spend until cloud_fallback_enabled + ANTHROPIC_API_KEY are set."], + ["Claude cloud-review caller (actual HTTP)", "Implemented", "When all six gates pass, the gate makes one LiteLLM call per flagged file, writes the response under _cloud, and records estimated cost."], ["Self-consistency / self-critique loops", "Scaffolded", "Extraction is single-shot today. N-of-3 voting and self-critique are the next quality lift."], ["Pause/resume UI", "Scaffolded", "Buttons disabled. DB statuses exist."], ["Semantic memory indexing", "Scaffolded", "Chroma client exists. No indexer yet."], diff --git a/tests/test_cloud_review.py b/tests/test_cloud_review.py new file mode 100644 index 0000000..26611cc --- /dev/null +++ b/tests/test_cloud_review.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from datetime import date + +from assistant_core.execution import cloud_review +from assistant_core.schemas import CloudCandidate, FileExecutionRecord + + +def test_estimate_cloud_cost_uses_configured_rates(): + cost = cloud_review.estimate_cloud_cost_usd( + prompt_tokens=1000, + completion_tokens=2000, + pricing={ + "cloud-claude-opus": { + "input_per_1k_usd": 0.01, + "output_per_1k_usd": 0.02, + } + }, + ) + + assert cost == 0.05 + + +def test_run_cloud_review_call_writes_response_and_costs(tmp_path, monkeypatch): + from tests.test_execution_steps import _cfg + + cfg = _cfg(tmp_path) + cfg.cloud_fallback_enabled = True + cfg.cloud_model_pricing = { + "cloud-claude-opus": { + "input_per_1k_usd": 0.01, + "output_per_1k_usd": 0.02, + } + } + record = FileExecutionRecord( + file_path=str(tmp_path / "LocalAI/inbox/cloud_review/example.md"), + needs_cloud_review=True, + ) + candidate = CloudCandidate(file_path=record.file_path, reason="test", gate_passed=True) + + monkeypatch.setattr( + cloud_review.litellm_client, + "chat_completion", + lambda *args, **kwargs: "cloud review response", + ) + + updated, spent = cloud_review.run_cloud_review_call( + cfg=cfg, + record=record, + candidate=candidate, + day=date(2026, 5, 18), + spent_so_far_usd=0.0, + mode="NIGHT_MODE", + manual_override=False, + ) + + assert updated.escalated is True + assert updated.cloud_response_chars == len("cloud review response") + assert updated.estimated_cost_usd is not None + assert spent == updated.estimated_cost_usd + assert updated.cloud_response_path is not None + assert "cloud review response" in open(updated.cloud_response_path, encoding="utf-8").read() + assert record.escalated_to_cloud is True + + +def test_run_cloud_review_call_blocks_when_budget_would_be_exceeded(tmp_path, monkeypatch): + from tests.test_execution_steps import _cfg + + cfg = _cfg(tmp_path) + cfg.daily_cloud_budget_usd = 0.000001 + record = FileExecutionRecord( + file_path=str(tmp_path / "LocalAI/inbox/cloud_review/example.md"), + needs_cloud_review=True, + ) + candidate = CloudCandidate(file_path=record.file_path, reason="test", gate_passed=True) + + def fail_if_called(*args, **kwargs): + raise AssertionError("LiteLLM should not be called after budget preflight fails") + + monkeypatch.setattr(cloud_review.litellm_client, "chat_completion", fail_if_called) + + updated, spent = cloud_review.run_cloud_review_call( + cfg=cfg, + record=record, + candidate=candidate, + day=date(2026, 5, 18), + spent_so_far_usd=0.0, + mode="NIGHT_MODE", + manual_override=False, + ) + + assert updated.escalated is False + assert updated.gate_passed is False + assert updated.gate_block_reason == "budget exceeded" + assert spent == 0.0 diff --git a/tests/test_execution_steps.py b/tests/test_execution_steps.py index 3b3f2ae..5f32db5 100644 --- a/tests/test_execution_steps.py +++ b/tests/test_execution_steps.py @@ -422,6 +422,43 @@ def test_cloud_review_gate_blocks_when_file_outside_cloud_review_dir( assert c.escalated is False +def test_cloud_review_gate_escalates_when_gates_pass(tmp_path, monkeypatch): + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test") + ctx = _make_ctx(tmp_path, source_paths=[tmp_path]) + ctx.cfg.cloud_fallback_enabled = True + ctx.packet.cloud_policy = {"allowed": True, "explicit": True} + ctx.packet.high_stakes = True + path = tmp_path / "LocalAI/inbox/cloud_review/file.md" + path.parent.mkdir(parents=True) + path.write_text("needs review", encoding="utf-8") + ctx.result.file_records.append( + FileExecutionRecord(file_path=str(path), needs_cloud_review=True) + ) + + monkeypatch.setattr( + steps, + "run_cloud_review_call", + lambda **kwargs: ( + kwargs["candidate"].model_copy( + update={ + "escalated": True, + "cloud_response_chars": 3, + "estimated_cost_usd": 0.01, + } + ), + 0.01, + ), + ) + + steps.cloud_review_gate(ctx) + + c = ctx.result.cloud_candidates[0] + assert c.gate_passed is True + assert c.escalated is True + assert c.cloud_response_chars == 3 + assert ctx.result.cloud_calls_made == 1 + + def test_cloud_candidates_file_is_written_even_when_empty(tmp_path): ctx = _make_ctx(tmp_path, source_paths=[tmp_path]) ctx.brief_md = "# Morning Brief\n"