def _write_task(root: Path, relative: str) -> None:
    """Create a minimal task skeleton at ``root / relative``.

    The skeleton is an ``environment/Dockerfile`` containing ``FROM scratch``
    and a ``task.toml`` declaring version ``1.1``.
    """
    task_dir = root / relative
    environment_dir = task_dir / "environment"
    # Creating the environment directory also creates the task directory.
    environment_dir.mkdir(parents=True, exist_ok=True)
    (environment_dir / "Dockerfile").write_text("FROM scratch\n", encoding="utf-8")
    (task_dir / "task.toml").write_text('version = "1.1"\n', encoding="utf-8")


def test_sanity_task_source_fails_before_runner_spend() -> None:
    """Exercise the task-source preflight for a sanity-only task.

    ``hello-world`` is written only under ``experiments/sanity-tasks`` while
    two other tasks are written under ``tasks``.  The plan preflight must
    report the task as missing from the canonical tasks, and running the
    automation loop must return 0 while writing a compact record and a
    ledger entry that both carry the task-source preflight blocker.
    """
    run_name = "skillsbench-hello-world-task-source-preflight"
    blocker = "skillsbench_task_source_preflight_blocked"
    missing_status = "task_missing_from_canonical_tasks"
    sanity_kind = "experiments_sanity_tasks"

    with tempfile.TemporaryDirectory(prefix="skillsbench-task-source-") as tmp:
        workspace = Path(tmp)
        skillsbench_root = workspace / "skillsbench"
        for relative in (
            "experiments/sanity-tasks/hello-world",
            "tasks/citation-check",
            "tasks/powerlifting-coef-calc",
        ):
            _write_task(skillsbench_root, relative)

        jobs_dir = workspace / "jobs"
        ledger_path = workspace / "ledger.json"
        cli_args = [
            "--task-id",
            "hello-world",
            "--route",
            "raw-codex-autonomous-max5",
            "--skillsbench-root",
            str(skillsbench_root),
            "--jobs-dir",
            str(jobs_dir),
            "--job-name",
            run_name,
            "--run-group-id",
            run_name,
            "--ledger-path",
            str(ledger_path),
            "--update-ledger",
        ]

        # Phase 1: the plan alone already carries the preflight verdict.
        preflight = build_plan(parse_args(cli_args))["task_setup_preflight"]
        assert preflight["status"] == missing_status, preflight
        assert preflight["canonical_task_present"] is False, preflight
        assert preflight["alternate_source_kind"] == sanity_kind, preflight
        assert preflight["task_source_path_recorded"] is False, preflight
        assert preflight["task_source_content_recorded"] is False, preflight
        assert preflight["nearest_canonical_task_ids"] == [
            "citation-check",
            "powerlifting-coef-calc",
        ], preflight

        # Phase 2: run the loop, keeping stderr so a failure can show it.
        captured_stderr = io.StringIO()
        with contextlib.redirect_stderr(captured_stderr):
            rc = skillsbench_automation_loop_main(cli_args)
        assert rc == 0, captured_stderr.getvalue()

        compact_path = (
            jobs_dir
            / run_name
            / "hello-world__raw_codex_autonomous_max5"
            / "benchmark_run.compact.json"
        )
        compact = json.loads(compact_path.read_text(encoding="utf-8"))
        compact_preflight = compact["task_setup_preflight"]
        assert compact["first_blocker"] == blocker
        assert compact["score_failure_attribution"] == blocker
        assert compact_preflight["status"] == missing_status
        assert compact_preflight["alternate_source_kind"] == sanity_kind
        assert compact["validation"]["no_raw_task_text_read"] is True, compact

        # Phase 3: the ledger must route the case to task-source selection.
        ledger = load_benchmark_run_ledger(ledger_path)
        case = ledger["benchmarks"]["skillsbench@1.1"]["cases"]["hello-world"]
        assert case["latest_decision"]["decision"] == (
            "baseline_task_source_preflight_selection_required"
        ), case
        assert case["runs"][0]["repair_class"] == (
            "skillsbench_task_source_preflight_selection"
        )


