From ed300bbb56074ff578cafd905ff5c3734747ada6 Mon Sep 17 00:00:00 2001
From: Agent
Date: Sun, 21 Jun 2026 13:35:38 +0000
Subject: [PATCH] fix(sandbox): accumulate canonical stdout from polled deltas, not final re-read (#1850)

Kubelet rotates container logs at containerLogMaxSize (default 10Mi) and
readNamespacedPodLog serves only the current file. A final re-read after
rotation returned a mid-stream window instead of the deterministic first-N
bytes that docker's drainAndCap produces.

Fix: maintain canonicalChunks/canonicalByteCount accumulators that
pollRunnerStdout updates on every poll. Each delta is appended until the
accumulated total reaches stdoutMaxBytes; a delta that would cross the cap is
cut at the cap. On rotation (logs.length < lastLogLen), reset lastLogLen=0 so
that the next poll picks up post-rotation bytes from byte 0 of the new file.
The poll that detects the rotation appends nothing itself, so a rotation first
seen in the final poll yields only the pre-rotation head. logShrunk=true
ensures truncated.stdout is set regardless of whether the cap was reached. The
final readPodLog call is replaced by
Buffer.concat(canonicalChunks).toString('utf8').

Two regression tests added: (1) pre-rotation head bytes are preserved when
rotation is detected in the final poll, (2) post-rotation deltas are appended
correctly when the cap has not yet been reached.

--- .../backend/kubernetes/k8s-backend.test.ts | 133 +++++++++++++++++- .../src/backend/kubernetes/k8s-backend.ts | 48 +++++-- 2 files changed, 165 insertions(+), 16 deletions(-) diff --git a/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts b/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts index 04d957ed7..42b94e930 100644 --- a/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts +++ b/services/sandbox/src/backend/kubernetes/k8s-backend.test.ts @@ -22,6 +22,7 @@ import { staleLifetimeCutoffMs, } from './k8s-backend.ts'; import { podNameFor } from './k8s-pod-spec.ts'; +import { formatResultLine } from './k8s-protocol.ts'; const cfg: SpawnerConfig = { backend: 'kubernetes', @@ -227,7 +228,7 @@ function stubClient(behavior: { createSecret?: () => Promise; createPod?: () => Promise; readPod?: () => Promise; - readLog?: () => Promise; + readLog?: (params?: { container?: string }) => Promise; }): { core: CoreV1Api; namespace: string; calls: StubCalls } { const calls: StubCalls = { deletes: [], replaces: 0 }; // oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- test stub @@ -334,3 +335,133 @@ describe('duplicate-executionId safety', () => { ); }); }); + +// ---- log rotation: canonical stdout from polled deltas -------------------- +// +// Verifies that when kubelet rotates the container log (the log byte-count +// drops between polls), the canonical stdout is the head bytes accumulated +// BEFORE rotation — not the mid-stream window the rotated file starts at. 
+ +describe('stdout accumulation across kubelet log rotation', () => { + const harvestResultLine = formatResultLine({ + exitCode: 0, + stderr: '', + stderrTruncated: false, + outputFiles: [], + truncatedFiles: 0, + uploadStats: { attempted: 0, succeeded: 0, failures: [] }, + quotaExhausted: false, + uploadFailed: false, + reportFailed: false, + readFailed: false, + stageMs: 10, + harvestMs: 10, + uploadMs: 5, + }); + + // Pod fixture: runner running (used by waitForRunnerStart). + function runningPod(): V1Pod { + // oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- partial V1Pod fixture + return { + metadata: {}, + status: { + containerStatuses: [ + { name: 'runner', state: { running: { startedAt: new Date() } } }, + { name: 'harvest', state: { running: { startedAt: new Date() } } }, + ], + }, + } as V1Pod; + } + + // Pod fixture: harvest terminated → main loop breaks. + function terminatedPod(): V1Pod { + // oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- partial V1Pod fixture + return { + metadata: {}, + status: { + phase: 'Succeeded', + containerStatuses: [ + { name: 'runner', state: { terminated: { exitCode: 0 } } }, + { name: 'harvest', state: { terminated: { exitCode: 0 } } }, + ], + }, + } as V1Pod; + } + + test('pre-rotation head bytes are preserved; truncated.stdout is set', async () => { + // Runner emits 'AAAAA' (polled in main loop), then kubelet rotates so the + // log shrinks to 'BBB' (detected in the final poll after the loop exits). + // With the old code the final readPodLog returned 'BBB' as the canonical + // stdout. With the fix the canonical is 'AAAAA' (what was accumulated + // before the rotation), and truncated.stdout is set because logShrunk=true. + let readPodCount = 0; + let runnerLogCount = 0; + const stub = stubClient({ + readPod: () => { + readPodCount += 1; + // waitForRunnerStart: first call sees runner running. + // Main loop: second call sees harvest terminated → immediate break. 
+ return Promise.resolve( + readPodCount <= 1 ? runningPod() : terminatedPod(), + ); + }, + readLog: (p) => { + if (p?.container === 'harvest') { + return Promise.resolve(harvestResultLine); + } + // Runner log sequence: empty → pre-rotation → rotation (shorter, final poll) + runnerLogCount += 1; + if (runnerLogCount === 1) return Promise.resolve('AAAAA'); + // Final poll after loop: kubelet has rotated — shorter than 'AAAAA'. + return Promise.resolve('BBB'); + }, + }); + const backend = new KubernetesBackend(cfg, stub); + const res = await backend.execute(cfg, req, execOpts()); + + expect(res.status).toBe('completed'); + const stdout = Buffer.from(res.stdoutBase64, 'base64').toString('utf8'); + // Canonical stdout must be the pre-rotation head, not the rotated window. + expect(stdout).toBe('AAAAA'); + expect(res.truncated.stdout).toBe(true); + }); + + test('post-rotation deltas are appended when cap has not been reached', async () => { + // Runner emits 6 bytes pre-rotation, then log rotates, then 4 more bytes + // appear in the new file. All 10 bytes fit under stdoutMaxBytes so both + // chunks should appear in the canonical output. + let readPodCount = 0; + let runnerLogCount = 0; + const stub = stubClient({ + readPod: () => { + readPodCount += 1; + // waitForRunnerStart: first call → runner running. + // Main loop: second call → still running (so post-rotation poll fires). + // Third call: harvest terminated. + if (readPodCount <= 1) return Promise.resolve(runningPod()); + if (readPodCount === 2) return Promise.resolve(runningPod()); + return Promise.resolve(terminatedPod()); + }, + readLog: (p) => { + if (p?.container === 'harvest') { + return Promise.resolve(harvestResultLine); + } + runnerLogCount += 1; + // Call 1 (main-loop poll 1): pre-rotation content. + if (runnerLogCount === 1) return Promise.resolve('AAAAAA'); // 6 bytes + // Call 2 (main-loop poll 2): rotation — new file starts at 0. 
+ if (runnerLogCount === 2) return Promise.resolve('BBBB'); // 4 bytes < 6 → rotation + // Call 3 (final poll): new file grew a bit more. + return Promise.resolve('BBBBBBBB'); // 8 bytes + }, + }); + const backend = new KubernetesBackend(cfg, stub); + const res = await backend.execute(cfg, req, execOpts()); + + expect(res.status).toBe('completed'); + const stdout = Buffer.from(res.stdoutBase64, 'base64').toString('utf8'); + // Pre-rotation 'AAAAAA' + post-rotation delta 'BBBBBBBB' (full new file on reset). + expect(stdout).toBe('AAAAAABBBBBBBB'); + expect(res.truncated.stdout).toBe(true); + }); +}); diff --git a/services/sandbox/src/backend/kubernetes/k8s-backend.ts b/services/sandbox/src/backend/kubernetes/k8s-backend.ts index 18d49563d..30675c824 100644 --- a/services/sandbox/src/backend/kubernetes/k8s-backend.ts +++ b/services/sandbox/src/backend/kubernetes/k8s-backend.ts @@ -424,9 +424,18 @@ export class KubernetesBackend implements ExecutionBackend { // waiting for the harvest container to terminate (its completion signal — // harvest owns the user timeout + prints the result line). Every read is a // discrete HTTP GET, so there is NO long-lived stream to abort under Bun. + // + // Canonical stdout is accumulated here from polled deltas (capped at + // stdoutMaxBytes), not re-read in a final pass. A final readPodLog after + // kubelet log rotation would return a mid-stream window (the current + // rotated file) instead of the head of the output — accumulating deltas + // incrementally gives deterministic first-N-bytes semantics regardless of + // when kubelet rotates the container log. 
let lastLogLen = 0; let logShrunk = false; let loggedPollError = false; + const canonicalChunks: Buffer[] = []; + let canonicalByteCount = 0; const pollRunnerStdout = async (): Promise => { let logs: string; try { @@ -446,12 +455,27 @@ export class KubernetesBackend implements ExecutionBackend { return; } if (logs.length > lastLogLen) { - scanner.onStdoutChunk?.(Buffer.from(logs.slice(lastLogLen), 'utf8')); + const delta = logs.slice(lastLogLen); + scanner.onStdoutChunk?.(Buffer.from(delta, 'utf8')); + // Accumulate into the canonical buffer up to the cap. + const remaining = cfg.stdoutMaxBytes - canonicalByteCount; + if (remaining > 0) { + const deltaBuf = Buffer.from(delta, 'utf8'); + if (deltaBuf.byteLength <= remaining) { + canonicalChunks.push(deltaBuf); + canonicalByteCount += deltaBuf.byteLength; + } else { + canonicalChunks.push(deltaBuf.subarray(0, remaining)); + canonicalByteCount += remaining; + } + } lastLogLen = logs.length; } else if (logs.length < lastLogLen) { - // The kubelet rotated the container log out from under us — the - // canonical head is gone, so the final read is a partial window. + // Kubelet rotated the container log. The pre-rotation bytes are + // already captured in canonicalChunks. Reset lastLogLen to 0 so + // subsequent polls pick up post-rotation content as new deltas. logShrunk = true; + lastLogLen = 0; } }; @@ -540,21 +564,15 @@ export class KubernetesBackend implements ExecutionBackend { await sleep(POLL_INTERVAL_MS, opts.signal); } - // Final stdout read (the runner may have emitted more between the last - // poll and exit) → feed the residual to the scanner, then drain it. The - // canonical buffer is the full (capped) runner log. + // Final poll (the runner may have emitted more between the last poll and + // exit) — feeds the residual to the scanner and accumulates the tail into + // canonicalChunks, then drain the scanner. The canonical buffer is the + // incremental accumulation, immune to kubelet log rotation. 
await pollRunnerStdout(); scanner.finalize(); - let stdout = ''; - try { - stdout = await readPodLog(this.client, podName, 'runner', { - limitBytes: cfg.stdoutMaxBytes, - }); - } catch (err) { - console.warn('[sandbox.k8s] final runner log read failed:', err); - } + const stdout = Buffer.concat(canonicalChunks).toString('utf8'); const stdoutStreamTruncated = - Buffer.byteLength(stdout, 'utf8') >= cfg.stdoutMaxBytes || logShrunk; + canonicalByteCount >= cfg.stdoutMaxBytes || logShrunk; if (loopOutcome === 'aborted' || opts.signal.aborted) { return this.assemble(req, cfg, opts, {