Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ DEUCE_AGENT_HARNESS=pi
DEUCE_PI_PROVIDER=anthropic
DEUCE_PI_MODEL=claude-haiku-4-5

# Global system prompt prepended to every agent's own system_prompt on the Pi
# path (applied via --append-system-prompt at launch). Leave it empty to use the
# built-in default (agent.DefaultBaseSystemPrompt), which steers agents to the
# ask_user tool when they are blocked on a human decision; set a value to
# override that default.
DEUCE_AGENT_SYSTEM_PROMPT=

# Auth mode (default "dev"; set to "proxy" when running behind a header-trust
# reverse proxy like forge-proxy or Tailscale Serve). The literal value
# "forge-proxy" is rejected with a migration hint — use "proxy" with the
Expand Down
12 changes: 12 additions & 0 deletions server/internal/agent/dbstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func (s *DBStore) CompleteAction(ctx context.Context, sessionID, taskID, callID,
})
}

// AgentSystemPrompt looks up the agent identified by agentID and returns its
// SystemPrompt field. agentID must parse as a UUID; a parse failure or a failed
// GetAgent query is returned as-is together with an empty string.
func (s *DBStore) AgentSystemPrompt(ctx context.Context, agentID string) (string, error) {
	id, parseErr := uuid.Parse(agentID)
	if parseErr != nil {
		return "", parseErr
	}

	row, lookupErr := s.q.GetAgent(ctx, id)
	if lookupErr != nil {
		return "", lookupErr
	}

	return row.SystemPrompt, nil
}

