Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions frontend/app/runs/[id]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use client";

import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { useMutation, useQueryClient } from "@tanstack/react-query";
import Link from "next/link";
import { use } from "react";

Expand All @@ -10,8 +10,9 @@ import { EventStream } from "@/components/EventStream";
import { StatusBadge } from "@/components/StatusBadge";
import { TokenCostSummary } from "@/components/TokenCostSummary";
import { checkpointsByStep } from "@/lib/checkpoints";
import { formatCostUsd } from "@/lib/usage";
import { api } from "@/lib/api";
import { useRunWithLiveUpdates } from "@/lib/useRunLiveSync";
import { formatCostUsd } from "@/lib/usage";

interface PageProps {
params: Promise<{ id: string }>;
Expand All @@ -20,15 +21,7 @@ interface PageProps {
export default function RunDetailPage({ params }: PageProps) {
const { id } = use(params);
const queryClient = useQueryClient();
const run = useQuery({
queryKey: ["run", id],
queryFn: () => api.getRun(id),
refetchInterval: (q) => {
const status = q.state.data?.status;
if (status === "running" || status === "pending") return 2_000;
return false;
},
});
const run = useRunWithLiveUpdates(id);

const cancel = useMutation({
mutationFn: () => api.cancelRun(id),
Expand All @@ -40,7 +33,6 @@ export default function RunDetailPage({ params }: PageProps) {
return <p className="text-bad">Failed to load run: {String(run.error)}</p>;

const r = run.data;
const isLive = r.status === "running" || r.status === "pending";
const stepCheckpoints = checkpointsByStep(r.steps, r.checkpoints);

return (
Expand All @@ -60,7 +52,7 @@ export default function RunDetailPage({ params }: PageProps) {
</div>
</div>
<div className="flex gap-2">
{isLive && (
{run.isLive && (
<button
className="rounded border border-bad/40 text-bad px-3 py-1.5 text-sm hover:bg-bad/10"
onClick={() => cancel.mutate()}
Expand Down Expand Up @@ -185,10 +177,7 @@ export default function RunDetailPage({ params }: PageProps) {
</ol>
</section>

<EventStream
runId={id}
onTerminal={() => queryClient.invalidateQueries({ queryKey: ["run", id] })}
/>
<EventStream events={run.events} status={run.streamStatus} />
</div>
);
}
17 changes: 15 additions & 2 deletions frontend/app/runs/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,37 @@

import { useQuery } from "@tanstack/react-query";
import Link from "next/link";
import { useMemo } from "react";

import { StatusBadge } from "@/components/StatusBadge";
import { api } from "@/lib/api";
import {
liveRunIdsFromRuns,
useRunsListLiveSync,
} from "@/lib/useRunLiveSync";
import { formatCostUsd, hasUsageMetrics } from "@/lib/usage";

export default function RunsPage() {
const runs = useQuery({
queryKey: ["runs"],
queryFn: api.listRuns,
refetchInterval: 3_000,
});

const liveRunIds = useMemo(
() => liveRunIdsFromRuns(runs.data),
[runs.data],
);

useRunsListLiveSync(liveRunIds);

return (
<div className="max-w-5xl mx-auto space-y-4">
<header className="flex items-center justify-between">
<h1 className="text-xl font-semibold">Runs</h1>
<span className="text-xs text-muted">
auto-refreshing every 3s
{liveRunIds.length > 0
? `live via SSE (${liveRunIds.length} active)`
: "updates on refresh"}
</span>
</header>

Expand Down
203 changes: 36 additions & 167 deletions frontend/components/EventStream.tsx
Original file line number Diff line number Diff line change
@@ -1,44 +1,18 @@
"use client";

import { useEffect, useRef, useState } from "react";
import { useEffect, useRef } from "react";

import { eventStreamUrl } from "@/lib/api";
import type { RunEvent } from "@/lib/types";
import {
connectionLabel,
type RunEventConnectionStatus,
type StoredRunEvent,
} from "@/lib/run-event-connection";

interface Props {
runId: string;
onTerminal?: () => void;
events: StoredRunEvent[];
status: RunEventConnectionStatus;
}

type ConnectionStatus = "connecting" | "connected" | "reconnecting" | "closed";

interface StoredEvent extends RunEvent {
clientSeq: number;
}

const TERMINAL = new Set(["run.completed", "run.failed", "run.cancelled"]);

const EVENT_TYPES = [
"run.created",
"run.started",
"run.completed",
"run.failed",
"run.cancelled",
"step.started",
"step.updated",
"step.completed",
"step.failed",
"token.delta",
"message.created",
"tool_call.started",
"tool_call.completed",
"checkpoint.created",
"log",
] as const;

const INITIAL_RECONNECT_MS = 1_000;
const MAX_RECONNECT_MS = 30_000;

function formatEventData(type: string, data: Record<string, unknown>): string {
if (type === "checkpoint.created") {
const label = data.label;
Expand All @@ -54,121 +28,8 @@ function formatEventData(type: string, data: Record<string, unknown>): string {
return JSON.stringify(data);
}

function connectionLabel(status: ConnectionStatus): string {
switch (status) {
case "connecting":
return "connecting…";
case "connected":
return "live";
case "reconnecting":
return "reconnecting…";
case "closed":
return "closed";
}
}

export function EventStream({ runId, onTerminal }: Props) {
const [events, setEvents] = useState<StoredEvent[]>([]);
const [status, setStatus] = useState<ConnectionStatus>("connecting");
export function EventStream({ events, status }: Props) {
const bottomRef = useRef<HTMLDivElement>(null);
const onTerminalRef = useRef(onTerminal);
const seqRef = useRef(0);

onTerminalRef.current = onTerminal;

useEffect(() => {
seqRef.current = 0;
setEvents([]);
setStatus("connecting");

let cancelled = false;
let source: EventSource | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let reconnectAttempt = 0;
let terminal = false;

const clearReconnectTimer = () => {
if (reconnectTimer != null) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
};

const closeSource = () => {
if (!source) return;
EVENT_TYPES.forEach((t) => source!.removeEventListener(t, onMessage));
source.close();
source = null;
};

const finishTerminal = () => {
terminal = true;
clearReconnectTimer();
closeSource();
if (!cancelled) setStatus("closed");
onTerminalRef.current?.();
};

const scheduleReconnect = () => {
if (cancelled || terminal) return;
setStatus("reconnecting");
const delay = Math.min(
INITIAL_RECONNECT_MS * 2 ** reconnectAttempt,
MAX_RECONNECT_MS,
);
reconnectAttempt += 1;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
connect();
}, delay);
};

const onMessage = (event: MessageEvent) => {
if (cancelled || terminal) return;
try {
const data: RunEvent = JSON.parse(event.data);
const clientSeq = ++seqRef.current;
setEvents((prev) => [...prev, { ...data, clientSeq }]);
if (TERMINAL.has(data.type)) {
finishTerminal();
}
} catch {
// ignore malformed events
}
};

const connect = () => {
if (cancelled || terminal) return;

closeSource();
setStatus(reconnectAttempt === 0 ? "connecting" : "reconnecting");

const next = new EventSource(eventStreamUrl(runId));
source = next;

next.addEventListener("open", () => {
if (cancelled || terminal) return;
reconnectAttempt = 0;
setStatus("connected");
});

EVENT_TYPES.forEach((t) => next.addEventListener(t, onMessage));

next.onerror = () => {
if (cancelled || terminal) return;
closeSource();
scheduleReconnect();
};
};

connect();

return () => {
cancelled = true;
clearReconnectTimer();
closeSource();
};
}, [runId]);

useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
Expand All @@ -195,31 +56,39 @@ export function EventStream({ runId, onTerminal }: Props) {
<div className="text-muted">
{status === "reconnecting"
? "Connection lost — retrying…"
: "Waiting for events…"}
: status === "closed"
? "Stream closed"
: "Waiting for events…"}
</div>
) : (
events.map((e) => (
<div key={e.clientSeq} className="flex gap-3">
<span className="text-muted shrink-0">
{new Date(e.at).toLocaleTimeString()}
</span>
<span
className={
e.type === "checkpoint.created"
? "text-warn shrink-0"
: "text-accent shrink-0"
}
>
{e.type}
</span>
<span className="text-muted truncate">
{formatEventData(e.type, e.data)}
</span>
</div>
events.map((event) => (
<EventStreamRow key={event.clientSeq} event={event} />
))
)}
<div ref={bottomRef} />
</div>
</div>
);
}

function EventStreamRow({ event }: { event: StoredRunEvent }) {
return (
<div className="flex gap-3">
<span className="text-muted shrink-0">
{new Date(event.at).toLocaleTimeString()}
</span>
<span
className={
event.type === "checkpoint.created"
? "text-warn shrink-0"
: "text-accent shrink-0"
}
>
{event.type}
</span>
<span className="text-muted truncate">
{formatEventData(event.type, event.data)}
</span>
</div>
);
}
Loading
Loading