diff --git a/kaievolve/explore.py b/kaievolve/explore.py index e800057..b7f9a38 100644 --- a/kaievolve/explore.py +++ b/kaievolve/explore.py @@ -42,6 +42,31 @@ def read_json(path: Path) -> Optional[Dict[str, Any]]: return None +def read_progress(run_dir: Path) -> List[Dict[str, Any]]: + """The live per-iteration progress feed (``progress.jsonl``), if the run wrote + one. Each row: ``{iter, score, best, program_id, model, ts}``. Updates every + iteration (unlike checkpoints), so monitors/viewer can stream the trajectory + in real time. Empty list if the run predates the feed or hasn't started.""" + return read_jsonl(run_dir / "progress.jsonl") + + +def progress_trajectory(rows: List[Dict[str, Any]]): + """Turn progress rows into ``(best_points, raw_points)`` for svg.trajectory: + best-so-far step line and each attempt's raw score, keyed by iteration.""" + best, raw = [], [] + for r in rows: + it = r.get("iter") + if it is None: + continue + b = r.get("best") + s = r.get("score") + if isinstance(b, (int, float)): + best.append((int(it), float(b))) + if isinstance(s, (int, float)): + raw.append((int(it), float(s))) + return best, raw + + def latest_checkpoint(run_dir: Path) -> Optional[Path]: ck = run_dir / "checkpoints" if not ck.is_dir(): @@ -275,7 +300,18 @@ def run_state(run_dir: Path) -> RunState: if pts: st.iteration = max(st.iteration, pts[-1][0]) - mtimes = [p.stat().st_mtime for p in (ulog, ckpt) if p and p.exists()] + # Live override: progress.jsonl updates every iteration (checkpoints lag), + # so prefer it for the current iteration / best / curve when present. + prog_path = run_dir / "progress.jsonl" + prog = read_jsonl(prog_path) + if prog: + st.iteration = max((r.get("iter", st.iteration) for r in prog), default=st.iteration) + bests = [r.get("best") for r in prog if isinstance(r.get("best"), (int, float))] + if bests: + st.best_score = max(bests) + st.curve = bests # each row's 'best' is already best-so-far + + mtimes = [p.stat().st_mtime for p in (ulog, ckpt, prog_path) if p and p.exists()] if mtimes: st.stale_s = time.time() - max(mtimes) return st diff --git a/kaievolve/process_parallel.py b/kaievolve/process_parallel.py index 636497a..81c87bc 100644 --- a/kaievolve/process_parallel.py +++ b/kaievolve/process_parallel.py @@ -1418,6 +1418,45 @@ async def run_evolution( f"{child_program.id}" ) + # Live progress feed: one JSON line per completed iteration so + # `kai monitor` and the web viewer can stream the trajectory in + # real time (checkpoints only land every checkpoint_interval). + # Wrapped so telemetry can never break the evolution loop. + try: + import json as _json + import os as _os + import time as _time + + _best = self.database.get_best_program() + _bscore = ( + _best.metrics.get("combined_score") if _best and _best.metrics else None + ) + _cscore = ( + child_program.metrics.get("combined_score") + if child_program.metrics + else None + ) + _model = None + _idx = result.generated_by_model_idx + if _idx is not None and 0 <= _idx < len(self.config.llm.models): + _model = self.config.llm.models[_idx].name + with open(_os.path.join(self.output_dir, "progress.jsonl"), "a") as _pf: + _pf.write( + _json.dumps( + { + "iter": completed_iteration, + "score": _cscore, + "best": _bscore, + "program_id": child_program.id, + "model": _model, + "ts": _time.time(), + } + ) + + "\n" + ) + except Exception: + pass + # Early stopping check if ( early_stopping_active diff --git a/kaievolve/viewer/server.py b/kaievolve/viewer/server.py index 79e1c47..6c8f231 100644 --- a/kaievolve/viewer/server.py +++ b/kaievolve/viewer/server.py @@ -14,6 +14,10 @@ * ``/setup/{label}/run/{idx}`` run dashboard: trajectory chart + a summary panel beside it, then a full-width clickable step table. Clicking a step opens a right-side **drawer** (its notes / diff / solution viz) without leaving the page. +* ``/setup/{label}/run/{idx}/live.json`` live state of an active run (current + iteration, best, cost, calls, freshly-rendered chart). The dashboard polls this + every 3s while the run is active and updates the chart/stats in place; the + trajectory streams per-iteration from the run's ``progress.jsonl`` feed. * ``/setup/{label}/run/{idx}/step/{n}/detail`` the drawer fragment for one step (notes + measurements + diff + a viz iframe); fetched by the drawer JS. * ``/setup/{label}/run/{idx}/step/{n}/viz`` one step's solution viz as a @@ -146,6 +150,11 @@ .detail { min-width:0; } .chip { display:inline-block; font-size:0.74rem; padding:1px 7px; border-radius:9px; background:#eef4fb; color:var(--accent); margin-left:6px; } + /* live-run indicator (pulsing while a run is active) */ + .livedot { font-size:0.7rem; font-weight:600; color:var(--pos); margin-left:10px; + vertical-align:middle; animation:livepulse 1.4s ease-in-out infinite; } + .livedot.done { color:var(--muted); animation:none; } + @keyframes livepulse { 0%,100%{opacity:1} 50%{opacity:0.3} } /* back button (top of every drilled-in page) */ .backbtn { display:inline-flex; align-items:center; gap:5px; font-size:0.82rem; color:var(--muted); margin-bottom:4px; } @@ -293,26 +302,46 @@ _DASHBOARD = """{% extends "base" %}{% block title %}{{ short }} run {{ seed }} - kai{% endblock %} {% block content %} -

