Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ Implemented:
- basic memory scaffolding
- basic control panel
- tests and validation scripts
- Claude review path after policy gates

Scaffolded:

Expand All @@ -473,7 +474,6 @@ Scaffolded:
- quality-gated local retry loop
- real pause/resume UI
- semantic memory indexing workflow
- Claude review path after policy gates

## Contributing

Expand Down
1 change: 1 addition & 0 deletions assistant_core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class AssistantConfig(BaseModel):
cloud_fallback_enabled: bool = False
cloud_fallback_policy: str = "explicit_only"
daily_cloud_budget_usd: float = 5.0
cloud_model_pricing: dict[str, Any] = Field(default_factory=dict)
archive_processed_files: bool = True
no_original_file_modification: bool = True
no_email_sending: bool = True
Expand Down
121 changes: 121 additions & 0 deletions assistant_core/execution/cloud_review.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from __future__ import annotations

from pathlib import Path
from typing import Any

from assistant_core.config import AssistantConfig
from assistant_core.execution.output_writer import dated_output_dir
from assistant_core.llm import litellm_client
from assistant_core.schemas import CloudCandidate, FileExecutionRecord

CLOUD_MODEL_GROUP = "cloud-claude-opus"
DEFAULT_CLOUD_INPUT_RATE_PER_1K = 0.015
DEFAULT_CLOUD_OUTPUT_RATE_PER_1K = 0.075


def estimate_cloud_cost_usd(
*,
prompt_tokens: int,
completion_tokens: int,
pricing: dict[str, Any] | None,
model_group: str = CLOUD_MODEL_GROUP,
) -> float:
"""Estimate cloud call cost from token counts and per-model pricing."""
model_pricing = (pricing or {}).get(model_group, {})
input_rate = float(
model_pricing.get("input_per_1k_usd", DEFAULT_CLOUD_INPUT_RATE_PER_1K)
)
output_rate = float(
model_pricing.get("output_per_1k_usd", DEFAULT_CLOUD_OUTPUT_RATE_PER_1K)
)
return (prompt_tokens / 1000.0 * input_rate) + (
completion_tokens / 1000.0 * output_rate
)


def run_cloud_review_call(
*,
cfg: AssistantConfig,
record: FileExecutionRecord,
candidate: CloudCandidate,
day,
spent_so_far_usd: float,
mode: str,
manual_override: bool,
) -> tuple[CloudCandidate, float]:
"""Call LiteLLM once for a gated cloud-review candidate and persist output."""
prompt = _build_cloud_review_prompt(record)
estimated_prompt_tokens = _estimate_tokens(prompt)
preflight_cost = estimate_cloud_cost_usd(
prompt_tokens=estimated_prompt_tokens,
completion_tokens=0,
pricing=cfg.cloud_model_pricing,
)
if spent_so_far_usd + preflight_cost > cfg.daily_cloud_budget_usd:
candidate.gate_passed = False
candidate.gate_block_reason = "budget exceeded"
candidate.escalated = False
candidate.estimated_cost_usd = None
return candidate, spent_so_far_usd

response = litellm_client.chat_completion(
[{"role": "user", "content": prompt}],
model_group=CLOUD_MODEL_GROUP,
base_url=cfg.litellm_base_url,
mode=mode,
manual_override=manual_override,
)
completion_tokens = _estimate_tokens(response)
actual_cost = estimate_cloud_cost_usd(
prompt_tokens=estimated_prompt_tokens,
completion_tokens=completion_tokens,
pricing=cfg.cloud_model_pricing,
)
if spent_so_far_usd + actual_cost > cfg.daily_cloud_budget_usd:
candidate.gate_passed = False
candidate.gate_block_reason = "budget exceeded"
candidate.escalated = False
candidate.estimated_cost_usd = None
return candidate, spent_so_far_usd

response_path = _write_cloud_response(cfg, day, record.file_path, response)
candidate.escalated = True
candidate.cloud_response_chars = len(response)
candidate.estimated_cost_usd = actual_cost
candidate.cloud_response_path = str(response_path)
record.escalated_to_cloud = True
record.cloud_response_chars = len(response)
return candidate, spent_so_far_usd + actual_cost


