diff --git a/frontend/app/runs/[id]/page.tsx b/frontend/app/runs/[id]/page.tsx index ddf53f4..940cc5c 100644 --- a/frontend/app/runs/[id]/page.tsx +++ b/frontend/app/runs/[id]/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { useMutation, useQueryClient } from "@tanstack/react-query"; import Link from "next/link"; import { use } from "react"; @@ -10,8 +10,9 @@ import { EventStream } from "@/components/EventStream"; import { StatusBadge } from "@/components/StatusBadge"; import { TokenCostSummary } from "@/components/TokenCostSummary"; import { checkpointsByStep } from "@/lib/checkpoints"; -import { formatCostUsd } from "@/lib/usage"; import { api } from "@/lib/api"; +import { useRunWithLiveUpdates } from "@/lib/useRunLiveSync"; +import { formatCostUsd } from "@/lib/usage"; interface PageProps { params: Promise<{ id: string }>; @@ -20,15 +21,7 @@ interface PageProps { export default function RunDetailPage({ params }: PageProps) { const { id } = use(params); const queryClient = useQueryClient(); - const run = useQuery({ - queryKey: ["run", id], - queryFn: () => api.getRun(id), - refetchInterval: (q) => { - const status = q.state.data?.status; - if (status === "running" || status === "pending") return 2_000; - return false; - }, - }); + const run = useRunWithLiveUpdates(id); const cancel = useMutation({ mutationFn: () => api.cancelRun(id), @@ -40,7 +33,6 @@ export default function RunDetailPage({ params }: PageProps) { return

Failed to load run: {String(run.error)}

; const r = run.data; - const isLive = r.status === "running" || r.status === "pending"; const stepCheckpoints = checkpointsByStep(r.steps, r.checkpoints); return ( @@ -60,7 +52,7 @@ export default function RunDetailPage({ params }: PageProps) {
- {isLive && ( + {run.isLive && (
); } diff --git a/frontend/app/runs/page.tsx b/frontend/app/runs/page.tsx index 64309ab..5866846 100644 --- a/frontend/app/runs/page.tsx +++ b/frontend/app/runs/page.tsx @@ -2,24 +2,37 @@ import { useQuery } from "@tanstack/react-query"; import Link from "next/link"; +import { useMemo } from "react"; import { StatusBadge } from "@/components/StatusBadge"; import { api } from "@/lib/api"; +import { + liveRunIdsFromRuns, + useRunsListLiveSync, +} from "@/lib/useRunLiveSync"; import { formatCostUsd, hasUsageMetrics } from "@/lib/usage"; export default function RunsPage() { const runs = useQuery({ queryKey: ["runs"], queryFn: api.listRuns, - refetchInterval: 3_000, }); + const liveRunIds = useMemo( + () => liveRunIdsFromRuns(runs.data), + [runs.data], + ); + + useRunsListLiveSync(liveRunIds); + return (

Runs

- auto-refreshing every 3s + {liveRunIds.length > 0 + ? `live via SSE (${liveRunIds.length} active)` + : "updates on refresh"}
diff --git a/frontend/components/EventStream.tsx b/frontend/components/EventStream.tsx index 3c748df..a7fb671 100644 --- a/frontend/components/EventStream.tsx +++ b/frontend/components/EventStream.tsx @@ -1,44 +1,18 @@ "use client"; -import { useEffect, useRef, useState } from "react"; +import { useEffect, useRef } from "react"; -import { eventStreamUrl } from "@/lib/api"; -import type { RunEvent } from "@/lib/types"; +import { + connectionLabel, + type RunEventConnectionStatus, + type StoredRunEvent, +} from "@/lib/run-event-connection"; interface Props { - runId: string; - onTerminal?: () => void; + events: StoredRunEvent[]; + status: RunEventConnectionStatus; } -type ConnectionStatus = "connecting" | "connected" | "reconnecting" | "closed"; - -interface StoredEvent extends RunEvent { - clientSeq: number; -} - -const TERMINAL = new Set(["run.completed", "run.failed", "run.cancelled"]); - -const EVENT_TYPES = [ - "run.created", - "run.started", - "run.completed", - "run.failed", - "run.cancelled", - "step.started", - "step.updated", - "step.completed", - "step.failed", - "token.delta", - "message.created", - "tool_call.started", - "tool_call.completed", - "checkpoint.created", - "log", -] as const; - -const INITIAL_RECONNECT_MS = 1_000; -const MAX_RECONNECT_MS = 30_000; - function formatEventData(type: string, data: Record): string { if (type === "checkpoint.created") { const label = data.label; @@ -54,121 +28,8 @@ function formatEventData(type: string, data: Record): string { return JSON.stringify(data); } -function connectionLabel(status: ConnectionStatus): string { - switch (status) { - case "connecting": - return "connecting…"; - case "connected": - return "live"; - case "reconnecting": - return "reconnecting…"; - case "closed": - return "closed"; - } -} - -export function EventStream({ runId, onTerminal }: Props) { - const [events, setEvents] = useState([]); - const [status, setStatus] = useState("connecting"); +export function EventStream({ events, status }: Props) { const bottomRef = useRef(null); - const onTerminalRef = useRef(onTerminal); - const seqRef = useRef(0); - - onTerminalRef.current = onTerminal; - - useEffect(() => { - seqRef.current = 0; - setEvents([]); - setStatus("connecting"); - - let cancelled = false; - let source: EventSource | null = null; - let reconnectTimer: ReturnType | null = null; - let reconnectAttempt = 0; - let terminal = false; - - const clearReconnectTimer = () => { - if (reconnectTimer != null) { - clearTimeout(reconnectTimer); - reconnectTimer = null; - } - }; - - const closeSource = () => { - if (!source) return; - EVENT_TYPES.forEach((t) => source!.removeEventListener(t, onMessage)); - source.close(); - source = null; - }; - - const finishTerminal = () => { - terminal = true; - clearReconnectTimer(); - closeSource(); - if (!cancelled) setStatus("closed"); - onTerminalRef.current?.(); - }; - - const scheduleReconnect = () => { - if (cancelled || terminal) return; - setStatus("reconnecting"); - const delay = Math.min( - INITIAL_RECONNECT_MS * 2 ** reconnectAttempt, - MAX_RECONNECT_MS, - ); - reconnectAttempt += 1; - reconnectTimer = setTimeout(() => { - reconnectTimer = null; - connect(); - }, delay); - }; - - const onMessage = (event: MessageEvent) => { - if (cancelled || terminal) return; - try { - const data: RunEvent = JSON.parse(event.data); - const clientSeq = ++seqRef.current; - setEvents((prev) => [...prev, { ...data, clientSeq }]); - if (TERMINAL.has(data.type)) { - finishTerminal(); - } - } catch { - // ignore malformed events - } - }; - - const connect = () => { - if (cancelled || terminal) return; - - closeSource(); - setStatus(reconnectAttempt === 0 ? "connecting" : "reconnecting"); - - const next = new EventSource(eventStreamUrl(runId)); - source = next; - - next.addEventListener("open", () => { - if (cancelled || terminal) return; - reconnectAttempt = 0; - setStatus("connected"); - }); - - EVENT_TYPES.forEach((t) => next.addEventListener(t, onMessage)); - - next.onerror = () => { - if (cancelled || terminal) return; - closeSource(); - scheduleReconnect(); - }; - }; - - connect(); - - return () => { - cancelled = true; - clearReconnectTimer(); - closeSource(); - }; - }, [runId]); useEffect(() => { bottomRef.current?.scrollIntoView({ behavior: "smooth" }); @@ -195,27 +56,13 @@ export function EventStream({ runId, onTerminal }: Props) {
{status === "reconnecting" ? "Connection lost — retrying…" - : "Waiting for events…"} + : status === "closed" + ? "Stream closed" + : "Waiting for events…"}
) : ( - events.map((e) => ( -
- - {new Date(e.at).toLocaleTimeString()} - - - {e.type} - - - {formatEventData(e.type, e.data)} - -
+ events.map((event) => ( + )) )}
@@ -223,3 +70,25 @@ export function EventStream({ runId, onTerminal }: Props) {
); } + +function EventStreamRow({ event }: { event: StoredRunEvent }) { + return ( +
+ + {new Date(event.at).toLocaleTimeString()} + + + {event.type} + + + {formatEventData(event.type, event.data)} + +
+ ); +} diff --git a/frontend/lib/run-event-connection.ts b/frontend/lib/run-event-connection.ts new file mode 100644 index 0000000..c551132 --- /dev/null +++ b/frontend/lib/run-event-connection.ts @@ -0,0 +1,138 @@ +import { eventStreamUrl } from "./api"; +import { + isTerminalRunEvent, + RUN_EVENT_TYPES, +} from "./run-events"; +import type { RunEvent } from "./types"; + +export type RunEventConnectionStatus = + | "connecting" + | "connected" + | "reconnecting" + | "closed"; + +export interface StoredRunEvent extends RunEvent { + clientSeq: number; +} + +export interface RunEventConnectionHandlers { + onEvent?: (event: RunEvent, clientSeq: number) => void; + onTerminal?: (event: RunEvent) => void; + onStatusChange?: (status: RunEventConnectionStatus) => void; +} + +const INITIAL_RECONNECT_MS = 1_000; +const MAX_RECONNECT_MS = 30_000; + +export function createRunEventConnection( + runId: string, + handlers: RunEventConnectionHandlers, +): { close: () => void } { + let cancelled = false; + let source: EventSource | null = null; + let reconnectTimer: ReturnType | null = null; + let reconnectAttempt = 0; + let terminal = false; + let clientSeq = 0; + + const setStatus = (status: RunEventConnectionStatus) => { + if (!cancelled) handlers.onStatusChange?.(status); + }; + + const clearReconnectTimer = () => { + if (reconnectTimer != null) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + }; + + const closeSource = () => { + if (!source) return; + RUN_EVENT_TYPES.forEach((type) => source!.removeEventListener(type, onMessage)); + source.close(); + source = null; + }; + + const finishTerminal = (event: RunEvent) => { + terminal = true; + clearReconnectTimer(); + closeSource(); + setStatus("closed"); + onTerminalRef.current?.(event); + }; + + const scheduleReconnect = () => { + if (cancelled || terminal) return; + setStatus("reconnecting"); + const delay = Math.min( + INITIAL_RECONNECT_MS * 2 ** reconnectAttempt, + MAX_RECONNECT_MS, + ); + reconnectAttempt += 1; + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + connect(); + }, delay); + }; + + const onMessage = (event: MessageEvent) => { + if (cancelled || terminal) return; + try { + const data: RunEvent = JSON.parse(event.data); + clientSeq += 1; + onEventRef.current?.(data, clientSeq); + if (isTerminalRunEvent(data.type)) { + finishTerminal(data); + } + } catch { + // ignore malformed events + } + }; + + const connect = () => { + if (cancelled || terminal) return; + + closeSource(); + setStatus(reconnectAttempt === 0 ? "connecting" : "reconnecting"); + + const next = new EventSource(eventStreamUrl(runId)); + source = next; + + next.addEventListener("open", () => { + if (cancelled || terminal) return; + reconnectAttempt = 0; + setStatus("connected"); + }); + + RUN_EVENT_TYPES.forEach((type) => next.addEventListener(type, onMessage)); + + next.onerror = () => { + if (cancelled || terminal) return; + closeSource(); + scheduleReconnect(); + }; + }; + + connect(); + + return { + close: () => { + cancelled = true; + clearReconnectTimer(); + closeSource(); + }, + }; +} + +export function connectionLabel(status: RunEventConnectionStatus): string { + switch (status) { + case "connecting": + return "connecting…"; + case "connected": + return "live"; + case "reconnecting": + return "reconnecting…"; + case "closed": + return "closed"; + } +} diff --git a/frontend/lib/run-events.ts b/frontend/lib/run-events.ts new file mode 100644 index 0000000..9e2fbbc --- /dev/null +++ b/frontend/lib/run-events.ts @@ -0,0 +1,386 @@ +import type { Checkpoint, Message, Run, RunEvent, RunStatus, RunUsage, Step, ToolCall } from "./types"; + +export const RUN_EVENT_TYPES = [ + "run.created", + "run.started", + "run.completed", + "run.failed", + "run.cancelled", + "run.waiting_human", + "step.started", + "step.updated", + "step.completed", + "step.failed", + "token.delta", + "message.created", + "tool_call.started", + "tool_call.completed", + "checkpoint.created", + "log", +] as const; + +export const TERMINAL_RUN_EVENTS = new Set([ + "run.completed", + "run.failed", + "run.cancelled", +]); + +const LIVE_RUN_STATUSES = new Set(["pending", "running"]); + +export function isLiveRunStatus(status: RunStatus): boolean { + return LIVE_RUN_STATUSES.has(status); +} + +export function isTerminalRunEvent(type: string): boolean { + return TERMINAL_RUN_EVENTS.has(type); +} + +/** Events that may leave client-side placeholders out of sync with the API. */ +export function shouldReconcileRun(type: string): boolean { + return type !== "token.delta" && type !== "log"; +} + +function asRecord(value: unknown): Record { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : {}; +} + +function asOptionalRecord(value: unknown): Record | null { + if (value == null) return null; + return asRecord(value); +} + +function asUsage(value: unknown): RunUsage | null { + const raw = asRecord(value); + if ( + typeof raw.tokens_in !== "number" && + typeof raw.tokens_out !== "number" && + typeof raw.cost_usd !== "number" + ) { + return null; + } + return { + tokens_in: typeof raw.tokens_in === "number" ? raw.tokens_in : 0, + tokens_out: typeof raw.tokens_out === "number" ? raw.tokens_out : 0, + cost_usd: typeof raw.cost_usd === "number" ? raw.cost_usd : 0, + latency_ms: typeof raw.latency_ms === "number" ? raw.latency_ms : null, + }; +} + +export function aggregateUsageFromSteps(steps: Step[]): RunUsage { + let tokensIn = 0; + let tokensOut = 0; + let costUsd = 0; + let latencyMs = 0; + let hasLatency = false; + + for (const step of steps) { + tokensIn += step.tokens_in ?? 0; + tokensOut += step.tokens_out ?? 0; + costUsd += step.cost_usd ?? 0; + if (step.latency_ms != null) { + latencyMs += step.latency_ms; + hasLatency = true; + } + } + + return { + tokens_in: tokensIn, + tokens_out: tokensOut, + cost_usd: costUsd, + latency_ms: hasLatency ? latencyMs : null, + }; +} + +function findStep(steps: Step[], index: number): Step | undefined { + return steps.find((step) => step.index === index); +} + +function upsertStep(steps: Step[], step: Step): Step[] { + const next = steps.some((item) => item.index === step.index) + ? steps.map((item) => (item.index === step.index ? step : item)) + : [...steps, step]; + return next.sort((a, b) => a.index - b.index); +} + +function patchStep( + steps: Step[], + index: number, + patch: Partial, + at: string, +): Step[] { + const existing = findStep(steps, index); + if (!existing) return steps; + return upsertStep(steps, { + ...existing, + ...patch, + updated_at: at, + }); +} + +function appendMessage(messages: Message[], data: Record, at: string): Message[] { + const index = + typeof data.index === "number" ? data.index : messages.length > 0 + ? Math.max(...messages.map((message) => message.index)) + 1 + : 0; + + if (messages.some((message) => message.index === index)) { + return messages; + } + + const role = data.role; + const message: Message = { + id: `sse-msg-${index}`, + index, + role: + role === "system" || + role === "user" || + role === "assistant" || + role === "tool" + ? role + : "assistant", + name: typeof data.name === "string" ? data.name : null, + content: typeof data.content === "string" ? data.content : "", + tool_call_id: + typeof data.tool_call_id === "string" ? data.tool_call_id : null, + extra: asRecord(data.extra), + created_at: at, + }; + + return [...messages, message].sort((a, b) => a.index - b.index); +} + +function appendToolCall(step: Step, data: Record): ToolCall { + return { + id: `sse-tc-${step.index}-${step.tool_calls.length}`, + name: typeof data.name === "string" ? data.name : "tool", + arguments: asRecord(data.arguments), + result: null, + error: null, + latency_ms: null, + }; +} + +function patchToolCall( + step: Step, + data: Record, + at: string, +): Step { + const name = typeof data.name === "string" ? data.name : ""; + const toolCalls = [...step.tool_calls]; + let index = toolCalls.findIndex((call) => call.name === name); + if (index === -1) { + toolCalls.push(appendToolCall(step, data)); + index = toolCalls.length - 1; + } + + toolCalls[index] = { + ...toolCalls[index], + result: + "result" in data + ? (data.result as Record | null) + : toolCalls[index].result, + error: + typeof data.error === "string" ? data.error : toolCalls[index].error, + latency_ms: + typeof data.latency_ms === "number" + ? data.latency_ms + : toolCalls[index].latency_ms, + }; + + return { ...step, tool_calls: toolCalls, updated_at: at }; +} + +function appendCheckpoint( + checkpoints: Checkpoint[], + data: Record, + at: string, +): Checkpoint[] { + const index = + typeof data.index === "number" + ? data.index + : checkpoints.length > 0 + ? Math.max(...checkpoints.map((cp) => cp.index)) + 1 + : 0; + + if (checkpoints.some((cp) => cp.index === index)) { + return checkpoints; + } + + return [ + ...checkpoints, + { + id: `sse-cp-${index}`, + index, + label: typeof data.label === "string" ? data.label : null, + created_at: at, + }, + ].sort((a, b) => a.index - b.index); +} + +/** Apply a single SSE event to a full run snapshot for optimistic UI updates. */ +export function applyRunEvent(run: Run, event: RunEvent): Run { + const { type, data, at } = event; + let next: Run = { ...run, updated_at: at }; + + switch (type) { + case "run.started": + return { ...next, status: "running" }; + case "run.completed": { + const usage = asUsage(data.usage); + return { + ...next, + status: "succeeded", + output: "output" in data ? asOptionalRecord(data.output) : run.output, + error: null, + usage: usage ?? next.usage, + }; + } + case "run.failed": + return { + ...next, + status: "failed", + error: typeof data.error === "string" ? data.error : run.error, + }; + case "run.cancelled": + return { + ...next, + status: "cancelled", + error: + typeof data.error === "string" ? data.error : (run.error ?? "cancelled"), + }; + case "run.waiting_human": + return { + ...next, + status: "waiting_human", + output: "output" in data ? asOptionalRecord(data.output) : run.output, + }; + case "step.started": { + const index = data.index; + if (typeof index !== "number") return next; + const step: Step = { + id: findStep(run.steps, index)?.id ?? `sse-step-${index}`, + index, + node: typeof data.node === "string" ? data.node : "", + status: "running", + input: asRecord(data.input), + output: findStep(run.steps, index)?.output ?? null, + error: findStep(run.steps, index)?.error ?? null, + latency_ms: findStep(run.steps, index)?.latency_ms ?? null, + tokens_in: findStep(run.steps, index)?.tokens_in ?? null, + tokens_out: findStep(run.steps, index)?.tokens_out ?? null, + cost_usd: findStep(run.steps, index)?.cost_usd ?? null, + tool_calls: findStep(run.steps, index)?.tool_calls ?? [], + created_at: findStep(run.steps, index)?.created_at ?? at, + updated_at: at, + }; + const steps = upsertStep(run.steps, step); + return { + ...next, + steps, + usage: aggregateUsageFromSteps(steps), + }; + } + case "step.updated": + case "step.completed": + case "step.failed": { + const index = data.index; + if (typeof index !== "number") return next; + const existing = findStep(run.steps, index); + if (!existing) return next; + + const status: RunStatus = + type === "step.failed" + ? "failed" + : type === "step.completed" + ? "succeeded" + : existing.status; + + const steps = patchStep(run.steps, index, { + status, + output: + type === "step.completed" && "output" in data + ? asOptionalRecord(data.output) + : existing.output, + error: + type === "step.failed" && typeof data.error === "string" + ? data.error + : existing.error, + latency_ms: + typeof data.latency_ms === "number" + ? data.latency_ms + : existing.latency_ms, + tokens_in: + typeof data.tokens_in === "number" + ? data.tokens_in + : existing.tokens_in, + tokens_out: + typeof data.tokens_out === "number" + ? data.tokens_out + : existing.tokens_out, + cost_usd: + typeof data.cost_usd === "number" ? data.cost_usd : existing.cost_usd, + }, at); + + return { + ...next, + steps, + usage: aggregateUsageFromSteps(steps), + }; + } + case "message.created": + return { + ...next, + messages: appendMessage(run.messages, data, at), + }; + case "tool_call.started": { + const stepIndex = data.step_index; + if (typeof stepIndex !== "number") return next; + const step = findStep(run.steps, stepIndex); + if (!step) return next; + const updated = patchToolCall(step, data, at); + return { + ...next, + steps: upsertStep(run.steps, updated), + }; + } + case "tool_call.completed": { + const stepIndex = data.step_index; + if (typeof stepIndex !== "number") return next; + const step = findStep(run.steps, stepIndex); + if (!step) return next; + const updated = patchToolCall(step, data, at); + return { + ...next, + steps: upsertStep(run.steps, updated), + }; + } + case "checkpoint.created": + return { + ...next, + checkpoints: appendCheckpoint(run.checkpoints, data, at), + }; + default: + return next; + } +} + +/** Patch list rows with run-level fields only (avoid inflating list payloads). */ +export function patchRunInList(runs: Run[], event: RunEvent): Run[] { + const index = runs.findIndex((run) => run.id === event.run_id); + if (index === -1) return runs; + + const current = runs[index]; + const patched = applyRunEvent(current, event); + const next = [...runs]; + next[index] = { + ...current, + status: patched.status, + output: patched.output, + error: patched.error, + usage: patched.usage, + updated_at: patched.updated_at, + }; + return next; +} diff --git a/frontend/lib/useDebouncedCallback.ts b/frontend/lib/useDebouncedCallback.ts new file mode 100644 index 0000000..ae8503d --- /dev/null +++ b/frontend/lib/useDebouncedCallback.ts @@ -0,0 +1,39 @@ +import { useEffect, useMemo, useRef } from "react"; + +export function useDebouncedCallback void>( + callback: T, + delayMs: number, +): T & { cancel: () => void } { + const callbackRef = useRef(callback); + const timerRef = useRef | null>(null); + + callbackRef.current = callback; + + const debounced = useMemo(() => { + const fn = ((...args: Parameters) => { + if (timerRef.current != null) clearTimeout(timerRef.current); + timerRef.current = setTimeout(() => { + timerRef.current = null; + callbackRef.current(...args); + }, delayMs); + }) as T & { cancel: () => void }; + + fn.cancel = () => { + if (timerRef.current != null) { + clearTimeout(timerRef.current); + timerRef.current = null; + } + }; + + return fn; + }, [delayMs]); + + useEffect( + () => () => { + debounced.cancel(); + }, + [debounced], + ); + + return debounced; +} diff --git a/frontend/lib/useRunEventSource.ts b/frontend/lib/useRunEventSource.ts new file mode 100644 index 0000000..e94cbf7 --- /dev/null +++ b/frontend/lib/useRunEventSource.ts @@ -0,0 +1,103 @@ +"use client"; + +import { useEffect, useRef, useState } from "react"; + +import { + connectionLabel, + createRunEventConnection, + type RunEventConnectionStatus, + type StoredRunEvent, +} from "@/lib/run-event-connection"; +import type { RunEvent } from "@/lib/types"; + +export { connectionLabel }; +export type { RunEventConnectionStatus, StoredRunEvent }; + +interface UseRunEventSourceOptions { + runId: string; + enabled?: boolean; + onEvent?: (event: RunEvent) => void; + onTerminal?: (event: RunEvent) => void; +} + +export function useRunEventSource({ + runId, + enabled = true, + onEvent, + onTerminal, +}: UseRunEventSourceOptions) { + const [events, setEvents] = useState([]); + const [status, setStatus] = useState( + enabled ? "connecting" : "closed", + ); + const onEventRef = useRef(onEvent); + const onTerminalRef = useRef(onTerminal); + + onEventRef.current = onEvent; + onTerminalRef.current = onTerminal; + + useEffect(() => { + if (!enabled) { + setStatus("closed"); + return; + } + + setEvents([]); + setStatus("connecting"); + + const connection = createRunEventConnection(runId, { + onStatusChange: setStatus, + onEvent: (event, clientSeq) => { + setEvents((prev) => [...prev, { ...event, clientSeq }]); + onEventRef.current?.(event); + }, + onTerminal: (event) => { + onTerminalRef.current?.(event); + }, + }); + + return () => connection.close(); + }, [runId, enabled]); + + return { events, status }; +} + +interface UseMultiRunEventSourceOptions { + runIds: string[]; + onEvent?: (event: RunEvent) => void; + onTerminal?: (event: RunEvent) => void; +} + +export function useMultiRunEventSource({ + runIds, + onEvent, + onTerminal, +}: UseMultiRunEventSourceOptions) { + const onEventRef = useRef(onEvent); + const onTerminalRef = useRef(onTerminal); + + onEventRef.current = onEvent; + onTerminalRef.current = onTerminal; + + const runKey = runIds.slice().sort().join(","); + + useEffect(() => { + if (!runKey) return; + + const ids = runKey.split(","); + const connections = ids.map((runId) => + createRunEventConnection(runId, { + onEvent: (event) => { + onEventRef.current?.(event); + }, + onTerminal: (event) => { + onTerminalRef.current?.(event); + }, + }), + ); + + return () => { + connections.forEach((connection) => connection.close()); + }; + }, [runKey]); +} diff --git a/frontend/lib/useRunLiveSync.ts b/frontend/lib/useRunLiveSync.ts new file mode 100644 index 0000000..adf26f6 --- /dev/null +++ b/frontend/lib/useRunLiveSync.ts @@ -0,0 +1,93 @@ +"use client"; + +import { useQuery, useQueryClient } from "@tanstack/react-query"; + +import { api } from "@/lib/api"; +import { + applyRunEvent, + isLiveRunStatus, + patchRunInList, + shouldReconcileRun, +} from "@/lib/run-events"; +import type { Run } from "@/lib/types"; +import { useDebouncedCallback } from "@/lib/useDebouncedCallback"; +import { + useMultiRunEventSource, + useRunEventSource, +} from "@/lib/useRunEventSource"; + +const RECONCILE_DELAY_MS = 1_000; + +export function useRunsListLiveSync(liveRunIds: string[]) { + const queryClient = useQueryClient(); + + const reconcileRuns = useDebouncedCallback(() => { + queryClient.invalidateQueries({ queryKey: ["runs"] }); + }, RECONCILE_DELAY_MS); + + useMultiRunEventSource({ + runIds: liveRunIds, + onEvent: (event) => { + queryClient.setQueryData(["runs"], (current) => + current ? patchRunInList(current, event) : current, + ); + if (shouldReconcileRun(event.type)) { + reconcileRuns(); + } + }, + onTerminal: () => { + reconcileRuns.cancel(); + queryClient.invalidateQueries({ queryKey: ["runs"] }); + }, + }); +} + +export function useRunWithLiveUpdates(runId: string) { + const queryClient = useQueryClient(); + + const run = useQuery({ + queryKey: ["run", runId], + queryFn: () => api.getRun(runId), + }); + + const enabled = run.data != null && isLiveRunStatus(run.data.status); + + const reconcileRun = useDebouncedCallback(() => { + queryClient.invalidateQueries({ queryKey: ["run", runId] }); + }, RECONCILE_DELAY_MS); + + const stream = useRunEventSource({ + runId, + enabled, + onEvent: (event) => { + queryClient.setQueryData(["run", runId], (current) => + current ? applyRunEvent(current, event) : current, + ); + queryClient.setQueryData(["runs"], (current) => + current ? patchRunInList(current, event) : current, + ); + if (shouldReconcileRun(event.type)) { + reconcileRun(); + } + }, + onTerminal: () => { + reconcileRun.cancel(); + queryClient.invalidateQueries({ queryKey: ["run", runId] }); + queryClient.invalidateQueries({ queryKey: ["runs"] }); + }, + }); + + return { + ...run, + events: stream.events, + streamStatus: stream.status, + isLive: enabled, + }; +} + +export function liveRunIdsFromRuns(runs: Run[] | undefined): string[] { + if (!runs) return []; + return runs + .filter((run) => isLiveRunStatus(run.status)) + .map((run) => run.id); +}