func (s *DBStore) SetAwaitingInput(ctx context.Context, sessionID, taskID, question, kind string, options []string) (int64, error) {
tid, err := uuid.Parse(taskID)
if err != nil {
Expand Down
22 changes: 20 additions & 2 deletions server/internal/agent/pirun/devpod_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func NewDevpodLauncher(wm *workspace.Manager, provider, model string) *DevpodLau
// Launch starts a long-lived `pi --mode rpc` process in the container. The
// command is prefixed with workspace.ClaudePathPrefix-style PATH handling so the
// pi binary on the user's local bin is found in a non-interactive shell.
func (l *DevpodLauncher) Launch(ctx context.Context, workspaceID string, env []string) (Handle, error) {
inner := join(append([]string{"pi", "--mode", "rpc", "--provider", l.provider}, modelArgs(l.model)...))
func (l *DevpodLauncher) Launch(ctx context.Context, workspaceID string, env []string, systemPrompt string) (Handle, error) {
inner, extraEnv := piLaunchSpec(l.provider, l.model, systemPrompt)
env = append(env, extraEnv...)
// Run pi through a login shell so its install location is on PATH. The
// pi.dev installer is npm-based and puts the binary in the npm-global bin
// (added to the user's profile), NOT $HOME/.local/bin — and a
Expand Down Expand Up @@ -87,6 +88,23 @@ func (l *DevpodLauncher) Launch(ctx context.Context, workspaceID string, env []s
return h, nil
}

// piLaunchSpec assembles the inner `pi --mode rpc ...` command string together
// with any extra environment entries the launch needs.
//
// With an empty systemPrompt only the base command is returned and extraEnv is
// nil. Otherwise the prompt text travels in the DEUCE_SYSTEM_PROMPT environment
// entry (forwarded via devpod --set-env as a single argv element) and the
// command gains `--append-system-prompt "$DEUCE_SYSTEM_PROMPT"`. Only the
// variable reference appears in the command, so arbitrary prompt content —
// quotes, newlines, $ — never has to be shell-quoted inside the bash -lc
// command. --append-system-prompt keeps Pi's default coding-agent prompt and
// adds the agent's persona, matching the legacy executor.
//
// Kept pure so it can be unit-tested directly.
func piLaunchSpec(provider, model, systemPrompt string) (inner string, extraEnv []string) {
	argv := []string{"pi", "--mode", "rpc", "--provider", provider}
	argv = append(argv, modelArgs(model)...)

	if systemPrompt == "" {
		return join(argv), nil
	}

	argv = append(argv, "--append-system-prompt", `"$DEUCE_SYSTEM_PROMPT"`)
	return join(argv), []string{"DEUCE_SYSTEM_PROMPT=" + systemPrompt}
}

func modelArgs(model string) []string {
if model == "" {
return nil
Expand Down
44 changes: 44 additions & 0 deletions server/internal/agent/pirun/devpod_launcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pirun

import (
"strings"
"testing"
)

// TestPiLaunchSpecNoSystemPrompt checks that an empty system prompt yields the
// bare pi command: no --append-system-prompt flag and no extra env entries.
func TestPiLaunchSpecNoSystemPrompt(t *testing.T) {
	const wantBase = "pi --mode rpc --provider anthropic --model claude-haiku-4-5"

	cmd, extra := piLaunchSpec("anthropic", "claude-haiku-4-5", "")

	if hasFlag := strings.Contains(cmd, "append-system-prompt"); hasFlag {
		t.Errorf("no system prompt should not add the flag: %q", cmd)
	}
	if len(extra) > 0 {
		t.Errorf("no system prompt should add no env, got %v", extra)
	}
	if hasBase := strings.Contains(cmd, wantBase); !hasBase {
		t.Errorf("unexpected base command: %q", cmd)
	}
}

// TestPiLaunchSpecWithSystemPrompt checks that a non-empty prompt adds the
// --append-system-prompt flag and is delivered through a single
// DEUCE_SYSTEM_PROMPT env entry.
func TestPiLaunchSpecWithSystemPrompt(t *testing.T) {
	const (
		prompt   = "You are Coder. Use ask_user when blocked."
		wantFlag = `--append-system-prompt "$DEUCE_SYSTEM_PROMPT"`
		wantEnv  = "DEUCE_SYSTEM_PROMPT=" + prompt
	)

	cmd, extra := piLaunchSpec("anthropic", "", prompt)

	// The command must reference the env var rather than embed the literal
	// text, so the prompt never lands in the shell-interpreted command string.
	if !strings.Contains(cmd, wantFlag) {
		t.Errorf("flag should reference the env var: %q", cmd)
	}
	if !(len(extra) == 1 && extra[0] == wantEnv) {
		t.Errorf("prompt should ride DEUCE_SYSTEM_PROMPT env, got %v", extra)
	}
}

// TestPiLaunchSpecArbitraryContentRidesEnvUntouched verifies that a prompt full
// of shell-significant text is carried byte-for-byte in the DEUCE_SYSTEM_PROMPT
// env entry and that none of its distinctive fragments reach the command string.
func TestPiLaunchSpecArbitraryContentRidesEnvUntouched(t *testing.T) {
	// Quotes, newlines, and shell metacharacters must pass through verbatim in
	// the env value (no shell quoting), and never appear in the command string.
	tricky := "Line 1\n\"quoted\" 'single' $HOME `backtick` && rm -rf"
	inner, env := piLaunchSpec("anthropic", "m", tricky)
	if len(env) != 1 || env[0] != "DEUCE_SYSTEM_PROMPT="+tricky {
		t.Errorf("tricky prompt should be carried verbatim in env, got %v", env)
	}
	// Check every distinctive piece of the prompt, not only its tail, so a leak
	// of the leading, quoted, or $-prefixed parts is caught as well.
	leakMarkers := []string{"Line 1", "quoted", "single", "$HOME", "backtick", "&&", "rm -rf"}
	for _, marker := range leakMarkers {
		if strings.Contains(inner, marker) {
			t.Errorf("prompt content must not leak into the command string (found %q): %q", marker, inner)
		}
	}
}
9 changes: 6 additions & 3 deletions server/internal/agent/pirun/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type stderrReporter interface {
// seam, mirroring workspace.commandRunner / sshproxy.resolveContainerHook so
// the supervisor is testable without a container.
type Launcher interface {
Launch(ctx context.Context, workspaceID string, env []string) (Handle, error)
// Launch starts a Pi process. systemPrompt, when non-empty, is applied to
// the agent via --append-system-prompt (passed through the environment so
// arbitrary text needs no shell quoting).
Launch(ctx context.Context, workspaceID string, env []string, systemPrompt string) (Handle, error)
}

// Exit is emitted on the supervisor's Exits channel when a process dies (clean
Expand Down Expand Up @@ -164,7 +167,7 @@ func (s *Supervisor) Get(key Key) (*Process, bool) {
// callers never race the devpod-ssh tunnel setup (the U1 transport caveat). When
// sessionPath is non-empty it re-attaches to that prior Pi session via
// switch_session (KTD13 continuity), tolerating failure.
func (s *Supervisor) Ensure(ctx context.Context, key Key, workspaceID, sessionPath string) (*Process, error) {
func (s *Supervisor) Ensure(ctx context.Context, key Key, workspaceID, sessionPath, systemPrompt string) (*Process, error) {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
Expand All @@ -180,7 +183,7 @@ func (s *Supervisor) Ensure(ctx context.Context, key Key, workspaceID, sessionPa
if s.apiKey != "" {
env = append(env, "ANTHROPIC_API_KEY="+s.apiKey)
}
h, err := s.launcher.Launch(s.ctx, workspaceID, env)
h, err := s.launcher.Launch(s.ctx, workspaceID, env, systemPrompt)
if err != nil {
return nil, fmt.Errorf("pirun: launch %s: %w", key, err)
}
Expand Down
24 changes: 12 additions & 12 deletions server/internal/agent/pirun/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type fakeLauncher struct {
failNext error
}

func (l *fakeLauncher) Launch(context.Context, string, []string) (Handle, error) {
func (l *fakeLauncher) Launch(context.Context, string, []string, string) (Handle, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.failNext != nil {
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestEnsureLaunchesAndHandshakes(t *testing.T) {
s := NewSupervisor(l, "sk-test")
key := Key{SessionID: "s1", AgentID: "a1"}

p, err := s.Ensure(context.Background(), key, "ws1", "")
p, err := s.Ensure(context.Background(), key, "ws1", "", "")
if err != nil {
t.Fatalf("Ensure: %v", err)
}
Expand All @@ -147,9 +147,9 @@ func TestEnsureReusesAndIsolatesKeys(t *testing.T) {
k1 := Key{SessionID: "s1", AgentID: "a1"}
k2 := Key{SessionID: "s1", AgentID: "a2"}

p1, _ := s.Ensure(ctx, k1, "ws", "")
p1b, _ := s.Ensure(ctx, k1, "ws", "") // reuse
p2, _ := s.Ensure(ctx, k2, "ws", "") // distinct agent
p1, _ := s.Ensure(ctx, k1, "ws", "", "")
p1b, _ := s.Ensure(ctx, k1, "ws", "", "") // reuse
p2, _ := s.Ensure(ctx, k2, "ws", "", "") // distinct agent

if p1 != p1b {
t.Error("same key should reuse the same process")
Expand All @@ -168,7 +168,7 @@ func TestEnsureLaunchError(t *testing.T) {
s := NewSupervisor(l, "")
key := Key{SessionID: "s1", AgentID: "a1"}

if _, err := s.Ensure(context.Background(), key, "ws", ""); err == nil {
if _, err := s.Ensure(context.Background(), key, "ws", "", ""); err == nil {
t.Fatal("expected launch error")
}
if _, ok := s.Get(key); ok {
Expand All @@ -181,7 +181,7 @@ func TestProcessExitEmitsSignal(t *testing.T) {
s := NewSupervisor(l, "")
key := Key{SessionID: "s1", AgentID: "a1"}

p, err := s.Ensure(context.Background(), key, "ws", "")
p, err := s.Ensure(context.Background(), key, "ws", "", "")
if err != nil {
t.Fatalf("Ensure: %v", err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestReattachSendsSwitchSession(t *testing.T) {
s := NewSupervisor(l, "")
key := Key{SessionID: "s1", AgentID: "a1"}

if _, err := s.Ensure(context.Background(), key, "ws", "/sessions/prior.jsonl"); err != nil {
if _, err := s.Ensure(context.Background(), key, "ws", "/sessions/prior.jsonl", ""); err != nil {
t.Fatalf("Ensure: %v", err)
}
m := l.handles[0].waitCmd(t, "switch_session")
Expand All @@ -238,8 +238,8 @@ func TestShutdownStopsAllAndRejectsEnsure(t *testing.T) {
l := &fakeLauncher{}
s := NewSupervisor(l, "")
ctx := context.Background()
_, _ = s.Ensure(ctx, Key{SessionID: "s", AgentID: "a"}, "ws", "")
_, _ = s.Ensure(ctx, Key{SessionID: "s", AgentID: "b"}, "ws", "")
_, _ = s.Ensure(ctx, Key{SessionID: "s", AgentID: "a"}, "ws", "", "")
_, _ = s.Ensure(ctx, Key{SessionID: "s", AgentID: "b"}, "ws", "", "")

if err := s.Shutdown(ctx); err != nil {
t.Fatalf("Shutdown: %v", err)
Expand All @@ -251,7 +251,7 @@ func TestShutdownStopsAllAndRejectsEnsure(t *testing.T) {
t.Error("handle was not stopped on shutdown")
}
}
if _, err := s.Ensure(ctx, Key{SessionID: "s", AgentID: "c"}, "ws", ""); !errors.Is(err, ErrSupervisorClosed) {
if _, err := s.Ensure(ctx, Key{SessionID: "s", AgentID: "c"}, "ws", "", ""); !errors.Is(err, ErrSupervisorClosed) {
t.Errorf("Ensure after shutdown = %v, want ErrSupervisorClosed", err)
}
}
Expand All @@ -262,7 +262,7 @@ func TestIdleReap(t *testing.T) {
s.idleTimeout = 60 * time.Millisecond // shorten for the test (same package)
key := Key{SessionID: "s1", AgentID: "a1"}

if _, err := s.Ensure(context.Background(), key, "ws", ""); err != nil {
if _, err := s.Ensure(context.Background(), key, "ws", "", ""); err != nil {
t.Fatalf("Ensure: %v", err)
}
// No activity → reaped after the idle timeout.
Expand Down
71 changes: 56 additions & 15 deletions server/internal/agent/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Runtime struct {
store Store
sup *pirun.Supervisor
bc Broadcaster
// baseSystemPrompt is prepended to each agent's own system_prompt at Pi
// launch — the global "prefer ask_user when blocked" guidance.
baseSystemPrompt string
// replyPoster, when set, posts an agent's terminal reply as a normal chat
// message so it shows in the existing chat UI (the Super Threads task/action
// cards are a separate, later surface). Wired by the handler layer.
Expand Down Expand Up @@ -69,23 +72,51 @@ const (
defaultAwaitTimeout = 30 * time.Minute
)

// DefaultBaseSystemPrompt is the global system prompt applied to every agent on
// the Pi path, ahead of the agent's own system_prompt. Its load-bearing job is
// to steer agents to the ask_user tool when blocked on a human decision (which
// is what surfaces the interactive typed prompt) instead of guessing or asking
// in a normal chat reply. Overridable via DEUCE_AGENT_SYSTEM_PROMPT.
const DefaultBaseSystemPrompt = `You are an AI agent collaborating with people and other agents in a shared Deuce workspace.

Ask before you guess. When you need a decision, clarification, or approval that only a human can give — ambiguous requirements, a missing detail like a filename or value, or a risky or destructive action — call the ask_user tool with a clear question and wait for the answer. Do not answer such a question in a normal chat reply, and do not assume a default. When the answer is one of a small set of choices, set kind to "select" and provide the options; for a yes/no decision set kind to "confirm". Only ask when you are genuinely blocked — otherwise keep working.`

// NewRuntime builds the runtime. Call Start to begin consuming process exits.
func NewRuntime(store Store, sup *pirun.Supervisor, bc Broadcaster) *Runtime {
// baseSystemPrompt is prepended to every agent's own system_prompt at Pi launch
// (pass DefaultBaseSystemPrompt for the standard guidance, or "" to disable).
func NewRuntime(store Store, sup *pirun.Supervisor, bc Broadcaster, baseSystemPrompt string) *Runtime {
ctx, cancel := context.WithCancel(context.Background())
return &Runtime{
store: store,
sup: sup,
bc: bc,
running: make(map[pirun.Key]string),
workspace: make(map[pirun.Key]string),
consumers: make(map[pirun.Key]*pirun.Process),
replies: make(map[string]*strings.Builder),
pendingReq: make(map[string]string),
timers: make(map[string]*taskTimers),
activeTimeout: defaultActiveTimeout,
awaitTimeout: defaultAwaitTimeout,
ctx: ctx,
cancel: cancel,
store: store,
sup: sup,
bc: bc,
baseSystemPrompt: baseSystemPrompt,
running: make(map[pirun.Key]string),
workspace: make(map[pirun.Key]string),
consumers: make(map[pirun.Key]*pirun.Process),
replies: make(map[string]*strings.Builder),
pendingReq: make(map[string]string),
timers: make(map[string]*taskTimers),
activeTimeout: defaultActiveTimeout,
awaitTimeout: defaultAwaitTimeout,
ctx: ctx,
cancel: cancel,
}
}

// joinSystemPrompts merges the global base prompt with an agent's own
// system_prompt. Both inputs are whitespace-trimmed first; if one is empty
// after trimming the other is returned on its own (both empty → ""), otherwise
// the result is the base, a blank line, then the agent prompt.
func joinSystemPrompts(base, agent string) string {
	head := strings.TrimSpace(base)
	tail := strings.TrimSpace(agent)

	if head == "" {
		return tail
	}
	if tail == "" {
		return head
	}
	return head + "\n\n" + tail
}

Expand Down Expand Up @@ -246,7 +277,17 @@ func (r *Runtime) promoteLocked(ctx context.Context, key pirun.Key) {
r.mu.Lock()
wsID := r.workspace[key]
r.mu.Unlock()
p, err := r.sup.Ensure(ctx, key, wsID, "")
// The global guidance plus the agent's own persona/instructions are applied
// to the Pi process at launch (Ensure only launches when no process exists
// for the key, so this is a no-op on reuse). A per-agent lookup failure is
// non-fatal — fall back to the global base alone rather than failing the task.
agentPrompt, err := r.store.AgentSystemPrompt(ctx, key.AgentID)
if err != nil {
slog.Warn("runtime: agent system prompt lookup failed", "key", key.String(), "error", err)
agentPrompt = ""
}
systemPrompt := joinSystemPrompts(r.baseSystemPrompt, agentPrompt)
p, err := r.sup.Ensure(ctx, key, wsID, "", systemPrompt)
if err != nil {
slog.Error("runtime: ensure pi process", "key", key.String(), "error", err)
// promote=false: we are inside promoteLocked, don't recurse. teardown=true
Expand Down
18 changes: 11 additions & 7 deletions server/internal/agent/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ type fakeTask struct {
}

type fakeStore struct {
mu sync.Mutex
seq map[string]int64
tasks map[string]*fakeTask
order int
idn int
mu sync.Mutex
seq map[string]int64
tasks map[string]*fakeTask
order int
idn int
systemPrompt string
}

func newFakeStore() *fakeStore {
Expand Down Expand Up @@ -69,6 +70,9 @@ func (s *fakeStore) MarkRunning(_ context.Context, sessionID, taskID string) (in
// SetAwaitingInput is the fake's implementation: it calls setState with
// StateAwaitingInput for the given session and task and returns that result
// with a nil error. The question, kind, and options arguments are ignored.
func (s *fakeStore) SetAwaitingInput(_ context.Context, sessionID, taskID, _, _ string, _ []string) (int64, error) {
return s.setState(sessionID, taskID, StateAwaitingInput), nil
}
// AgentSystemPrompt returns the fake's systemPrompt field for any agent ID and
// always reports a nil error. The context and agent ID are ignored, and the
// field is read without taking s.mu.
func (s *fakeStore) AgentSystemPrompt(_ context.Context, _ string) (string, error) {
return s.systemPrompt, nil
}
// ResolveAwaitingInput is the fake's implementation: it calls setState with
// StateRunning for the given session and task and returns that result with a
// nil error. The context is ignored.
func (s *fakeStore) ResolveAwaitingInput(_ context.Context, sessionID, taskID string) (int64, error) {
return s.setState(sessionID, taskID, StateRunning), nil
}
Expand Down Expand Up @@ -257,7 +261,7 @@ type tLauncher struct {
hs []*tHandle
}

func (l *tLauncher) Launch(context.Context, string, []string) (pirun.Handle, error) {
func (l *tLauncher) Launch(context.Context, string, []string, string) (pirun.Handle, error) {
h := newTHandle()
l.mu.Lock()
l.hs = append(l.hs, h)
Expand Down Expand Up @@ -296,7 +300,7 @@ func newTestRuntime(t *testing.T) (*Runtime, *fakeStore, *fakeBroadcaster, *tLau
bc := &fakeBroadcaster{}
lr := &tLauncher{}
sup := pirun.NewSupervisor(lr, "test-key")
rt := NewRuntime(store, sup, bc)
rt := NewRuntime(store, sup, bc, "")
rt.Start()
t.Cleanup(func() {
rt.Shutdown()
Expand Down
5 changes: 5 additions & 0 deletions server/internal/agent/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type Store interface {

// TaskState returns the current state of a task.
TaskState(ctx context.Context, taskID string) (state string, ok bool, err error)

// AgentSystemPrompt returns the agent's configured system prompt (empty
// when unset). Applied to the Pi process at launch so the agent carries its
// persona/instructions (the legacy executor did this via --append-system-prompt).
AgentSystemPrompt(ctx context.Context, agentID string) (string, error)
}

// EnqueueParams describes a new task to enqueue.
Expand Down
Loading
Loading