def _build_cloud_review_prompt(record: FileExecutionRecord) -> str:
return "\n".join(
[
"Review this file after local extraction failed quality gates.",
"Return concise findings, risks, and actionable next steps.",
"Do not modify files or perform external actions.",
"",
f"File: {record.file_path}",
f"Primary model group: {record.primary_model_group}",
f"Secondary model group: {record.secondary_model_group or '(none)'}",
f"Local error: {record.error or '(none)'}",
]
)


def _estimate_tokens(text: str) -> int:
return max(1, (len(text) + 3) // 4)


def _write_cloud_response(
cfg: AssistantConfig,
day,
file_path: str,
response: str,
) -> Path:
cloud_dir = dated_output_dir(cfg, day=day) / "_cloud"
cloud_dir.mkdir(parents=True, exist_ok=True)
safe_name = Path(file_path).name.replace("/", "_").replace(":", "_")
target = cloud_dir / f"{safe_name}.md"
target.write_text(response.strip() + "\n", encoding="utf-8")
return target
27 changes: 16 additions & 11 deletions assistant_core/execution/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from assistant_core.execution.context import StepContext
from assistant_core.execution.evaluators import deterministic_completeness_evaluator
from assistant_core.execution.cloud_review import run_cloud_review_call
from assistant_core.execution.file_utils import (
UnsupportedFileType,
chunk_text,
Expand Down Expand Up @@ -214,10 +215,8 @@ def cloud_review_gate(ctx: StepContext) -> StepContext:
1. Run it through ``assert_cloud_allowed`` with the current config.
2. Record a CloudCandidate on ``ctx.result.cloud_candidates`` capturing
the gate's verdict (allowed / blocked-with-reason).
3. When the gate allows it AND a cloud caller is wired, escalate.
Today no cloud caller is wired (deliberate default-off), so we
set ``escalated=False`` and 0 cost, and let the user wire the
actual call later by setting cloud_fallback_enabled + key + budget.
3. When the gate allows it, call LiteLLM once, save the response, and
stop before any call that would exceed the daily cloud budget.

The 08_CLOUD_REVIEW_CANDIDATES.md output is written from the candidates
list during ``write_outputs`` so the user can see what would have
Expand All @@ -230,6 +229,8 @@ def cloud_review_gate(ctx: StepContext) -> StepContext:
cfg = ctx.cfg
cloud_review_dir = cfg.inbox_dir / "cloud_review"

spent_so_far_usd = 0.0

for record in flagged:
candidate = CloudCandidate(
file_path=record.file_path,
Expand Down Expand Up @@ -258,14 +259,16 @@ def cloud_review_gate(ctx: StepContext) -> StepContext:
candidate.gate_passed = False
candidate.gate_block_reason = block_reason

# When the gate passes, escalation is policy-allowed. The actual HTTP
# call to Claude is left for a follow-up commit (no caller wired yet)
# so this default-off path never spends a token.
if candidate.gate_passed:
# Caller wiring lives here later. For now: mark we'd have called.
candidate.escalated = False
candidate.cloud_response_chars = 0
candidate.estimated_cost_usd = None
candidate, spent_so_far_usd = run_cloud_review_call(
cfg=cfg,
record=record,
candidate=candidate,
day=ctx.day,
spent_so_far_usd=spent_so_far_usd,
mode=ctx.mode,
manual_override=ctx.manual_override,
)

ctx.result.cloud_candidates.append(candidate)

Expand Down Expand Up @@ -436,6 +439,8 @@ def _write_cloud_candidates_file(ctx: StepContext, output_dir: Path) -> None:
lines.append(
f"- Estimated cost: ${candidate.estimated_cost_usd:.4f}"
)
if candidate.cloud_response_path:
lines.append(f"- Cloud response: {candidate.cloud_response_path}")
lines.append("")

target = output_dir / "08_CLOUD_REVIEW_CANDIDATES.md"
Expand Down
7 changes: 6 additions & 1 deletion assistant_core/llm/litellm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from assistant_core.llm.model_policy import assert_model_group_allowed

CLOUD_MODEL_GROUP = "cloud-claude-opus"


class LiteLLMClientError(RuntimeError):
"""Raised when LiteLLM proxy calls fail."""
Expand All @@ -18,7 +20,10 @@ def chat_completion(
manual_override: bool = False,
timeout: int = 180,
) -> str:
assert_model_group_allowed(model_group, mode, manual_override=manual_override)
# Cloud calls are authorized by assert_cloud_allowed before this function.
# Local groups still use the model-group safety gate here.
if model_group != CLOUD_MODEL_GROUP:
assert_model_group_allowed(model_group, mode, manual_override=manual_override)
response = requests.post(
f"{base_url.rstrip('/')}/v1/chat/completions",
json={"model": model_group, "messages": messages},
Expand Down
1 change: 1 addition & 0 deletions assistant_core/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class CloudCandidate(BaseModel):
escalated: bool = False
cloud_response_chars: int = 0
estimated_cost_usd: float | None = None
cloud_response_path: str | None = None


class ExecutionResult(BaseModel):
Expand Down
4 changes: 4 additions & 0 deletions config/assistant.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ block_execution_if_output_unclear: true
cloud_fallback_enabled: false
cloud_fallback_policy: explicit_only
daily_cloud_budget_usd: 5.00
cloud_model_pricing:
cloud-claude-opus:
input_per_1k_usd: 0.015
output_per_1k_usd: 0.075

archive_processed_files: true
no_original_file_modification: true
Expand Down
4 changes: 2 additions & 2 deletions scripts/pdf_builders/system_guide.py
Original file line number Diff line number Diff line change
Expand Up @@ -1156,8 +1156,8 @@ def build_system_guide(styles) -> list:
["Citation / grounding enforcement", "Implemented", "WorkPacket.grounding_required=True makes the evaluator require >=3 'Source:' lines per extraction. Auto-set when request mentions 'cite source', 'every claim', 'regulated', 'auditable'."],
["Audit export (one packet -> single JSON)", "Implemented", "scripts/export_audit.sh dumps the full Postgres audit trail for one packet, reviewer-ready."],
["Indie-dev one-shot code review", "Implemented", "scripts/review_code.sh: copy file -> packet with task_type=code_review -> synchronous execution -> review markdown in ~/LocalAI/output and ~/Obsidian vault."],
["Cloud-review gate (catalog mode)", "Implemented", "Files where both local attempts fail are catalogued in 08_CLOUD_REVIEW_CANDIDATES.md with the gate verdict. Default-off: no cloud spend until cloud_fallback_enabled + ANTHROPIC_API_KEY are set."],
["Claude cloud-review caller (actual HTTP)", "Scaffolded", "Gate runs and catalogs candidates; the actual HTTP call to Claude isn't wired yet."],
["Cloud-review gate", "Implemented", "Files where both local attempts fail are catalogued in 08_CLOUD_REVIEW_CANDIDATES.md with the gate verdict. Default-off: no cloud spend until cloud_fallback_enabled + ANTHROPIC_API_KEY are set."],
["Claude cloud-review caller (actual HTTP)", "Implemented", "When all six gates pass, the gate makes one LiteLLM call per flagged file, writes the response under _cloud, and records estimated cost."],
["Self-consistency / self-critique loops", "Scaffolded", "Extraction is single-shot today. N-of-3 voting and self-critique are the next quality lift."],
["Pause/resume UI", "Scaffolded", "Buttons disabled. DB statuses exist."],
["Semantic memory indexing", "Scaffolded", "Chroma client exists. No indexer yet."],
Expand Down
95 changes: 95 additions & 0 deletions tests/test_cloud_review.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

from datetime import date

from assistant_core.execution import cloud_review
from assistant_core.schemas import CloudCandidate, FileExecutionRecord


def test_estimate_cloud_cost_uses_configured_rates():
cost = cloud_review.estimate_cloud_cost_usd(
prompt_tokens=1000,
completion_tokens=2000,
pricing={
"cloud-claude-opus": {
"input_per_1k_usd": 0.01,
"output_per_1k_usd": 0.02,
}
},
)

assert cost == 0.05


def test_run_cloud_review_call_writes_response_and_costs(tmp_path, monkeypatch):
from tests.test_execution_steps import _cfg

cfg = _cfg(tmp_path)
cfg.cloud_fallback_enabled = True
cfg.cloud_model_pricing = {
"cloud-claude-opus": {
"input_per_1k_usd": 0.01,
"output_per_1k_usd": 0.02,
}
}
record = FileExecutionRecord(
file_path=str(tmp_path / "LocalAI/inbox/cloud_review/example.md"),
needs_cloud_review=True,
)
candidate = CloudCandidate(file_path=record.file_path, reason="test", gate_passed=True)

monkeypatch.setattr(
cloud_review.litellm_client,
"chat_completion",
lambda *args, **kwargs: "cloud review response",
)

updated, spent = cloud_review.run_cloud_review_call(
cfg=cfg,
record=record,
candidate=candidate,
day=date(2026, 5, 18),
spent_so_far_usd=0.0,
mode="NIGHT_MODE",
manual_override=False,
)

assert updated.escalated is True
assert updated.cloud_response_chars == len("cloud review response")
assert updated.estimated_cost_usd is not None
assert spent == updated.estimated_cost_usd
assert updated.cloud_response_path is not None
assert "cloud review response" in open(updated.cloud_response_path, encoding="utf-8").read()
assert record.escalated_to_cloud is True


def test_run_cloud_review_call_blocks_when_budget_would_be_exceeded(tmp_path, monkeypatch):
from tests.test_execution_steps import _cfg

cfg = _cfg(tmp_path)
cfg.daily_cloud_budget_usd = 0.000001
record = FileExecutionRecord(
file_path=str(tmp_path / "LocalAI/inbox/cloud_review/example.md"),
needs_cloud_review=True,
)
candidate = CloudCandidate(file_path=record.file_path, reason="test", gate_passed=True)

def fail_if_called(*args, **kwargs):
raise AssertionError("LiteLLM should not be called after budget preflight fails")

monkeypatch.setattr(cloud_review.litellm_client, "chat_completion", fail_if_called)

updated, spent = cloud_review.run_cloud_review_call(
cfg=cfg,
record=record,
candidate=candidate,
day=date(2026, 5, 18),
spent_so_far_usd=0.0,
mode="NIGHT_MODE",
manual_override=False,
)

assert updated.escalated is False
assert updated.gate_passed is False
assert updated.gate_block_reason == "budget exceeded"
assert spent == 0.0
37 changes: 37 additions & 0 deletions tests/test_execution_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,43 @@ def test_cloud_review_gate_blocks_when_file_outside_cloud_review_dir(
assert c.escalated is False


def test_cloud_review_gate_escalates_when_gates_pass(tmp_path, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test")
ctx = _make_ctx(tmp_path, source_paths=[tmp_path])
ctx.cfg.cloud_fallback_enabled = True
ctx.packet.cloud_policy = {"allowed": True, "explicit": True}
ctx.packet.high_stakes = True
path = tmp_path / "LocalAI/inbox/cloud_review/file.md"
path.parent.mkdir(parents=True)
path.write_text("needs review", encoding="utf-8")
ctx.result.file_records.append(
FileExecutionRecord(file_path=str(path), needs_cloud_review=True)
)

monkeypatch.setattr(
steps,
"run_cloud_review_call",
lambda **kwargs: (
kwargs["candidate"].model_copy(
update={
"escalated": True,
"cloud_response_chars": 3,
"estimated_cost_usd": 0.01,
}
),
0.01,
),
)

steps.cloud_review_gate(ctx)

c = ctx.result.cloud_candidates[0]
assert c.gate_passed is True
assert c.escalated is True
assert c.cloud_response_chars == 3
assert ctx.result.cloud_calls_made == 1


def test_cloud_candidates_file_is_written_even_when_empty(tmp_path):
ctx = _make_ctx(tmp_path, source_paths=[tmp_path])
ctx.brief_md = "# Morning Brief\n"
Expand Down