Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions .map/scripts/map_step_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11705,7 +11705,7 @@ def record_test_baseline(
test_command: str = "",
*,
module_dir: str = "",
timeout_seconds: int = 120,
timeout_seconds: int = 600,
) -> dict[str, object]:
"""Record a pre-flight test baseline so subtasks can distinguish
"this regression is mine" from "this was broken before I started".
Expand Down Expand Up @@ -11867,9 +11867,17 @@ def record_test_baseline(
if m:
failures.append(m.group(1))

if timed_out:
status = "timed_out"
elif returncode == 0:
status = "success"
else:
status = "baseline_failures"

payload: dict[str, object] = {
"branch": branch_name,
"status": "success" if returncode == 0 else "baseline_failures",
"status": status,
"baseline_complete": not timed_out,
"command": cmd_str,
"auto_detected": bool(auto_detected_command),
"module_dir": detected_module_dir,
Expand Down Expand Up @@ -11907,14 +11915,26 @@ def list_baseline_failures(branch: str) -> dict[str, object]:
failures = data.get("baseline_failures", [])
if not isinstance(failures, list):
failures = []
return {
baseline_complete = data.get("baseline_complete", not data.get("timed_out", False))
timed_out_flag = data.get("timed_out", False)
result: dict[str, object] = {
"status": "success",
"branch": branch_name,
"command": data.get("command", ""),
"returncode": data.get("returncode"),
"baseline_complete": baseline_complete,
"timed_out": timed_out_flag,
"baseline_failures": failures,
"recorded_at": data.get("recorded_at"),
}
if timed_out_flag:
result["warning"] = (
"Baseline timed out — baseline_failures is empty because the suite "
"did not finish, not because there were no pre-existing failures. "
"Treat this baseline as UNKNOWN, not clean. Re-run record_test_baseline "
"with a longer --timeout or a faster --command."
)
return result


def _acknowledged_diagnostics_path(branch: str) -> Path:
Expand Down Expand Up @@ -18578,7 +18598,7 @@ def _opt_value(flag: str) -> str:
baseline_branch = sys.argv[2]
baseline_cmd = ""
baseline_module_dir = ""
baseline_timeout = 120
baseline_timeout = 600
if "--command" in sys.argv:
c_idx = sys.argv.index("--command")
if c_idx + 1 < len(sys.argv):
Expand Down
28 changes: 24 additions & 4 deletions src/mapify_cli/templates/map/scripts/map_step_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11705,7 +11705,7 @@ def record_test_baseline(
test_command: str = "",
*,
module_dir: str = "",
timeout_seconds: int = 120,
timeout_seconds: int = 600,
) -> dict[str, object]:
"""Record a pre-flight test baseline so subtasks can distinguish
"this regression is mine" from "this was broken before I started".
Expand Down Expand Up @@ -11867,9 +11867,17 @@ def record_test_baseline(
if m:
failures.append(m.group(1))

if timed_out:
status = "timed_out"
elif returncode == 0:
status = "success"
else:
status = "baseline_failures"

payload: dict[str, object] = {
"branch": branch_name,
"status": "success" if returncode == 0 else "baseline_failures",
"status": status,
"baseline_complete": not timed_out,
"command": cmd_str,
"auto_detected": bool(auto_detected_command),
"module_dir": detected_module_dir,
Expand Down Expand Up @@ -11907,14 +11915,26 @@ def list_baseline_failures(branch: str) -> dict[str, object]:
failures = data.get("baseline_failures", [])
if not isinstance(failures, list):
failures = []
return {
baseline_complete = data.get("baseline_complete", not data.get("timed_out", False))
timed_out_flag = data.get("timed_out", False)
result: dict[str, object] = {
"status": "success",
"branch": branch_name,
"command": data.get("command", ""),
"returncode": data.get("returncode"),
"baseline_complete": baseline_complete,
"timed_out": timed_out_flag,
"baseline_failures": failures,
"recorded_at": data.get("recorded_at"),
}
if timed_out_flag:
result["warning"] = (
"Baseline timed out — baseline_failures is empty because the suite "
"did not finish, not because there were no pre-existing failures. "
"Treat this baseline as UNKNOWN, not clean. Re-run record_test_baseline "
"with a longer --timeout or a faster --command."
)
return result


def _acknowledged_diagnostics_path(branch: str) -> Path:
Expand Down Expand Up @@ -18578,7 +18598,7 @@ def _opt_value(flag: str) -> str:
baseline_branch = sys.argv[2]
baseline_cmd = ""
baseline_module_dir = ""
baseline_timeout = 120
baseline_timeout = 600
if "--command" in sys.argv:
c_idx = sys.argv.index("--command")
if c_idx + 1 < len(sys.argv):
Expand Down
28 changes: 24 additions & 4 deletions src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -11705,7 +11705,7 @@ def record_test_baseline(
test_command: str = "",
*,
module_dir: str = "",
timeout_seconds: int = 120,
timeout_seconds: int = 600,
) -> dict[str, object]:
"""Record a pre-flight test baseline so subtasks can distinguish
"this regression is mine" from "this was broken before I started".
Expand Down Expand Up @@ -11867,9 +11867,17 @@ def record_test_baseline(
if m:
failures.append(m.group(1))

if timed_out:
status = "timed_out"
elif returncode == 0:
status = "success"
else:
status = "baseline_failures"

payload: dict[str, object] = {
"branch": branch_name,
"status": "success" if returncode == 0 else "baseline_failures",
"status": status,
"baseline_complete": not timed_out,
"command": cmd_str,
"auto_detected": bool(auto_detected_command),
"module_dir": detected_module_dir,
Expand Down Expand Up @@ -11907,14 +11915,26 @@ def list_baseline_failures(branch: str) -> dict[str, object]:
failures = data.get("baseline_failures", [])
if not isinstance(failures, list):
failures = []
return {
baseline_complete = data.get("baseline_complete", not data.get("timed_out", False))
timed_out_flag = data.get("timed_out", False)
result: dict[str, object] = {
"status": "success",
"branch": branch_name,
"command": data.get("command", ""),
"returncode": data.get("returncode"),
"baseline_complete": baseline_complete,
"timed_out": timed_out_flag,
"baseline_failures": failures,
"recorded_at": data.get("recorded_at"),
}
if timed_out_flag:
result["warning"] = (
"Baseline timed out — baseline_failures is empty because the suite "
"did not finish, not because there were no pre-existing failures. "
"Treat this baseline as UNKNOWN, not clean. Re-run record_test_baseline "
"with a longer --timeout or a faster --command."
)
return result


def _acknowledged_diagnostics_path(branch: str) -> Path:
Expand Down Expand Up @@ -18578,7 +18598,7 @@ if __name__ == "__main__":
baseline_branch = sys.argv[2]
baseline_cmd = ""
baseline_module_dir = ""
baseline_timeout = 120
baseline_timeout = 600
if "--command" in sys.argv:
c_idx = sys.argv.index("--command")
if c_idx + 1 < len(sys.argv):
Expand Down
56 changes: 56 additions & 0 deletions tests/test_map_step_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5943,6 +5943,62 @@ def test_list_baseline_failures_no_baseline_path(
assert report["status"] == "no_baseline"
assert report["baseline_failures"] == []

def test_baseline_timeout_is_unknown_not_clean(
    self, branch_workspace, monkeypatch
):
    """Regression #307: a timed-out baseline must be reported as unknown.

    The test replaces ``map_step_runner.subprocess.run`` with a stub that
    always raises ``subprocess.TimeoutExpired``, then checks two things:

    * ``record_test_baseline`` reports ``status == "timed_out"``,
      ``baseline_complete`` False, ``timed_out`` True and an empty
      ``baseline_failures`` list;
    * ``list_baseline_failures`` for the same branch still answers with
      ``status == "success"`` but carries ``baseline_complete`` False,
      ``timed_out`` True and a ``warning`` entry, so an empty failure list
      cannot be mistaken for a genuinely green suite.
    """
    import subprocess as _subprocess

    # The workspace fixture sits two levels below the project root.
    project_root = branch_workspace.parents[1]
    monkeypatch.setenv("CLAUDE_PROJECT_DIR", str(project_root))

    def always_times_out(cmd, **kwargs):
        # Stand-in for subprocess.run: behave as if the command overran.
        raise _subprocess.TimeoutExpired(cmd, kwargs.get("timeout", 1))

    monkeypatch.setattr(map_step_runner.subprocess, "run", always_times_out)

    recorded = map_step_runner.record_test_baseline(
        "test-branch", "pytest", timeout_seconds=1
    )

    # The status must be the dedicated timeout value, not "success" and not
    # "baseline_failures" paired with an empty list.
    observed_status = recorded["status"]
    assert observed_status == "timed_out", (
        f"Expected 'timed_out' status, got {observed_status!r}"
    )
    assert recorded["baseline_complete"] is False
    assert recorded["timed_out"] is True
    assert recorded["baseline_failures"] == []

    # Reading the stored baseline back must carry the incomplete-run signal.
    listing = map_step_runner.list_baseline_failures("test-branch")
    assert listing["status"] == "success"
    assert listing["baseline_complete"] is False
    assert listing["timed_out"] is True
    assert "warning" in listing, "list_baseline_failures must emit a warning on timed-out baseline"
    assert listing["baseline_failures"] == []

def test_baseline_complete_true_on_normal_run(
    self, branch_workspace, monkeypatch
):
    """A suite that runs to completion is flagged baseline_complete=True (#307).

    Records a baseline with the command ``"true"`` and checks that both the
    recorded report and the later listing mark the baseline as complete and
    not timed out, and that the listing carries no ``warning`` entry.
    """
    monkeypatch.setenv("CLAUDE_PROJECT_DIR", str(branch_workspace.parents[1]))

    recorded = map_step_runner.record_test_baseline("test-branch", "true")
    assert recorded["status"] == "success"
    assert recorded["baseline_complete"] is True
    assert recorded["timed_out"] is False

    # The stored baseline must read back with the same flags and no warning.
    listing = map_step_runner.list_baseline_failures("test-branch")
    assert listing["baseline_complete"] is True
    assert listing["timed_out"] is False
    assert "warning" not in listing


class TestRecordSubtaskResultFilesSeparatorParsing:
"""Fix #2 (2026-05-26): CLI must accept --files with comma OR
Expand Down
Loading