Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion kaievolve/explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ def read_json(path: Path) -> Optional[Dict[str, Any]]:
return None


def read_progress(run_dir: Path) -> List[Dict[str, Any]]:
"""The live per-iteration progress feed (``progress.jsonl``), if the run wrote
one. Each row: ``{iter, score, best, program_id, model, ts}``. Updates every
iteration (unlike checkpoints), so monitors/viewer can stream the trajectory
in real time. Empty list if the run predates the feed or hasn't started."""
return read_jsonl(run_dir / "progress.jsonl")


def progress_trajectory(rows: List[Dict[str, Any]]):
"""Turn progress rows into ``(best_points, raw_points)`` for svg.trajectory:
best-so-far step line and each attempt's raw score, keyed by iteration."""
best, raw = [], []
for r in rows:
it = r.get("iter")
if it is None:
continue
b = r.get("best")
s = r.get("score")
if isinstance(b, (int, float)):
best.append((int(it), float(b)))
if isinstance(s, (int, float)):
raw.append((int(it), float(s)))
return best, raw


def latest_checkpoint(run_dir: Path) -> Optional[Path]:
ck = run_dir / "checkpoints"
if not ck.is_dir():
Expand Down Expand Up @@ -275,7 +300,18 @@ def run_state(run_dir: Path) -> RunState:
if pts:
st.iteration = max(st.iteration, pts[-1][0])

mtimes = [p.stat().st_mtime for p in (ulog, ckpt) if p and p.exists()]
# Live override: progress.jsonl updates every iteration (checkpoints lag),
# so prefer it for the current iteration / best / curve when present.
prog_path = run_dir / "progress.jsonl"
prog = read_jsonl(prog_path)
if prog:
st.iteration = max((r.get("iter", st.iteration) for r in prog), default=st.iteration)
bests = [r.get("best") for r in prog if isinstance(r.get("best"), (int, float))]
if bests:
st.best_score = max(bests)
st.curve = bests # each row's 'best' is already best-so-far

mtimes = [p.stat().st_mtime for p in (ulog, ckpt, prog_path) if p and p.exists()]
if mtimes:
st.stale_s = time.time() - max(mtimes)
return st
Expand Down
39 changes: 39 additions & 0 deletions kaievolve/process_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,45 @@ async def run_evolution(
f"{child_program.id}"
)

# Live progress feed: one JSON line per completed iteration so
# `kai monitor` and the web viewer can stream the trajectory in
# real time (checkpoints only land every checkpoint_interval).
# Wrapped so telemetry can never break the evolution loop.
try:
import json as _json
import os as _os
import time as _time

_best = self.database.get_best_program()
_bscore = (
_best.metrics.get("combined_score") if _best and _best.metrics else None
)
_cscore = (
child_program.metrics.get("combined_score")
if child_program.metrics
else None
)
_model = None
_idx = result.generated_by_model_idx
if _idx is not None and 0 <= _idx < len(self.config.llm.models):
_model = self.config.llm.models[_idx].name
with open(_os.path.join(self.output_dir, "progress.jsonl"), "a") as _pf:
_pf.write(
_json.dumps(
{
"iter": completed_iteration,
"score": _cscore,
"best": _bscore,
"program_id": child_program.id,
"model": _model,
"ts": _time.time(),
}
)
+ "\n"
)
except Exception:
pass

