Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func main() {
}
d.Register("hook.getInfo", newHookGetInfoHandler(hookProvider))

// ping — client keepalive. The TS client calls this periodically so the
// idle watchdog (StartIdleWatchdog below) can tell a live-but-idle session
// from a vanished client. The handler is a no-op; merely receiving the line
// resets the agent's lastInbound timestamp.
d.Register("ping", func(_ context.Context, _ json.RawMessage) (any, error) {
return struct{}{}, nil
})

host := stdioserver.New(d, os.Stdin, os.Stdout, agentLogger)
fsys.SetEventSink(func(event string, payload any) error {
err := host.EmitEvent(event, payload)
Expand Down Expand Up @@ -143,6 +151,13 @@ func main() {
// 일치해야 한다. ctx 취소(드레인) 시 자동 정지한다.
host.StartHeartbeat(10 * time.Second)

// Idle watchdog: self-terminate if the client sends nothing for 60s. The
// client pings every ~20s (KEEPALIVE_PING_INTERVAL_MS in pipe.ts), so a
// healthy idle session resets the timer ~3× per window; only a vanished
// client (half-open TCP, hung process, sleep) with no stdin EOF trips it,
// preventing an orphaned remote agent from holding its binary.
host.StartIdleWatchdog(60 * time.Second)

host.Run()
}

Expand Down
6 changes: 4 additions & 2 deletions internal/fs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

// MaxReadableFileSize caps how many bytes one fs.readFile / fs.writeFile call
// may move. The threshold matches the renderer's editor capacity so we never
// produce a file that we couldn't reload.
const MaxReadableFileSize = 5 * 1024 * 1024
// produce a file that we couldn't reload. Raising this requires raising the
// inbound scanner cap in stdioserver.Host.Run accordingly — writeFile content
// travels inbound as one NDJSON line and is rejected by that cap first.
const MaxReadableFileSize = 50 * 1024 * 1024

// EventSink is the callback fs uses to push agent events back to Electron.
type EventSink func(event string, payload any) error
Expand Down
71 changes: 63 additions & 8 deletions internal/stdioserver/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ type Host struct {
// 종료된다 — 멈춘 hook이 셧다운을 막을 수 없다.
hooksMu sync.Mutex
shutdownHooks []func()

// lastInbound is the UnixNano timestamp of the most recently received
// request line, read by the idle watchdog (StartIdleWatchdog) to detect a
// vanished client. Written from Run's single reader goroutine, read from
// the watchdog goroutine — atomic keeps that race-free.
lastInbound atomic.Int64

// exit terminates the process. Defaults to os.Exit; tests inject a fake so
// drain/watchdog termination can be observed without killing the runner.
exit func(int)
}

// New constructs a Host bound to the given dispatcher and stdio streams.
Expand All @@ -81,6 +91,7 @@ func New(d *dispatch.Dispatcher, in io.Reader, out io.Writer, logger *slog.Logge
ctx: ctx,
cancel: cancel,
accepting: true,
exit: os.Exit,
}
}

Expand Down Expand Up @@ -120,15 +131,23 @@ func (h *Host) EmitEvent(event string, payload any) error {
// the parent's shutdown signal is honored.
func (h *Host) Run() {
scanner := bufio.NewScanner(h.in)
// 4 MiB cap matches the largest request shape we expect (writeFile
// content up to MaxReadableFileSize plus envelope overhead).
scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
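// Cap matches the largest request shape we expect: a writeFile whose
// content is up to MaxReadableFileSize (50 MiB), JSON-escaped, plus
// envelope overhead. 64 MiB leaves roughly 14 MiB of headroom, which is
// enough for ordinary text; content dense in characters that JSON must
// escape (quotes, backslashes, control characters) can still grow past the
// cap and be rejected here. Nothing is allocated eagerly — Scanner grows
// its buffer lazily from 64 KiB up to this ceiling.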
scanner.Buffer(make([]byte, 0, 64*1024), 64*1024*1024)

for scanner.Scan() {
// Copy the slice — scanner reuses its internal buffer between
// calls and the line escapes into a goroutine below.
line := append([]byte(nil), scanner.Bytes()...)
if len(line) == 0 || !h.isAccepting() {
if len(line) == 0 {
continue
}
// Any inbound line proves the client is alive — reset the idle watchdog.
h.lastInbound.Store(time.Now().UnixNano())
if !h.isAccepting() {
continue
}
h.wg.Add(1)
Expand Down Expand Up @@ -178,6 +197,41 @@ func (h *Host) StartHeartbeat(interval time.Duration) {
}()
}

// StartIdleWatchdog starts a background goroutine that calls
// h.drainAndExit(0) once no inbound request line has been recorded in
// h.lastInbound for at least limit.
//
// The intent is to reap the agent when the client has vanished but stdin
// never reports EOF (half-open TCP link, hung client process, suspended
// laptop). The client is expected to send a periodic `ping` request so that a
// healthy but idle session keeps refreshing lastInbound; only a genuinely
// silent client trips the limit.
//
// limit should be comfortably larger than the client's keepalive ping
// interval (KEEPALIVE_PING_INTERVAL_MS in pipe.ts, per the original author's
// note) so ordinary jitter never false-fires. A non-positive limit disables
// the watchdog: nothing is started and lastInbound is left untouched.
//
// Call before Run(). The goroutine exits after firing once, or as soon as
// h.ctx is cancelled.
//
// NOTE(review): lastInbound is only refreshed by Run after a complete line
// has been scanned, so a single request line that takes longer than limit to
// arrive looks like silence — confirm this is acceptable for large writeFile
// payloads on slow links.
//
// NOTE(review): the timestamp is stored as UnixNano and rebuilt with
// time.Unix, which carries no monotonic clock reading, so the comparison uses
// wall-clock time; a wall-clock step on the host could move the firing point.
func (h *Host) StartIdleWatchdog(limit time.Duration) {
	if limit <= 0 {
		return
	}

	// Poll at a third of the limit so a lapse is noticed at most limit/3
	// late. Integer division truncates to zero for limits below 3ns and
	// time.NewTicker panics on a non-positive interval, so clamp to the
	// smallest representable tick instead of crashing the agent.
	interval := limit / 3
	if interval <= 0 {
		interval = time.Nanosecond
	}

	// Start the idle window now, so the limit is measured from this call
	// rather than from the zero timestamp.
	h.lastInbound.Store(time.Now().UnixNano())

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				last := time.Unix(0, h.lastInbound.Load())
				if time.Since(last) >= limit {
					h.drainAndExit(0)
					return
				}
			case <-h.ctx.Done():
				// Drain or shutdown is already in progress; nothing left to watch.
				return
			}
		}
	}()
}

// RegisterShutdownHook 는 drainAndExit가 os.Exit 호출 직전에 등록 순서대로
// 실행할 cleanup 콜백을 추가한다. SIGTERM 시 defer가 우회되므로, hookserver
// 소켓 파일 정리 등 명시적 정리가 필요한 자원은 이 훅을 사용한다.
Expand Down Expand Up @@ -266,7 +320,7 @@ func (h *Host) drainAndExit(code int) {
// even if Wait() itself blocks", which Wait can do when a
// handler is stuck in a syscall. The same forceExit covers
// the shutdown-hook execution window below.
forceExit := time.AfterFunc(forceExitAfter, func() { os.Exit(code) })
forceExit := time.AfterFunc(forceExitAfter, func() { h.exit(code) })
done := make(chan struct{})
go func() {
h.wg.Wait()
Expand All @@ -276,14 +330,15 @@ func (h *Host) drainAndExit(code int) {
case <-done:
// Continue past the select to run shutdown hooks before exit.
case <-time.After(forceExitAfter):
os.Exit(code)
h.exit(code)
return
}
// Hooks run synchronously under the same forceExit timer.
// If a hook hangs the AfterFunc above still trips os.Exit on time.
// If a hook hangs the AfterFunc above still trips h.exit on time.
h.runShutdownHooks()
forceExit.Stop()
h.cancel()
os.Exit(code)
h.exit(code)
})
}

Expand Down
94 changes: 94 additions & 0 deletions internal/stdioserver/host_idle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Package stdioserver — idle watchdog tests.
//
// StartIdleWatchdog must self-terminate the agent when the client stops
// sending (vanished client, half-open link) but must never fire while inbound
// traffic keeps arriving. We inject a fake exit so termination is observable
// without killing the test runner.
package stdioserver

import (
"io"
"log/slog"
"strings"
"testing"
"time"

"github.com/nexus-code/nexus-code/internal/dispatch"
)

// newTestHost builds a Host for watchdog tests: a fresh dispatcher, an empty
// stdin reader, and both stdout and log output sent to io.Discard.
func newTestHost() *Host {
	var (
		stdin   = strings.NewReader("")
		silence = slog.NewTextHandler(io.Discard, nil)
	)
	return New(dispatch.New(), stdin, io.Discard, slog.New(silence))
}

// TestIdleWatchdogExitsWhenClientVanishes checks that the watchdog invokes the
// host's exit function with code 0 when no inbound line arrives within the
// configured limit.
func TestIdleWatchdogExitsWhenClientVanishes(t *testing.T) {
	h := newTestHost()

	// Fake exit: capture the first exit code and silently drop any further
	// calls so the caller is never blocked on a full channel.
	codes := make(chan int, 1)
	h.exit = func(c int) {
		select {
		case codes <- c:
		default:
		}
	}

	h.StartIdleWatchdog(30 * time.Millisecond)

	// No traffic is ever recorded, so the watchdog must fire well inside 2s.
	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatal("idle watchdog did not terminate within 2s of client silence")
	case got := <-codes:
		if got != 0 {
			t.Fatalf("exit code = %d, want 0", got)
		}
	}
}

// TestIdleWatchdogStaysAliveWithTraffic checks that the watchdog never fires
// while lastInbound keeps being refreshed faster than the limit.
//
// Inbound traffic is simulated by storing a fresh timestamp into
// host.lastInbound every 20ms against a 60ms limit, for a 240ms window.
func TestIdleWatchdogStaysAliveWithTraffic(t *testing.T) {
	host := newTestHost()
	// Stop the watchdog goroutine when the test ends. Without this it outlives
	// the test and, once the simulated traffic stops, goes on to call
	// drainAndExit after the test has already returned.
	defer host.cancel()

	// Fake exit: capture the first exit code, drop any further calls.
	exited := make(chan int, 1)
	host.exit = func(code int) {
		select {
		case exited <- code:
		default:
		}
	}

	host.StartIdleWatchdog(60 * time.Millisecond)

	stop := time.After(240 * time.Millisecond)
	tick := time.NewTicker(20 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			// Simulated inbound line: refresh the timestamp the watchdog reads.
			host.lastInbound.Store(time.Now().UnixNano())
		case code := <-exited:
			t.Fatalf("watchdog fired during active traffic (code=%d)", code)
		case <-stop:
			return // survived the window without firing
		}
	}
}

// TestIdleWatchdogDisabledWhenLimitNonPositive checks that a zero limit turns
// the watchdog off: the exit function must not be called during a 100ms
// observation window.
func TestIdleWatchdogDisabledWhenLimitNonPositive(t *testing.T) {
	h := newTestHost()

	// Fake exit: capture the first exit code, drop any further calls.
	codes := make(chan int, 1)
	h.exit = func(c int) {
		select {
		case codes <- c:
		default:
		}
	}

	h.StartIdleWatchdog(0)

	quiet := time.NewTimer(100 * time.Millisecond)
	defer quiet.Stop()

	select {
	case <-quiet.C:
		// expected: no termination
	case <-codes:
		t.Fatal("watchdog fired despite a non-positive (disabled) limit")
	}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "nexus-code",
"productName": "NexusCode",
"version": "0.5.1",
"version": "0.5.2",
"description": "Multi-workspace VSCode-style editor for macOS. Monaco editor + terminal in one window.",
"license": "MIT",
"private": true,
Expand Down
50 changes: 42 additions & 8 deletions src/main/features/ssh/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { SshControlMaster } from "../../infra/agent/ssh/master";
import {
type EnsureRemoteAgentOptions,
ensureRemoteAgent,
type LspBootstrapProgressEvent,
} from "../../infra/agent/ssh/ssh-bootstrap/index";
import { register, validateArgs } from "../../infra/ipc-router";
import { BROWSE_MAX_ENTRIES, type SshBrowseSessionRegistry } from "./browse-session-registry";
Expand Down Expand Up @@ -64,6 +65,10 @@ export function registerSshChannel(configPath = path.join(os.homedir(), ".ssh",
export function registerSshBrowseHandlers(
registry: SshBrowseSessionRegistry,
promptHandler: SshAuthPromptHandler,
// Optional so existing callers/tests that don't care about progress keep
// working; when supplied, openBrowseSession streams bootstrap progress to
// the renderer via the `ssh.browseProgress` event keyed by progressId.
broadcast?: BrowseProgressBroadcast,
): () => void {
register("ssh", {
call: {
Expand All @@ -72,7 +77,7 @@ export function registerSshBrowseHandlers(
// cancellation arrives at the renderer as ipcErr("cancelled") — the
// router passes the envelope silently without logging, and the
// renderer uses ipcCallResult to branch on result.kind.
openBrowseSession: openBrowseSessionResultHandler(registry, promptHandler),
openBrowseSession: openBrowseSessionResultHandler(registry, promptHandler, broadcast),
browseSession: browseSessionHandler(registry),
closeBrowseSession: closeBrowseSessionHandler(registry),
},
Expand All @@ -81,6 +86,12 @@ export function registerSshBrowseHandlers(
return () => registry.dispose();
}

/**
* Broadcast fn shape used to push browse-session bootstrap progress to the
* renderer — matches the main-process forwardBroadcast signature.
*/
export type BrowseProgressBroadcast = (channelName: string, event: string, args: unknown) => void;

// ---------------------------------------------------------------------------
// listConfigHosts
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -138,13 +149,15 @@ function isMissingOrPermissionError(error: unknown): boolean {
export function openBrowseSessionHandler(
registry: SshBrowseSessionRegistry,
promptHandler: SshAuthPromptHandler,
bootstrap: (options: EnsureRemoteAgentOptions) => ReturnType<typeof ensureRemoteAgent> = (
options,
) =>
broadcast?: BrowseProgressBroadcast,
bootstrap: (
options: EnsureRemoteAgentOptions,
onProgress?: (event: LspBootstrapProgressEvent) => void,
) => ReturnType<typeof ensureRemoteAgent> = (options, onProgress) =>
// The promptHandler MUST be forwarded to ensureRemoteAgent — without it
// createBootstrapContext skips interactive auth and password-only hosts
// fail before the agent channel is ever opened.
ensureRemoteAgent(options, { promptHandler }),
ensureRemoteAgent(options, { promptHandler, onProgress }),
): (args: unknown) => Promise<{ sessionId: string; initialPath: string; user: string }> {
return async (
args: unknown,
Expand All @@ -155,6 +168,23 @@ export function openBrowseSessionHandler(
// name so the connection (and the saved profile) has a concrete user.
const user = resolveSshUser(params.user);

// Stream agent-bootstrap progress to the renderer for this connect attempt.
// Keyed by the caller's progressId so the "add workspace" dialog can show
// the same upload/verify progress that registered workspaces already get,
// even though no workspaceId exists yet. No progressId or broadcast → no-op.
const onProgress =
params.progressId && broadcast
? (event: LspBootstrapProgressEvent): void => {
broadcast("ssh", "browseProgress", {
progressId: params.progressId,
name: event.name,
phase: event.phase,
bytesDone: event.bytesDone,
bytesTotal: event.bytesTotal,
});
}
: undefined;

const bootstrapOptions: EnsureRemoteAgentOptions = {
host: params.host,
user,
Expand All @@ -181,7 +211,7 @@ export function openBrowseSessionHandler(
let bootstrapResult: Awaited<ReturnType<typeof ensureRemoteAgent>> | null = null;
let channel: ReturnType<typeof createSshChannel> | null = null;
try {
bootstrapResult = await bootstrap(bootstrapOptions);
bootstrapResult = await bootstrap(bootstrapOptions, onProgress);

if (timedOut) {
bootstrapResult.dispose?.();
Expand Down Expand Up @@ -256,9 +286,13 @@ export function openBrowseSessionHandler(
export function openBrowseSessionResultHandler(
registry: SshBrowseSessionRegistry,
promptHandler: SshAuthPromptHandler,
bootstrap?: (options: EnsureRemoteAgentOptions) => ReturnType<typeof ensureRemoteAgent>,
broadcast?: BrowseProgressBroadcast,
bootstrap?: (
options: EnsureRemoteAgentOptions,
onProgress?: (event: LspBootstrapProgressEvent) => void,
) => ReturnType<typeof ensureRemoteAgent>,
): (args: unknown) => Promise<ReturnType<typeof ipcOk> | ReturnType<typeof ipcErr>> {
const inner = openBrowseSessionHandler(registry, promptHandler, bootstrap);
const inner = openBrowseSessionHandler(registry, promptHandler, broadcast, bootstrap);
return async (args: unknown) => {
try {
const result = await inner(args);
Expand Down
Loading