From b68a6ed12c024f98f601915e18c5b00d8bf149e5 Mon Sep 17 00:00:00 2001 From: Tale Agent Date: Mon, 22 Jun 2026 02:27:07 +0000 Subject: [PATCH] fix(sandbox): stdout accumulation avoids mid-stream window after kubelet log rotation (#1850) --- .../backend/kubernetes/k8s-backend.test.ts | 98 ++++++++++++++++++- .../src/backend/kubernetes/k8s-backend.ts | 35 ++++--- 2 files changed, 117 insertions(+), 16 deletions(-) diff --git a/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts b/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts index 04d957ed7..7846dd1c3 100644 --- a/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts +++ b/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts @@ -22,6 +22,7 @@ import { staleLifetimeCutoffMs, } from './k8s-backend.ts'; import { podNameFor } from './k8s-pod-spec.ts'; +import { formatResultLine } from './k8s-protocol.ts'; const cfg: SpawnerConfig = { backend: 'kubernetes', @@ -227,7 +228,7 @@ function stubClient(behavior: { createSecret?: () => Promise; createPod?: () => Promise; readPod?: () => Promise; - readLog?: () => Promise; + readLog?: (p: { container: string }) => Promise; }): { core: CoreV1Api; namespace: string; calls: StubCalls } { const calls: StubCalls = { deletes: [], replaces: 0 }; // oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- test stub @@ -238,7 +239,8 @@ function stubClient(behavior: { readNamespacedPod: behavior.readPod ?? (() => Promise.reject(Object.assign(new Error('404'), { code: 404 }))), - readNamespacedPodLog: behavior.readLog ?? (() => Promise.resolve('')), + readNamespacedPodLog: (p: { container: string }) => + behavior.readLog?.(p) ?? Promise.resolve(''), replaceNamespacedSecret: () => { calls.replaces += 1; return Promise.resolve({}); @@ -334,3 +336,95 @@ describe('duplicate-executionId safety', () => { ); }); }); + +// ---- stdout kubelet log rotation ------------------------------------------ + +describe('stdout log-rotation pinning', () => { + test('kubelet log rotation: canonical stdout is the pre-rotation head, not the new-file window', async () => { + // Runner emits PRE then kubelet rotates — the new file only contains + // ROTATED (shorter). Without the fix the final readPodLog returns ROTATED + // (a mid-stream window). With the fix the spawner-side accumulation + // retains PRE_HEAD (the deterministic head of the original log). + const PRE_HEAD = 'pre-rotation-head\n'; // 19 chars + const ROTATED = 'r\n'; // 2 chars — shorter, triggers shrink detection + + const harvestLog = formatResultLine({ + exitCode: 0, + stderr: '', + stderrTruncated: false, + outputFiles: [], + truncatedFiles: 0, + uploadStats: { attempted: 0, succeeded: 0, failures: [] }, + quotaExhausted: false, + uploadFailed: false, + reportFailed: false, + readFailed: false, + stageMs: 0, + harvestMs: 0, + uploadMs: 0, + }); + + let runnerLogIdx = 0; + const runnerLogs = [PRE_HEAD, ROTATED]; + + let podCallIdx = 0; + + // oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- test stub + const client = { + namespace: 'tale-sandbox', + core: { + createNamespacedSecret: () => Promise.resolve({}), + createNamespacedPod: () => Promise.resolve({}), + readNamespacedPod: () => { + podCallIdx += 1; + if (podCallIdx === 1) { + // waitForRunnerStart: runner container is Running + return Promise.resolve({ + metadata: {}, + status: { + containerStatuses: [ + { + name: 'runner', + state: { running: { startedAt: new Date() } }, + }, + ], + }, + } as V1Pod); + } + // Main loop: pod Succeeded with harvest terminated + return Promise.resolve({ + metadata: {}, + status: { + phase: 'Succeeded', + containerStatuses: [ + { name: 'harvest', state: { terminated: { exitCode: 0 } } }, + ], + }, + } as V1Pod); + }, + readNamespacedPodLog: ({ container }: { container: string }) => { + if (container === 'harvest') return Promise.resolve(harvestLog); + // Runner: serve logs in sequence; cap at last entry + const log = + runnerLogs[runnerLogIdx] ?? runnerLogs[runnerLogs.length - 1]; + runnerLogIdx += 1; + return Promise.resolve(log ?? ''); + }, + replaceNamespacedSecret: () => Promise.resolve({}), + deleteNamespacedPod: () => Promise.resolve({}), + deleteNamespacedSecret: () => Promise.resolve({}), + listNamespacedPod: () => Promise.resolve({ items: [] }), + listNamespacedSecret: () => Promise.resolve({ items: [] }), + } as unknown as CoreV1Api, + }; + + const backend = new KubernetesBackend(cfg, client); + const res = await backend.execute(cfg, req, execOpts()); + + expect(res.status).toBe('completed'); + // The canonical stdout must be the pre-rotation head, not the rotated window. + expect(Buffer.from(res.stdoutBase64, 'base64').toString()).toBe('pre-rotation-head\n'); + // The rotation must be flagged as truncation. + expect(res.truncated.stdout).toBe(true); + }); +}); diff --git a/services/sandbox/src/backend/kubernetes/k8s-backend.ts b/services/sandbox/src/backend/kubernetes/k8s-backend.ts index 18d49563d..f8f7c9281 100644 --- a/services/sandbox/src/backend/kubernetes/k8s-backend.ts +++ b/services/sandbox/src/backend/kubernetes/k8s-backend.ts @@ -427,6 +427,12 @@ export class KubernetesBackend implements ExecutionBackend { let lastLogLen = 0; let logShrunk = false; let loggedPollError = false; + // Spawner-side canonical stdout buffer: accumulated from polled deltas + // so that a kubelet log rotation never gives us a mid-stream window. + // After rotation logs.length resets to 0; a final readPodLog would then + // return only the new file's content — the head of the original stream + // is gone. By accumulating here we always retain the deterministic head. + let logBuf = ''; const pollRunnerStdout = async (): Promise => { let logs: string; try { @@ -446,12 +452,18 @@ export class KubernetesBackend implements ExecutionBackend { return; } if (logs.length > lastLogLen) { - scanner.onStdoutChunk?.(Buffer.from(logs.slice(lastLogLen), 'utf8')); + const delta = logs.slice(lastLogLen); + scanner.onStdoutChunk?.(Buffer.from(delta, 'utf8')); lastLogLen = logs.length; + if (Buffer.byteLength(logBuf, 'utf8') < cfg.stdoutMaxBytes) { + ({ text: logBuf } = capText(logBuf + delta, cfg.stdoutMaxBytes)); + } } else if (logs.length < lastLogLen) { - // The kubelet rotated the container log out from under us — the - // canonical head is gone, so the final read is a partial window. + // Kubelet rotated the container log out from under us — the + // canonical head is already captured in logBuf. Reset the length + // cursor so we continue accumulating from the new file's beginning. logShrunk = true; + lastLogLen = 0; } }; @@ -540,19 +552,14 @@ export class KubernetesBackend implements ExecutionBackend { await sleep(POLL_INTERVAL_MS, opts.signal); } - // Final stdout read (the runner may have emitted more between the last - // poll and exit) → feed the residual to the scanner, then drain it. The - // canonical buffer is the full (capped) runner log. + // Final stdout poll (the runner may have emitted more between the last + // loop iteration and exit) → feed the residual to the scanner, then + // drain it. Use the spawner-side accumulation (logBuf) as the canonical + // stdout — it always holds the deterministic head of the stream even + // across kubelet log rotations. await pollRunnerStdout(); scanner.finalize(); - let stdout = ''; - try { - stdout = await readPodLog(this.client, podName, 'runner', { - limitBytes: cfg.stdoutMaxBytes, - }); - } catch (err) { - console.warn('[sandbox.k8s] final runner log read failed:', err); - } + const stdout = logBuf; const stdoutStreamTruncated = Buffer.byteLength(stdout, 'utf8') >= cfg.stdoutMaxBytes || logShrunk;