Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 132 additions & 1 deletion services/sandbox/src/backend/kubernetes/k8s-backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
staleLifetimeCutoffMs,
} from './k8s-backend.ts';
import { podNameFor } from './k8s-pod-spec.ts';
import { formatResultLine } from './k8s-protocol.ts';

const cfg: SpawnerConfig = {
backend: 'kubernetes',
Expand Down Expand Up @@ -227,7 +228,7 @@ function stubClient(behavior: {
createSecret?: () => Promise<unknown>;
createPod?: () => Promise<unknown>;
readPod?: () => Promise<V1Pod>;
readLog?: () => Promise<string>;
readLog?: (params?: { container?: string }) => Promise<string>;
}): { core: CoreV1Api; namespace: string; calls: StubCalls } {
const calls: StubCalls = { deletes: [], replaces: 0 };
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- test stub
Expand Down Expand Up @@ -334,3 +335,133 @@ describe('duplicate-executionId safety', () => {
);
});
});

// ---- log rotation: canonical stdout from polled deltas --------------------
//
// Verifies that when kubelet rotates the container log (the log byte-count
// drops between polls), the canonical stdout is the head bytes accumulated
// BEFORE rotation — not the mid-stream window the rotated file starts at.

describe('stdout accumulation across kubelet log rotation', () => {
const harvestResultLine = formatResultLine({
exitCode: 0,
stderr: '',
stderrTruncated: false,
outputFiles: [],
truncatedFiles: 0,
uploadStats: { attempted: 0, succeeded: 0, failures: [] },
quotaExhausted: false,
uploadFailed: false,
reportFailed: false,
readFailed: false,
stageMs: 10,
harvestMs: 10,
uploadMs: 5,
});

// Pod fixture: runner running (used by waitForRunnerStart).
function runningPod(): V1Pod {
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- partial V1Pod fixture
return {
metadata: {},
status: {
containerStatuses: [
{ name: 'runner', state: { running: { startedAt: new Date() } } },
{ name: 'harvest', state: { running: { startedAt: new Date() } } },
],
},
} as V1Pod;
}

// Pod fixture: harvest terminated → main loop breaks.
function terminatedPod(): V1Pod {
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- partial V1Pod fixture
return {
metadata: {},
status: {
phase: 'Succeeded',
containerStatuses: [
{ name: 'runner', state: { terminated: { exitCode: 0 } } },
{ name: 'harvest', state: { terminated: { exitCode: 0 } } },
],
},
} as V1Pod;
}

test('pre-rotation head bytes are preserved; truncated.stdout is set', async () => {
// Runner emits 'AAAAA' (polled in main loop), then kubelet rotates so the
// log shrinks to 'BBB' (detected in the final poll after the loop exits).
// With the old code the final readPodLog returned 'BBB' as the canonical
// stdout. With the fix the canonical is 'AAAAA' (what was accumulated
// before the rotation), and truncated.stdout is set because logShrunk=true.
let readPodCount = 0;
let runnerLogCount = 0;
const stub = stubClient({
readPod: () => {
readPodCount += 1;
// waitForRunnerStart: first call sees runner running.
// Main loop: second call sees harvest terminated → immediate break.
return Promise.resolve(
readPodCount <= 1 ? runningPod() : terminatedPod(),
);
},
readLog: (p) => {
if (p?.container === 'harvest') {
return Promise.resolve(harvestResultLine);
}
// Runner log sequence: empty → pre-rotation → rotation (shorter, final poll)
runnerLogCount += 1;
if (runnerLogCount === 1) return Promise.resolve('AAAAA');
// Final poll after loop: kubelet has rotated — shorter than 'AAAAA'.
return Promise.resolve('BBB');
},
});
const backend = new KubernetesBackend(cfg, stub);
const res = await backend.execute(cfg, req, execOpts());

expect(res.status).toBe('completed');
const stdout = Buffer.from(res.stdoutBase64, 'base64').toString('utf8');
// Canonical stdout must be the pre-rotation head, not the rotated window.
expect(stdout).toBe('AAAAA');
expect(res.truncated.stdout).toBe(true);
});

test('post-rotation deltas are appended when cap has not been reached', async () => {
// Runner emits 6 bytes pre-rotation, then log rotates, then 4 more bytes
// appear in the new file. All 10 bytes fit under stdoutMaxBytes so both
// chunks should appear in the canonical output.
let readPodCount = 0;
let runnerLogCount = 0;
const stub = stubClient({
readPod: () => {
readPodCount += 1;
// waitForRunnerStart: first call → runner running.
// Main loop: second call → still running (so post-rotation poll fires).
// Third call: harvest terminated.
if (readPodCount <= 1) return Promise.resolve(runningPod());
if (readPodCount === 2) return Promise.resolve(runningPod());
return Promise.resolve(terminatedPod());
},
readLog: (p) => {
if (p?.container === 'harvest') {
return Promise.resolve(harvestResultLine);
}
runnerLogCount += 1;
// Call 1 (main-loop poll 1): pre-rotation content.
if (runnerLogCount === 1) return Promise.resolve('AAAAAA'); // 6 bytes
// Call 2 (main-loop poll 2): rotation — new file starts at 0.
if (runnerLogCount === 2) return Promise.resolve('BBBB'); // 4 bytes < 6 → rotation
// Call 3 (final poll): new file grew a bit more.
return Promise.resolve('BBBBBBBB'); // 8 bytes
},
});
const backend = new KubernetesBackend(cfg, stub);
const res = await backend.execute(cfg, req, execOpts());

expect(res.status).toBe('completed');
const stdout = Buffer.from(res.stdoutBase64, 'base64').toString('utf8');
// Pre-rotation 'AAAAAA' + post-rotation delta 'BBBBBBBB' (full new file on reset).
expect(stdout).toBe('AAAAAABBBBBBBB');
expect(res.truncated.stdout).toBe(true);
});
});
48 changes: 33 additions & 15 deletions services/sandbox/src/backend/kubernetes/k8s-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,18 @@ export class KubernetesBackend implements ExecutionBackend {
// waiting for the harvest container to terminate (its completion signal —
// harvest owns the user timeout + prints the result line). Every read is a
// discrete HTTP GET, so there is NO long-lived stream to abort under Bun.
//
// Canonical stdout is accumulated here from polled deltas (capped at
// stdoutMaxBytes), not re-read in a final pass. A final readPodLog after
// kubelet log rotation would return a mid-stream window (the current
// rotated file) instead of the head of the output — accumulating deltas
// incrementally gives deterministic first-N-bytes semantics regardless of
// when kubelet rotates the container log.
let lastLogLen = 0;
let logShrunk = false;
let loggedPollError = false;
const canonicalChunks: Buffer[] = [];
let canonicalByteCount = 0;
const pollRunnerStdout = async (): Promise<void> => {
let logs: string;
try {
Expand All @@ -446,12 +455,27 @@ export class KubernetesBackend implements ExecutionBackend {
return;
}
if (logs.length > lastLogLen) {
scanner.onStdoutChunk?.(Buffer.from(logs.slice(lastLogLen), 'utf8'));
const delta = logs.slice(lastLogLen);
scanner.onStdoutChunk?.(Buffer.from(delta, 'utf8'));
// Accumulate into the canonical buffer up to the cap.
const remaining = cfg.stdoutMaxBytes - canonicalByteCount;
if (remaining > 0) {
const deltaBuf = Buffer.from(delta, 'utf8');
if (deltaBuf.byteLength <= remaining) {
canonicalChunks.push(deltaBuf);
canonicalByteCount += deltaBuf.byteLength;
} else {
canonicalChunks.push(deltaBuf.subarray(0, remaining));
canonicalByteCount += remaining;
}
}
lastLogLen = logs.length;
} else if (logs.length < lastLogLen) {
// The kubelet rotated the container log out from under us — the
// canonical head is gone, so the final read is a partial window.
// Kubelet rotated the container log. The pre-rotation bytes are
// already captured in canonicalChunks. Reset lastLogLen to 0 so
// subsequent polls pick up post-rotation content as new deltas.
logShrunk = true;
lastLogLen = 0;
}
};

Expand Down Expand Up @@ -540,21 +564,15 @@ export class KubernetesBackend implements ExecutionBackend {
await sleep(POLL_INTERVAL_MS, opts.signal);
}

// Final stdout read (the runner may have emitted more between the last
// poll and exit) → feed the residual to the scanner, then drain it. The
// canonical buffer is the full (capped) runner log.
// Final poll (the runner may have emitted more between the last poll and
// exit) — feeds the residual to the scanner and accumulates the tail into
// canonicalChunks, then drain the scanner. The canonical buffer is the
// incremental accumulation, immune to kubelet log rotation.
await pollRunnerStdout();
scanner.finalize();
let stdout = '';
try {
stdout = await readPodLog(this.client, podName, 'runner', {
limitBytes: cfg.stdoutMaxBytes,
});
} catch (err) {
console.warn('[sandbox.k8s] final runner log read failed:', err);
}
const stdout = Buffer.concat(canonicalChunks).toString('utf8');
const stdoutStreamTruncated =
Buffer.byteLength(stdout, 'utf8') >= cfg.stdoutMaxBytes || logShrunk;
canonicalByteCount >= cfg.stdoutMaxBytes || logShrunk;

if (loopOutcome === 'aborted' || opts.signal.aborted) {
return this.assemble(req, cfg, opts, {
Expand Down