Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/background-subagent-resume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"helmor": patch
---

Fix Claude turns ending prematurely after a subagent is moved to the background — the main agent now stays paused and resumes once the background task completes.
198 changes: 198 additions & 0 deletions sidecar/src/claude/background-resume.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/**
* Regression for issue #891: backgrounded subagents never resumed the main
* turn. The SDK signals a backgrounded task with an intermediate `result`
* whose `terminal_reason === "background_requested"`, keeps the SAME query()
* alive, then resumes via `task_notification` to a real terminal result.
*
* The old loop treated ANY `result` as terminal: it fired `end` on the pause
* and `q.close()` tore the session down, so the later `task_notification` was
* lost and the turn ended prematurely.
*
* Fix under test (`session-manager.ts`): a `background_requested` result is
* filtered before passthrough — usage is still recorded via
* `contextUsageUpdated`, but it never reaches the pipeline and never fires
* `end`. The loop keeps draining until the genuinely terminal result.
*/

import { beforeAll, describe, expect, mock, test } from "bun:test";
import type { SDKMessage } from "@anthropic-ai/claude-agent-sdk";
import type { SendMessageParams } from "../session-manager.js";

// The mocked query() returns whatever the active scenario holds. A single
// mocked binding (resolved at session-manager import time) reads this mutable
// outer variable so each test can swap the SDK message stream.
let scenario: SDKMessage[] = [];

function makeQuery(messages: SDKMessage[]) {
return {
async *[Symbol.asyncIterator]() {
for (const m of messages) yield m;
},
close() {},
};
}

mock.module("@anthropic-ai/claude-agent-sdk", () => ({
query: () => makeQuery(scenario),
}));

interface EmitterSpy {
passthroughs: object[];
contextUsageUpdates: number;
ends: number;
emitter: import("../emitter.js").SidecarEmitter;
}

function makeSpyEmitter(): EmitterSpy {
const spy: EmitterSpy = {
passthroughs: [],
contextUsageUpdates: 0,
ends: 0,
emitter: undefined as unknown as import("../emitter.js").SidecarEmitter,
};
// Proxy: record the methods we assert on, no-op everything else so an
// incidental call (e.g. logging) can't blow up the test.
spy.emitter = new Proxy(
{},
{
get(_t, prop) {
if (prop === "passthrough") {
return (_id: string, message: object) =>
spy.passthroughs.push(message);
}
if (prop === "contextUsageUpdated") {
return () => {
spy.contextUsageUpdates += 1;
};
}
if (prop === "end") {
return () => {
spy.ends += 1;
};
}
return () => undefined;
},
},
) as import("../emitter.js").SidecarEmitter;
return spy;
}

const MODEL = "claude-test";
// Shape buildClaudeStoredMeta needs to return non-null so the pause records usage.
const USAGE = { input_tokens: 1000, output_tokens: 500 };
const MODEL_USAGE = { [MODEL]: { contextWindow: 200_000 } };

function assistant(text: string): SDKMessage {
return {
type: "assistant",
message: { role: "assistant", content: [{ type: "text", text }] },
parent_tool_use_id: null,
session_id: "s1",
uuid: `a-${text}`,
} as unknown as SDKMessage;
}

function result(terminalReason: string): SDKMessage {
return {
type: "result",
subtype: "success",
result: terminalReason,
terminal_reason: terminalReason,
usage: USAGE,
modelUsage: MODEL_USAGE,
session_id: "s1",
uuid: `r-${terminalReason}`,
} as unknown as SDKMessage;
}

function taskNotification(): SDKMessage {
return {
type: "system",
subtype: "task_notification",
task_id: "t1",
status: "completed",
output_file: "",
summary: "done",
session_id: "s1",
uuid: "tn-1",
} as unknown as SDKMessage;
}

function baseParams(): SendMessageParams {
return {
sessionId: "s1",
prompt: "research X in the background then synthesize",
model: MODEL,
cwd: undefined,
resume: undefined,
permissionMode: "bypassPermissions",
effortLevel: undefined,
fastMode: undefined,
} as SendMessageParams;
}

let ClaudeSessionManager: typeof import("./session-manager.js").ClaudeSessionManager;

beforeAll(async () => {
({ ClaudeSessionManager } = await import("./session-manager.js"));
});