if __name__ == "__main__":
    test_sanity_task_source_fails_before_runner_spend()
    print("skillsbench-task-source-preflight-smoke ok")
--- a/loopx/benchmark_adapters/skillsbench.py +++ b/loopx/benchmark_adapters/skillsbench.py @@ -1352,6 +1352,16 @@ def skillsbench_runner_error_attribution(error_text: str) -> tuple[str, str, lis if "benchflow result.json not found" in text: label = "skillsbench_result_json_missing_after_runner_exit" return label, label, [label, "skillsbench_runner_setup_error"] + if ( + "skillsbench task source preflight blocked" in text + or "task missing from canonical tasks source" in text + ): + label = "skillsbench_task_source_preflight_blocked" + return label, label, [ + label, + "skillsbench_runner_setup_error", + "skillsbench_task_source_preflight", + ] if ( "could not find the file /app" in text or "main:/app/skills" in text diff --git a/loopx/benchmark_ledger.py b/loopx/benchmark_ledger.py index 79791352..fdee6b78 100644 --- a/loopx/benchmark_ledger.py +++ b/loopx/benchmark_ledger.py @@ -140,6 +140,48 @@ def _compact_task_staging(value: Any) -> dict[str, Any]: return compact +def _compact_task_setup_preflight(value: Any) -> dict[str, Any]: + if not isinstance(value, dict): + return {} + compact: dict[str, Any] = {} + for field in ( + "schema_version", + "status", + "sandbox", + "task_id", + "first_blocker", + "alternate_source_kind", + "selection_recommendation", + ): + text = _compact_text(value.get(field), limit=140) + if text: + compact[field] = text + for field in ( + "raw_task_text_read", + "raw_logs_read", + "raw_trajectory_read", + "apt_setup_risk_detected", + "apt_retry_patch_required", + "dockerfile_present", + "canonical_task_present", + "alternate_source_supported_by_runner", + "task_source_path_recorded", + "task_source_content_recorded", + ): + if isinstance(value.get(field), bool): + compact[field] = value[field] + nearest_ids = value.get("nearest_canonical_task_ids") + if isinstance(nearest_ids, list): + compact_nearest: list[str] = [] + for item in nearest_ids[:5]: + text = _compact_text(item, limit=120) + if text: + compact_nearest.append(text) + if 
compact_nearest: + compact["nearest_canonical_task_ids"] = compact_nearest + return compact + + def _compact_compose_setup_diagnostic(value: Any) -> dict[str, Any]: if not isinstance(value, dict): return {} @@ -907,6 +949,28 @@ def _repair_route( "raw_task_text_required": False, }, } + if failure_class == "skillsbench_task_source_preflight_blocked": + return { + "repair_priority": "P1", + "repair_class": "skillsbench_task_source_preflight_selection", + "next_action": ( + "select a SkillsBench task from the canonical tasks source, or " + "use an explicit sanity-source runner before spending a full " + "baseline/treatment arm" + ), + "repair_profile": { + "schema_version": "benchmark_repair_profile_v0", + "repair_class": "skillsbench_task_source_preflight_selection", + "rerun_allowed_after_profile_applied": True, + "required_preflight": [ + "skillsbench_task_setup_preflight", + "canonical_task_present", + "nearest_canonical_task_ids", + ], + "raw_logs_required": False, + "raw_task_text_required": False, + }, + } if failure_class == "score_missing": return { "repair_priority": "P0", @@ -1472,6 +1536,11 @@ def build_benchmark_run_ledger_entry( } ) entry.update(repair_route) + task_setup_preflight = _compact_task_setup_preflight( + benchmark_run.get("task_setup_preflight") + ) + if task_setup_preflight: + entry["task_setup_preflight"] = task_setup_preflight task_staging = _compact_task_staging(benchmark_run.get("task_staging")) if task_staging: entry["task_staging"] = task_staging @@ -1716,6 +1785,8 @@ def repair_decision(prefix: str, run: dict[str, Any]) -> dict[str, Any]: decision = f"{prefix}_codex_acp_post_success_finalization_required" elif repair_class == "skillsbench_setup_preflight_selection": decision = f"{prefix}_setup_preflight_selection_required" + elif repair_class == "skillsbench_task_source_preflight_selection": + decision = f"{prefix}_task_source_preflight_selection_required" elif repair_class == "worker_verifier_alignment": decision = 
f"{prefix}_worker_verifier_alignment_required" elif repair_class == "verifier_or_infra_repair": diff --git a/loopx/status.py b/loopx/status.py index 541228e4..143daca0 100644 --- a/loopx/status.py +++ b/loopx/status.py @@ -1709,6 +1709,46 @@ def _compact_benchmark_task_staging(value: Any) -> dict[str, Any]: return compact +def _compact_benchmark_task_setup_preflight(value: Any) -> dict[str, Any]: + if not isinstance(value, dict): + return {} + + compact: dict[str, Any] = {} + for field in ( + "schema_version", + "status", + "sandbox", + "task_id", + "first_blocker", + "alternate_source_kind", + "selection_recommendation", + ): + text = public_safe_compact_text(value.get(field), limit=180) + if text: + compact[field] = text + for field in ( + "raw_task_text_read", + "raw_logs_read", + "raw_trajectory_read", + "apt_setup_risk_detected", + "apt_retry_patch_required", + "dockerfile_present", + "canonical_task_present", + "alternate_source_supported_by_runner", + "task_source_path_recorded", + "task_source_content_recorded", + ): + if isinstance(value.get(field), bool): + compact[field] = value[field] + nearest_task_ids = public_safe_compact_list( + value.get("nearest_canonical_task_ids"), + limit=MAX_BENCHMARK_RUN_LIST_ITEMS, + ) + if nearest_task_ids: + compact["nearest_canonical_task_ids"] = nearest_task_ids + return compact + + def _compact_benchmark_compose_setup_diagnostic(value: Any) -> dict[str, Any]: if not isinstance(value, dict): return {} @@ -2918,6 +2958,11 @@ def compact_benchmark_run(run: dict[str, Any]) -> dict[str, Any] | None: ) if runner_prerequisites: compact["runner_prerequisites"] = runner_prerequisites + task_setup_preflight = _compact_benchmark_task_setup_preflight( + source.get("task_setup_preflight") + ) + if task_setup_preflight: + compact["task_setup_preflight"] = task_setup_preflight task_staging = _compact_benchmark_task_staging(source.get("task_staging")) if task_staging: compact["task_staging"] = task_staging diff --git 
a/scripts/skillsbench_automation_loop.py b/scripts/skillsbench_automation_loop.py index 8f68ff8f..057e265a 100755 --- a/scripts/skillsbench_automation_loop.py +++ b/scripts/skillsbench_automation_loop.py @@ -3902,7 +3902,15 @@ def _public_task_setup_preflight(value: Any) -> dict[str, Any]: return {} compact: dict[str, Any] = {} - for field in ("schema_version", "status", "sandbox", "selection_recommendation"): + for field in ( + "schema_version", + "status", + "sandbox", + "task_id", + "first_blocker", + "alternate_source_kind", + "selection_recommendation", + ): raw = value.get(field) if isinstance(raw, str) and raw: compact[field] = raw[:180] @@ -3913,9 +3921,21 @@ def _public_task_setup_preflight(value: Any) -> dict[str, Any]: "apt_setup_risk_detected", "apt_retry_patch_required", "dockerfile_present", + "canonical_task_present", + "alternate_source_supported_by_runner", + "task_source_path_recorded", + "task_source_content_recorded", ): if isinstance(value.get(field), bool): compact[field] = value[field] + for field in ("nearest_canonical_task_ids",): + raw_items = value.get(field) + if isinstance(raw_items, list): + compact[field] = [ + str(item)[:120] + for item in raw_items[:5] + if isinstance(item, str) and item + ] return compact @@ -4279,6 +4299,19 @@ def dockerfile_needs_apt_retry_patch(dockerfile: Path) -> bool: return bool(re.search(r"\bapt(?:-get)?\s+update\b", text, flags=re.IGNORECASE)) +def _skillsbench_public_task_label(value: Any, *, limit: int = 120) -> str: + text = str(value or "").strip() + cleaned = [] + for char in text: + cleaned.append( + char.lower() if char.isalnum() or char in {"-", "_", "."} else "-" + ) + label = "".join(cleaned).strip("-_.") + while "--" in label: + label = label.replace("--", "-") + return label[:limit] + + def skillsbench_task_setup_preflight( *, task_path: Path, @@ -4286,20 +4319,63 @@ def skillsbench_task_setup_preflight( ) -> dict[str, Any]: """Return public-safe setup-shape facts before spending a full run.""" 
+ expanded_task_path = task_path.expanduser() + public_task_id = _skillsbench_public_task_label(expanded_task_path.name) preflight: dict[str, Any] = { "schema_version": "skillsbench_task_setup_preflight_v0", "sandbox": sandbox, + "task_id": public_task_id, "raw_task_text_read": False, "raw_logs_read": False, "raw_trajectory_read": False, + "task_source_path_recorded": False, + "task_source_content_recorded": False, + "canonical_task_present": False, + "alternate_source_supported_by_runner": False, "apt_setup_risk_detected": False, "apt_retry_patch_required": False, } + skillsbench_root = expanded_task_path.parent.parent + canonical_task_exists = expanded_task_path.is_dir() + preflight["canonical_task_present"] = canonical_task_exists + if not canonical_task_exists: + sanity_task = ( + skillsbench_root + / "experiments" + / "sanity-tasks" + / expanded_task_path.name + ) + alternate_source_kind = ( + "experiments_sanity_tasks" if sanity_task.is_dir() else "none" + ) + nearest: list[str] = [] + canonical_root = skillsbench_root / "tasks" + if canonical_root.is_dir(): + for child in sorted(canonical_root.iterdir(), key=lambda item: item.name): + if not child.is_dir(): + continue + label = _skillsbench_public_task_label(child.name) + if label: + nearest.append(label) + if len(nearest) >= 5: + break + preflight.update( + { + "status": "task_missing_from_canonical_tasks", + "first_blocker": "skillsbench_task_source_preflight_blocked", + "alternate_source_kind": alternate_source_kind, + "nearest_canonical_task_ids": nearest, + "selection_recommendation": ( + "choose_normal_tasks_candidate_or_use_explicit_sanity_source_runner" + ), + } + ) + return preflight if sandbox != "docker": preflight["status"] = "not_applicable" return preflight - dockerfile = task_path.expanduser() / "environment" / "Dockerfile" + dockerfile = expanded_task_path / "environment" / "Dockerfile" dockerfile_exists = dockerfile.exists() preflight["dockerfile_present"] = dockerfile_exists if not 
dockerfile_exists: @@ -8841,6 +8917,11 @@ def build_runner_failure_compact( if runner_prerequisites: compact["runner_prerequisites"] = runner_prerequisites _sync_relay_closeout_counts_into_compact(compact, runner_prerequisites) + task_setup_preflight = _public_task_setup_preflight( + plan.get("task_setup_preflight") + ) + if task_setup_preflight: + compact["task_setup_preflight"] = task_setup_preflight task_staging = _effective_public_task_staging(plan) if task_staging: compact["task_staging"] = task_staging @@ -9528,6 +9609,17 @@ async def async_main( "skillsbench apt setup risk preflight blocked: " "apt-based Docker setup risk detected before full case run" ) + if ( + not args.reduce_only + and setup_preflight.get("status") == "task_missing_from_canonical_tasks" + ): + staging = plan.setdefault("task_staging", {}) + if isinstance(staging, dict): + staging["task_source_preflight_blocked"] = True + raise SkillsBenchSetupPreflightBlocked( + "skillsbench task source preflight blocked: " + "task missing from canonical tasks source before full case run" + ) if ( _host_local_acp_codex_exec_preflight_should_run(args)