# Early stopping check
if (
early_stopping_active
Expand Down
94 changes: 87 additions & 7 deletions kaievolve/viewer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
* ``/setup/{label}/run/{idx}`` run dashboard: trajectory chart + a summary
panel beside it, then a full-width clickable step table. Clicking a step opens
a right-side **drawer** (its notes / diff / solution viz) without leaving the page.
* ``/setup/{label}/run/{idx}/live.json`` live state of an active run (current
iteration, best, cost, calls, freshly-rendered chart). The dashboard polls this
every 3s while the run is active and updates the chart/stats in place; the
trajectory streams per-iteration from the run's ``progress.jsonl`` feed.
* ``/setup/{label}/run/{idx}/step/{n}/detail`` the drawer fragment for one step
(notes + measurements + diff + a viz iframe); fetched by the drawer JS.
* ``/setup/{label}/run/{idx}/step/{n}/viz`` one step's solution viz as a
Expand Down Expand Up @@ -146,6 +150,11 @@
.detail { min-width:0; }
.chip { display:inline-block; font-size:0.74rem; padding:1px 7px; border-radius:9px;
background:#eef4fb; color:var(--accent); margin-left:6px; }
/* live-run indicator (pulsing while a run is active) */
.livedot { font-size:0.7rem; font-weight:600; color:var(--pos); margin-left:10px;
vertical-align:middle; animation:livepulse 1.4s ease-in-out infinite; }
.livedot.done { color:var(--muted); animation:none; }
@keyframes livepulse { 0%,100%{opacity:1} 50%{opacity:0.3} }
/* back button (top of every drilled-in page) */
.backbtn { display:inline-flex; align-items:center; gap:5px; font-size:0.82rem;
color:var(--muted); margin-bottom:4px; }
Expand Down Expand Up @@ -293,26 +302,46 @@

_DASHBOARD = """{% extends "base" %}{% block title %}{{ short }} run {{ seed }} - kai{% endblock %}
{% block content %}
<h1>{{ short }} · run {{ seed }}</h1>
<h1>{{ short }} · run {{ seed }}
<span class="livedot" id="livedot" {% if not is_active %}hidden{% endif %}>&#9679; live</span></h1>
<p class="sub">{{ task }}{% if blurb %} — {{ blurb }}{% endif %}</p>

<div class="runhead">
<div>
{{ chart | safe }}
<div id="live-chart">{{ chart | safe }}</div>
<p class="note">Line = best score so far · hollow dots = a new best · faint dots =
each attempt{% if has_std %} · whiskers = ±1σ score noise{% endif %}.</p>
each attempt{% if has_std %} · whiskers = ±1σ score noise{% endif %}{% if has_progress %} ·
streaming live every iteration{% endif %}.</p>
</div>
<div class="summary">
<div class="row"><span class="k">best score</span><span class="v">{{ best }}</span></div>
<div class="row"><span class="k">steps</span><span class="v">{{ steps_n }}</span></div>
<div class="row"><span class="k">cost</span><span class="v">{{ cost }}</span></div>
<div class="row"><span class="k">AI calls</span><span class="v">{{ calls }}</span></div>
<div class="row"><span class="k">best score</span><span class="v" id="s-best">{{ best }}</span></div>
<div class="row"><span class="k">{% if is_active %}iteration{% else %}steps{% endif %}</span><span class="v" id="s-steps">{{ steps_n }}</span></div>
<div class="row"><span class="k">cost</span><span class="v" id="s-cost">{{ cost }}</span></div>
<div class="row"><span class="k">AI calls</span><span class="v" id="s-calls">{{ calls }}</span></div>
<div class="models">models: {% for m in models %}<b>{{ m.name }}</b> {{ m.count }}{% if not loop.last %} · {% endif %}{% endfor %}</div>
<div class="btn-row" style="margin-top:10px">
{% if has_solution %}<a class="btn sm" href="/setup/{{ label }}/run/{{ idx }}/solution">see the solution &rarr;</a>{% endif %}
</div>
</div>
</div>
<script>
(function(){
if (!{{ 'true' if is_active else 'false' }}) return;
var url = "/setup/{{ label }}/run/{{ idx }}/live.json";
function tick(){
fetch(url).then(function(r){return r.json();}).then(function(d){
if (d.chart) document.getElementById('live-chart').innerHTML = d.chart;
if (d.best !== null && d.best !== undefined) document.getElementById('s-best').textContent = (+d.best).toFixed(4);
if (d.iteration !== null && d.iteration !== undefined) document.getElementById('s-steps').textContent = d.iteration;
if (d.cost) document.getElementById('s-cost').textContent = d.cost;
if (d.calls !== null && d.calls !== undefined) document.getElementById('s-calls').textContent = d.calls;
if (!d.active){ var dot=document.getElementById('livedot'); if(dot){ dot.textContent='● finished'; dot.classList.add('done'); } return; }
setTimeout(tick, 3000);
}).catch(function(){ setTimeout(tick, 5000); });
}
setTimeout(tick, 3000);
})();
</script>

<div class="btn-row">
<a class="btn ghost sm" href="/setup/{{ label }}/run/{{ idx }}/approaches">approaches: strategies tried &rarr;</a>
Expand Down Expand Up @@ -865,6 +894,15 @@ def run_page(label: str, idx: int):
raw.append((s.iteration, s.score))
raw_std.append(s.metrics.get("combined_score_std") if s.metrics else None)
has_std = any(isinstance(x, (int, float)) and x > 0 for x in raw_std)
# Live feed: if the run wrote a per-iteration progress.jsonl, draw the
# trajectory from it (finer than checkpoints). Falls back to checkpoint
# steps for older runs.
progress = explore.read_progress(rd)
has_progress = bool(progress)
if has_progress:
pbest, praw = explore.progress_trajectory(progress)
if pbest:
best, raw, raw_std = pbest, praw, [None] * len(praw)
chart = (
svg.trajectory(best, raw, raw_std=raw_std)
if best
Expand Down Expand Up @@ -915,11 +953,53 @@ def run_page(label: str, idx: int):
calls=rs.calls,
chart=chart,
has_std=has_std,
has_progress=has_progress,
is_active=rs.is_active,
has_solution=solution.find_visualizer(rd, roots) is not None,
models=models,
step_rows=step_rows,
)

# ── live JSON for the dashboard poller: current chart + headline stats ──────
@app.get("/setup/{label}/run/{idx}/live.json")
def run_live(label: str, idx: int):
from fastapi.responses import JSONResponse

arm = arm_or_404(label)
if idx < 0 or idx >= len(arm.run_dirs):
raise HTTPException(404, f"run {idx} out of range for {label}")
rd = arm.run_dirs[idx]
rs = explore.run_state(rd)
progress = explore.read_progress(rd)
if progress:
best, raw = explore.progress_trajectory(progress)
iteration = max((r.get("iter", 0) for r in progress), default=0)
best_score = max(
(r.get("best") for r in progress if isinstance(r.get("best"), (int, float))),
default=None,
)
else:
steps = explore.steps_for_run(rd)
best, raw, b = [], [], float("-inf")
for s in steps:
if s.score is not None:
b = max(b, s.score)
best.append((s.iteration, b))
raw.append((s.iteration, s.score))
iteration = len(steps)
best_score = rs.best_score
chart = svg.trajectory(best, raw) if best else ""
return JSONResponse(
{
"active": rs.is_active,
"iteration": iteration,
"best": best_score,
"cost": _fmt_cost(rs.cost),
"calls": rs.calls,
"chart": chart,
}
)

# ── drawer fragment for one step (notes + diff + optional viz iframe) ───────
@app.get("/setup/{label}/run/{idx}/step/{iteration}/detail", response_class=HTMLResponse)
def step_detail(label: str, idx: int, iteration: int):
Expand Down
116 changes: 116 additions & 0 deletions tests/test_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for live-run streaming: the per-iteration progress feed (progress.jsonl),
its readers in kaievolve.explore, and the viewer's live badge + live.json endpoint.
"""

import json
import time
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory

from kaievolve import explore


def _write_progress(run_dir: Path, rows):
run_dir.mkdir(parents=True, exist_ok=True)
with open(run_dir / "progress.jsonl", "w") as f:
for r in rows:
f.write(json.dumps(r) + "\n")


_ROWS = [
{"iter": 0, "score": 0.30, "best": 0.30, "program_id": "p0", "model": "m", "ts": 1.0},
{"iter": 1, "score": 0.20, "best": 0.30, "program_id": "p1", "model": "m", "ts": 2.0},
{"iter": 2, "score": 0.55, "best": 0.55, "program_id": "p2", "model": "m", "ts": 3.0},
{"iter": 3, "score": 0.62, "best": 0.62, "program_id": "p3", "model": "m", "ts": 4.0},
]


class TestProgressFeed(unittest.TestCase):
def test_read_progress_absent_is_empty(self):
with TemporaryDirectory() as d:
self.assertEqual(explore.read_progress(Path(d)), [])

def test_progress_trajectory(self):
best, raw = explore.progress_trajectory(_ROWS)
# best is the best-so-far step line; raw is every attempt
self.assertEqual(best, [(0, 0.30), (1, 0.30), (2, 0.55), (3, 0.62)])
self.assertEqual(len(raw), 4)
self.assertEqual(raw[1], (1, 0.20))

def test_run_state_live_override(self):
with TemporaryDirectory() as d:
rd = Path(d) / "setup" / "task" / "run_3"
_write_progress(rd, _ROWS)
# a recent usage_log so the run reads as active
(rd / "usage_log.jsonl").write_text(
json.dumps({"model": "m", "cost": 0.01, "ts": time.time()}) + "\n"
)
st = explore.run_state(rd)
self.assertEqual(st.iteration, 3) # current iteration from the feed
self.assertAlmostEqual(st.best_score, 0.62)
self.assertEqual(st.curve, [0.30, 0.30, 0.55, 0.62]) # finer than checkpoints
self.assertTrue(st.is_active) # recent activity


def _has_viewer_deps() -> bool:
try:
import fastapi # noqa: F401
import jinja2 # noqa: F401
from fastapi.testclient import TestClient # noqa: F401

return True
except ImportError:
return False


@unittest.skipUnless(_has_viewer_deps(), "viewer extra not installed")
class TestViewerLive(unittest.TestCase):
def _client(self, active: bool):
from fastapi.testclient import TestClient
from kaievolve.viewer.server import create_app

self._tmp = TemporaryDirectory()
root = Path(self._tmp.name) / "bench"
rd = root / "auto_full" / "task" / "run_9"
_write_progress(rd, _ROWS)
(rd / "usage_log.jsonl").write_text(
json.dumps({"model": "m", "cost": 0.02, "total_tokens": 9, "ts": time.time()}) + "\n"
)
(root / "auto_full" / "results.jsonl").write_text(json.dumps({"best_score": 0.62}) + "\n")
if not active:
# idle = the run's files haven't been touched in a while. is_active is
# mtime-based (a freshly-appended progress feed reads as live), so age
# the files well past the 300s activity window.
import os

old = time.time() - 3600
for p in (rd / "progress.jsonl", rd / "usage_log.jsonl"):
os.utime(p, (old, old))
return TestClient(create_app(root, None))

def test_dashboard_live_badge_and_endpoint(self):
c = self._client(active=True)
page = c.get("/setup/auto_full/run/0")
self.assertEqual(page.status_code, 200)
# live badge shown (not hidden) + the streaming note
self.assertIn('id="livedot"', page.text)
self.assertNotIn('id="livedot" hidden', page.text)
self.assertIn("streaming live", page.text)
lj = c.get("/setup/auto_full/run/0/live.json").json()
self.assertTrue(lj["active"])
self.assertEqual(lj["iteration"], 3)
self.assertAlmostEqual(lj["best"], 0.62)
self.assertIn("<svg", lj["chart"])
self._tmp.cleanup()

def test_dashboard_badge_hidden_when_idle(self):
c = self._client(active=False)
page = c.get("/setup/auto_full/run/0")
self.assertIn('id="livedot" hidden', page.text) # badge present but hidden
self.assertFalse(c.get("/setup/auto_full/run/0/live.json").json()["active"])
self._tmp.cleanup()


if __name__ == "__main__":
unittest.main()
Loading