describe("ClaudeSessionManager backgrounded-task resume (#891)", () => {
test("filters the pause result, resumes, and ends exactly once", async () => {
scenario = [
assistant("starting"),
result("background_requested"),
taskNotification(),
assistant("synthesizing"),
result("completed"),
];
const spy = makeSpyEmitter();
const manager = new ClaudeSessionManager();

await manager.sendMessage("req-1", baseParams(), spy.emitter);

// Exactly one terminal end — NOT one per result.
expect(spy.ends).toBe(1);

// The pause result is never passed into the pipeline.
const passedReasons = spy.passthroughs
.map((m) => (m as { terminal_reason?: string }).terminal_reason)
.filter(Boolean);
expect(passedReasons).not.toContain("background_requested");
expect(passedReasons).toContain("completed");

// Continuation flows through: task_notification + the post-resume assistant.
const subtypes = spy.passthroughs.map(
(m) => (m as { subtype?: string }).subtype,
);
expect(subtypes).toContain("task_notification");
const assistantTexts = spy.passthroughs
.filter((m) => (m as { type?: string }).type === "assistant")
.map(
(m) =>
(m as { message?: { content?: { text?: string }[] } }).message
?.content?.[0]?.text,
);
expect(assistantTexts).toContain("synthesizing");

// Usage recorded at both the pause and the terminal result.
expect(spy.contextUsageUpdates).toBeGreaterThanOrEqual(2);
});

test("if the SDK closes after a pause, the post-loop end still fires", async () => {
// Safe-fallback: an older/future SDK could end the iterator right after
// the pause instead of resuming. The loop exits naturally and the
// post-loop end fires — no hang, no lost terminal event.
scenario = [assistant("starting"), result("background_requested")];
const spy = makeSpyEmitter();
const manager = new ClaudeSessionManager();

await manager.sendMessage("req-2", baseParams(), spy.emitter);

expect(spy.ends).toBe(1);
const passedReasons = spy.passthroughs
.map((m) => (m as { terminal_reason?: string }).terminal_reason)
.filter(Boolean);
expect(passedReasons).not.toContain("background_requested");
});
});
41 changes: 35 additions & 6 deletions sidecar/src/claude/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ interface LiveSession {
* Streaming-input source. The initial prompt is pushed up front in
* `sendMessage`; each `steer()` call pushes one more user message.
* The SDK folds every pushed message into ONE extended turn and
* emits a SINGLE terminal `result` when the whole trajectory is
* done — verified empirically (steer mid-stream yields one merged
* assistant message and one result, not per-push results). The
* for-await loop therefore bails on the first result it sees.
* emits a SINGLE *terminal* `result` when the whole trajectory is
* done. Backgrounded tasks add intermediate `background_requested`
* results mid-turn (filtered out, see `isBackgroundPauseResult`), so
* the for-await loop bails on the first *genuinely terminal* result.
*/
readonly promptSource: Pushable<SDKUserMessage>;
/** Request id owning this session; needed by `steer()` to synthesize
Expand Down Expand Up @@ -728,6 +728,22 @@ export class ClaudeSessionManager implements SessionManager {
uuid: randomUUID(),
});
}
// Backgrounded task pause: SDK keeps the SAME query() alive and
// resumes later via task_notification. Record usage, but keep the
// pause result OUT of the pipeline (accumulator assumes one result
// per turn) and do NOT end the turn — must intercept before the
// unconditional passthrough below.
if (isBackgroundPauseResult(message)) {
const meta = buildClaudeStoredMeta(message, model ?? "");
if (meta) {
emitter.contextUsageUpdated(
requestId,
sessionId,
JSON.stringify(meta),
);
}
continue;
}
// AskUserQuestion tool_use blocks pass through INTACT — the Rust
// adapter renders them as the persistent Q&A card (and merges
// the tool_result answers into it), so stripping them here
Expand Down Expand Up @@ -1287,10 +1303,23 @@ function isResultMessage(
);
}

/** Intermediate `result` the SDK emits when a task is backgrounded — the
* same `query()` stays alive and resumes via `task_notification`. NOT
* terminal: must be filtered before passthrough so it never reaches the
* accumulator (one-result-per-turn) and never fires `end`. */
function isBackgroundPauseResult(message: SDKMessage): boolean {
return (
message.type === "result" &&
(message as { terminal_reason?: string }).terminal_reason ===
"background_requested"
);
}

/** Terminal result — success OR error. Both shapes carry
* `usage`/`modelUsage`, so both should update the ring. AskUserQuestion
* pauses live inside `canUseTool` instead of producing a result event,
* so any `result` we see here is genuinely terminal for this turn. */
* pauses live inside `canUseTool` instead of producing a result event, and
* `background_requested` pauses are filtered upstream, so any `result` that
* reaches this check is genuinely terminal for this turn. */
function isTerminalResult(message: SDKMessage): boolean {
return message.type === "result";
}
Expand Down
Loading