{{ short }} · run {{ seed }}

+

{{ short }} · run {{ seed }} + ● live

{{ task }}{% if blurb %} — {{ blurb }}{% endif %}

- {{ chart | safe }} +
{{ chart | safe }}

Line = best score so far · hollow dots = a new best · faint dots = - each attempt{% if has_std %} · whiskers = ±1σ score noise{% endif %}.

+ each attempt{% if has_std %} · whiskers = ±1σ score noise{% endif %}{% if has_progress %} · + streaming live every iteration{% endif %}.

-
best score{{ best }}
-
steps{{ steps_n }}
-
cost{{ cost }}
-
AI calls{{ calls }}
+
best score{{ best }}
+
{% if is_active %}iteration{% else %}steps{% endif %}{{ steps_n }}
+
cost{{ cost }}
+
AI calls{{ calls }}
models: {% for m in models %}{{ m.name }} {{ m.count }}{% if not loop.last %} · {% endif %}{% endfor %}
{% if has_solution %}see the solution →{% endif %}
+
approaches: strategies tried → @@ -865,6 +894,15 @@ def run_page(label: str, idx: int): raw.append((s.iteration, s.score)) raw_std.append(s.metrics.get("combined_score_std") if s.metrics else None) has_std = any(isinstance(x, (int, float)) and x > 0 for x in raw_std) + # Live feed: if the run wrote a per-iteration progress.jsonl, draw the + # trajectory from it (finer than checkpoints). Falls back to checkpoint + # steps for older runs. + progress = explore.read_progress(rd) + has_progress = bool(progress) + if has_progress: + pbest, praw = explore.progress_trajectory(progress) + if pbest: + best, raw, raw_std = pbest, praw, [None] * len(praw) chart = ( svg.trajectory(best, raw, raw_std=raw_std) if best @@ -915,11 +953,53 @@ def run_page(label: str, idx: int): calls=rs.calls, chart=chart, has_std=has_std, + has_progress=has_progress, + is_active=rs.is_active, has_solution=solution.find_visualizer(rd, roots) is not None, models=models, step_rows=step_rows, ) + # ── live JSON for the dashboard poller: current chart + headline stats ────── + @app.get("/setup/{label}/run/{idx}/live.json") + def run_live(label: str, idx: int): + from fastapi.responses import JSONResponse + + arm = arm_or_404(label) + if idx < 0 or idx >= len(arm.run_dirs): + raise HTTPException(404, f"run {idx} out of range for {label}") + rd = arm.run_dirs[idx] + rs = explore.run_state(rd) + progress = explore.read_progress(rd) + if progress: + best, raw = explore.progress_trajectory(progress) + iteration = max((r.get("iter", 0) for r in progress), default=0) + best_score = max( + (r.get("best") for r in progress if isinstance(r.get("best"), (int, float))), + default=None, + ) + else: + steps = explore.steps_for_run(rd) + best, raw, b = [], [], float("-inf") + for s in steps: + if s.score is not None: + b = max(b, s.score) + best.append((s.iteration, b)) + raw.append((s.iteration, s.score)) + iteration = len(steps) + best_score = rs.best_score + chart = svg.trajectory(best, raw) if best else "" + return JSONResponse( + { + "active": rs.is_active, + "iteration": iteration, + "best": best_score, + "cost": _fmt_cost(rs.cost), + "calls": rs.calls, + "chart": chart, + } + ) + # ── drawer fragment for one step (notes + diff + optional viz iframe) ─────── @app.get("/setup/{label}/run/{idx}/step/{iteration}/detail", response_class=HTMLResponse) def step_detail(label: str, idx: int, iteration: int): diff --git a/tests/test_live.py b/tests/test_live.py new file mode 100644 index 0000000..60ed854 --- /dev/null +++ b/tests/test_live.py @@ -0,0 +1,116 @@ +"""Tests for live-run streaming: the per-iteration progress feed (progress.jsonl), +its readers in kaievolve.explore, and the viewer's live badge + live.json endpoint. +""" + +import json +import time +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from kaievolve import explore + + +def _write_progress(run_dir: Path, rows): + run_dir.mkdir(parents=True, exist_ok=True) + with open(run_dir / "progress.jsonl", "w") as f: + for r in rows: + f.write(json.dumps(r) + "\n") + + +_ROWS = [ + {"iter": 0, "score": 0.30, "best": 0.30, "program_id": "p0", "model": "m", "ts": 1.0}, + {"iter": 1, "score": 0.20, "best": 0.30, "program_id": "p1", "model": "m", "ts": 2.0}, + {"iter": 2, "score": 0.55, "best": 0.55, "program_id": "p2", "model": "m", "ts": 3.0}, + {"iter": 3, "score": 0.62, "best": 0.62, "program_id": "p3", "model": "m", "ts": 4.0}, +] + + +class TestProgressFeed(unittest.TestCase): + def test_read_progress_absent_is_empty(self): + with TemporaryDirectory() as d: + self.assertEqual(explore.read_progress(Path(d)), []) + + def test_progress_trajectory(self): + best, raw = explore.progress_trajectory(_ROWS) + # best is the best-so-far step line; raw is every attempt + self.assertEqual(best, [(0, 0.30), (1, 0.30), (2, 0.55), (3, 0.62)]) + self.assertEqual(len(raw), 4) + self.assertEqual(raw[1], (1, 0.20)) + + def test_run_state_live_override(self): + with TemporaryDirectory() as d: + rd = Path(d) / "setup" / "task" / "run_3" + _write_progress(rd, _ROWS) + # a recent usage_log so the run reads as active + (rd / "usage_log.jsonl").write_text( + json.dumps({"model": "m", "cost": 0.01, "ts": time.time()}) + "\n" + ) + st = explore.run_state(rd) + self.assertEqual(st.iteration, 3) # current iteration from the feed + self.assertAlmostEqual(st.best_score, 0.62) + self.assertEqual(st.curve, [0.30, 0.30, 0.55, 0.62]) # finer than checkpoints + self.assertTrue(st.is_active) # recent activity + + +def _has_viewer_deps() -> bool: + try: + import fastapi # noqa: F401 + import jinja2 # noqa: F401 + from fastapi.testclient import TestClient # noqa: F401 + + return True + except ImportError: + return False + + +@unittest.skipUnless(_has_viewer_deps(), "viewer extra not installed") +class TestViewerLive(unittest.TestCase): + def _client(self, active: bool): + from fastapi.testclient import TestClient + from kaievolve.viewer.server import create_app + + self._tmp = TemporaryDirectory() + root = Path(self._tmp.name) / "bench" + rd = root / "auto_full" / "task" / "run_9" + _write_progress(rd, _ROWS) + (rd / "usage_log.jsonl").write_text( + json.dumps({"model": "m", "cost": 0.02, "total_tokens": 9, "ts": time.time()}) + "\n" + ) + (root / "auto_full" / "results.jsonl").write_text(json.dumps({"best_score": 0.62}) + "\n") + if not active: + # idle = the run's files haven't been touched in a while. is_active is + # mtime-based (a freshly-appended progress feed reads as live), so age + # the files well past the 300s activity window. + import os + + old = time.time() - 3600 + for p in (rd / "progress.jsonl", rd / "usage_log.jsonl"): + os.utime(p, (old, old)) + return TestClient(create_app(root, None)) + + def test_dashboard_live_badge_and_endpoint(self): + c = self._client(active=True) + page = c.get("/setup/auto_full/run/0") + self.assertEqual(page.status_code, 200) + # live badge shown (not hidden) + the streaming note + self.assertIn('id="livedot"', page.text) + self.assertNotIn('id="livedot" hidden', page.text) + self.assertIn("streaming live", page.text) + lj = c.get("/setup/auto_full/run/0/live.json").json() + self.assertTrue(lj["active"]) + self.assertEqual(lj["iteration"], 3) + self.assertAlmostEqual(lj["best"], 0.62) + self.assertIn("