From 031e9dda071dbc13de01757cc1b83744bc932138 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 03:09:14 +0000 Subject: [PATCH 01/19] feat: add PTY mode for services --- Cargo.toml | 2 +- .../content/docs/tutorials/web-terminal.mdx | 312 ++++++++++++ src/nu/config.rs | 24 +- src/nu/mod.rs | 2 +- src/processor/service/service.rs | 452 +++++++++++++++++- 5 files changed, 763 insertions(+), 29 deletions(-) create mode 100644 docs/src/content/docs/tutorials/web-terminal.mdx diff --git a/Cargo.toml b/Cargo.toml index 802117f88..dcd80adf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ rand = "0.8" data-encoding = "2.6" [target.'cfg(unix)'.dependencies] -nix = { version = "0.29", default-features = false, features = ["poll", "fs"] } +nix = { version = "0.29", default-features = false, features = ["poll", "fs", "process", "signal", "term"] } [target.'cfg(windows)'.dependencies] win_uds = { version = "=0.2.1", features = ["async"] } diff --git a/docs/src/content/docs/tutorials/web-terminal.mdx b/docs/src/content/docs/tutorials/web-terminal.mdx new file mode 100644 index 000000000..fc8512c35 --- /dev/null +++ b/docs/src/content/docs/tutorials/web-terminal.mdx @@ -0,0 +1,312 @@ +--- +title: Web Terminal +description: Connect ghostty-web to an xs-managed PTY session over HTTP, using http-nu as the bridge. +sidebar: + order: 5 +--- + +import { Aside } from '@astrojs/starlight/components'; +import { Link } from '../../../utils/links'; + +This tutorial wires together three pieces to put a real shell in the browser: + +- **xs** allocates and manages a pseudo-terminal via its PTY service +- **http-nu** exposes the PTY over four HTTP endpoints +- **ghostty-web** renders the terminal in a canvas element using Ghostty's VT100 engine compiled to WASM + +Here is the full picture: + +```mermaid +graph LR + Browser["browser
ghostty-web
(WASM + Canvas)"] + + subgraph server + HttpNu["http-nu"] + XS["xs store"] + PTY["PTY service
(fork + openpty)"] + Nu["nu"] + end + + Browser -- "POST /pty/create
{cols, rows}" --> HttpNu + HttpNu -- ".append pty.sid.spawn
pty: {cmd: nu}" --> XS + XS -- "spawn service" --> PTY + PTY -- "exec" --> Nu + + Nu -- "stdout" --> PTY + PTY -- ".recv frames" --> XS + XS -- ".cat --follow" --> HttpNu + HttpNu -- "GET /pty/stream
SSE base64" --> Browser + + Browser -- "POST /pty/input
raw text" --> HttpNu + HttpNu -- ".append pty.sid.send" --> XS + XS -- "send frames" --> PTY + PTY -- "stdin" --> Nu + + Browser -- "POST /pty/resize
{cols, rows}" --> HttpNu + HttpNu -- ".append pty.sid.resize" --> XS + XS -- "ioctl TIOCSWINSZ
+ SIGWINCH" --> PTY +``` + +The rest of this tutorial builds each layer from the inside out: PTY, then +HTTP, then browser. + +## Prerequisites + +- xs installed and on your PATH (see Installation) +- http-nu installed with `--store` support +- ghostty-web built (`bun run build`) with dist assets available +- A terminal running with `use xs.nu *` + +## Start a store + +```bash withOutput +xs serve ./store +``` + +Leave this running. + +## The PTY service + +xs services normally require a `run` closure. PTY mode is different: you provide +a `pty` record instead of a closure, and xs handles the rest. + +```nushell +r#'{ + pty: {cmd: "/usr/local/bin/nu", cols: 131, rows: 50} +}'# | .append my-shell.spawn +``` + +When xs sees a `pty` field in the spawn config, it: + +1. Calls `openpty` to allocate a pseudo-terminal with the given size +2. Forks the command into the slave side of the PTY +3. Reads the master fd and emits raw bytes as `.recv` frames (content stored in CAS) +4. Watches for `.send` frames and writes their CAS content to the master fd +5. Watches for `.resize` frames and applies `cols`/`rows` from frame metadata via `ioctl TIOCSWINSZ`, then signals the child with `SIGWINCH` + +No `run` closure needed. No `duplex` flag. PTY mode is inherently bidirectional. + +```mermaid +flowchart LR + subgraph xs + master[master fd] + recv[".recv frames"] + send[".send frames"] + resize[".resize frames"] + end + subgraph PTY + shell[nu] + slave[slave fd] + end + master -- "read" --> recv + send -- "write" --> master + resize -- "ioctl + SIGWINCH" --> master + master <--> slave + slave <--> shell +``` + +The full lifecycle (`running`, `stopped`, `shutdown`, auto-restart, hot reload, +terminate) works the same as closure-based services. + +## The http-nu handler + +The handler script bridges four HTTP endpoints to the PTY service topics. +Save this as `serve.nu`: + +```nushell +use http-nu/router * + +{|req| + dispatch $req [ + + # Create a new PTY session + (route {method: "POST", path: "/pty/create"} {|req ctx| + let body = ($in | from json) + let cols = ($body.cols? | default 80) + let rows = ($body.rows? | default 24) + let sid = (random uuid) + + # Spawn the PTY service + $"{ pty: {cmd: \"/usr/local/bin/nu\", cols: ($cols), rows: ($rows)} }" + | .append $"pty.($sid).spawn" + + {sid: $sid} | to json + }) + + # Stream PTY output as SSE + (route {method: "GET", path: "/pty/stream"} {|req ctx| + let sid = $req.query.sid + .head $"pty.($sid).recv" --follow + | each {|frame| + let bytes = (.cas $frame.hash) + {data: ($bytes | encode base64)} + } + | to sse + }) + + # Send keyboard input to the PTY + (route {method: "POST", path: "/pty/input"} {|req ctx| + let sid = $req.query.sid + $in | .append $"pty.($sid).send" + null | metadata set --merge {'http.response': {status: 204}} + }) + + # Resize the PTY + (route {method: "POST", path: "/pty/resize"} {|req ctx| + let body = ($in | from json) + let sid = $req.query.sid + .append $"pty.($sid).resize" --meta {cols: $body.cols, rows: $body.rows} + null | metadata set --merge {'http.response': {status: 204}} + }) + ] +} +``` + +Each endpoint maps to a PTY service topic: + +| Endpoint | Store topic | Direction | +| --- | --- | --- | +| `POST /pty/create` | `pty..spawn` | client -> xs | +| `GET /pty/stream` | `pty..recv` | xs -> client (SSE) | +| `POST /pty/input` | `pty..send` | client -> xs | +| `POST /pty/resize` | `pty..resize` | client -> xs | + +The stream endpoint uses `.head --follow` to tail new recv frames as +they arrive, base64-encodes the CAS content, and pipes through `to sse`. The +client receives a standard `text/event-stream` where each `data:` line carries +base64-encoded terminal output. + +## The client + +ghostty-web compiles Ghostty's VT100 parser and renderer to a 416KB WASM binary. +TypeScript handles canvas rendering, keyboard input, and clipboard. The terminal +itself is transport-agnostic: `term.write()` pushes bytes in, `term.onData()` +captures keystrokes out. + +The demo client (`sse-client.html`) connects the four endpoints: + +```javascript +// 1. Create session +const { sid } = await fetch('/pty/create', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ cols: term.cols, rows: term.rows }), +}).then(r => r.json()); + +// 2. Stream output via SSE +const source = new EventSource('/pty/stream?sid=' + sid); +source.onmessage = (e) => { + const bytes = Uint8Array.from(atob(e.data), c => c.charCodeAt(0)); + term.write(bytes); +}; + +// 3. Send keystrokes via POST +term.onData((data) => { + fetch('/pty/input?sid=' + sid, { method: 'POST', body: data }); +}); + +// 4. Send resize via POST +term.onResize(({ cols, rows }) => { + fetch('/pty/resize?sid=' + sid, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ cols, rows }), + }); +}); +``` + +```mermaid +sequenceDiagram + participant B as Browser + participant H as http-nu + participant S as xs store + participant P as PTY + + B->>H: POST /pty/create + H->>S: .append pty.sid.spawn + S->>P: openpty + fork + H-->>B: {sid} + + B->>H: GET /pty/stream?sid= + H->>S: .head pty.sid.recv --follow + + P->>S: stdout --> .recv frame + S->>H: frame + H-->>B: SSE data: base64 + + B->>H: POST /pty/input + H->>S: .append pty.sid.send + S->>P: .send --> stdin + + B->>H: POST /pty/resize + H->>S: .append pty.sid.resize + S->>P: ioctl + SIGWINCH +``` + +SSE downstream, POST upstream. No WebSocket upgrade required. Works through +HTTP proxies and tunnels. + +## Try it + +Start everything in three terminals. + +Terminal 1 -- the store (already running from above): + +```bash withOutput +xs serve ./store +``` + +Terminal 2 -- http-nu, pointing at the store and serving ghostty-web assets: + +```bash withOutput +http-nu :3001 --store ./store ./serve.nu +``` + +Terminal 3 -- open the browser: + +```bash withOutput +open http://localhost:3001 +``` + + + +You should see a nushell prompt rendered in the browser. Type commands, watch +output stream back. Every keystroke flows through xs as a `pty..send` +frame; every chunk of terminal output flows back as a `pty..recv` frame. + +## Resize + +Resize the browser window. ghostty-web's `FitAddon` recalculates cols and rows, +fires `term.onResize`, and the client POSTs to `/pty/resize`. http-nu appends a +`pty..resize` frame with `{cols, rows}` in metadata. xs applies the new +dimensions via `ioctl TIOCSWINSZ` and sends `SIGWINCH` to the child process. +The shell redraws to fit. + +## Terminate + +End a PTY session from the store side: + +```nushell +.append pty..terminate +``` + +xs kills the child process (SIGTERM, then SIGKILL if needed), emits +`pty..stopped` with `meta.reason` set to `terminate`, then +`pty..shutdown`. The SSE stream ends and ghostty-web shows a session-ended +message. + +## Recap + +| Component | Role | +| --- | --- | +| xs | Allocates the PTY, manages the child process, streams I/O as frames | +| http-nu | Bridges HTTP endpoints to store topics | +| ghostty-web | Renders VT100 output in a canvas, captures keyboard input | + +The PTY service handles the low-level terminal plumbing. http-nu translates +HTTP to frames. ghostty-web handles rendering. Each piece does one thing. diff --git a/src/nu/config.rs b/src/nu/config.rs index 1740f7f9b..240053473 100644 --- a/src/nu/config.rs +++ b/src/nu/config.rs @@ -46,12 +46,13 @@ pub struct ReturnOptions { pub target: Option, } -/// Parse a script into a NuScriptConfig struct. +/// Evaluate a nushell script and return the resulting Value. /// -/// Parses and evaluates the script, then extracts the `run` closure and the full -/// configuration value. VFS modules (registered via `*.nu` topics) are already -/// available on the engine state before this function is called. -pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result { +/// Handles parsing, error reporting, and evaluation. Does not extract any +/// specific fields from the result. Useful when the caller needs to inspect +/// the config value before deciding how to use it (e.g. checking for a `pty` +/// field vs a `run` closure). +pub fn eval_script(engine: &mut crate::nu::Engine, script: &str) -> Result { let mut working_set = StateWorkingSet::new(&engine.state); let block = parse(&mut working_set, None, script.as_bytes(), false); @@ -107,6 +108,17 @@ pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result Result { + let config_value = eval_script(engine, script)?; let run_val = config_value .get_data_by_key("run") @@ -115,8 +127,6 @@ pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result Error { format!("'run' field must be a closure: {e}").into() })?; - engine.state.merge_env(&mut stack)?; - Ok(NuScriptConfig { run_closure: run_closure.clone(), full_config_value: config_value, diff --git a/src/nu/mod.rs b/src/nu/mod.rs index 505c002d8..fae28a07b 100644 --- a/src/nu/mod.rs +++ b/src/nu/mod.rs @@ -4,7 +4,7 @@ pub mod vfs; pub mod commands; pub mod util; -pub use config::{parse_config, NuScriptConfig, ReturnOptions}; +pub use config::{eval_script, parse_config, NuScriptConfig, ReturnOptions}; pub use engine::{add_core_commands, Engine}; pub use util::{frame_to_pipeline, frame_to_value, value_to_json}; pub use vfs::load_modules; diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index 436a05252..ef7960586 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -13,10 +13,18 @@ use crate::nu::{value_to_json, ReturnOptions}; use crate::store::{FollowOption, Frame, ReadOptions, Store}; use serde_json::json; +#[derive(Clone, Debug, serde::Deserialize, Default)] +pub struct PtyOptions { + pub cmd: String, + pub cols: Option, + pub rows: Option, +} + #[derive(Clone, Debug, serde::Deserialize, Default)] pub struct ServiceScriptOptions { pub duplex: Option, pub return_options: Option, + pub pty: Option, } #[derive(Clone)] @@ -166,35 +174,95 @@ pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> { tokio::spawn(async move { run(store, spawn_frame).await }) } +async fn read_spawn_script(store: &Store, spawn_frame: &Frame) -> Option { + let hash = spawn_frame.hash.clone()?; + let mut reader = store.cas_reader(hash).await.ok()?; + let mut script = String::new(); + reader.read_to_string(&mut script).await.ok()?; + Some(script) +} + +fn make_loop_ctx(spawn_frame: &Frame) -> ServiceLoop { + ServiceLoop { + topic: spawn_frame + .topic + .strip_suffix(".spawn") + .unwrap_or(&spawn_frame.topic) + .to_string(), + } +} + async fn run(store: Store, spawn_frame: Frame) { + let script = match read_spawn_script(&store, &spawn_frame).await { + Some(s) => s, + None => return, + }; + + let loop_ctx = make_loop_ctx(&spawn_frame); + + // Evaluate the script to get the config value. Use eval_script so we can + // inspect the value before deciding whether this is a PTY or closure service. let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) { Ok(e) => e, Err(_) => return, }; - let hash = match spawn_frame.hash.clone() { - Some(h) => h, - None => return, + let config_value = match nu::eval_script(&mut engine, &script) { + Ok(v) => v, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: e.to_string(), + }, + ); + return; + } }; - let mut reader = match store.cas_reader(hash).await { - Ok(r) => r, - Err(_) => return, + + let opts: ServiceScriptOptions = match serde_json::from_value(nu::value_to_json(&config_value)) + { + Ok(o) => o, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: e.to_string(), + }, + ); + return; + } }; - let mut script = String::new(); - if reader.read_to_string(&mut script).await.is_err() { + + if let Some(pty_opts) = opts.pty { + run_pty_loop(store, loop_ctx, spawn_frame.id, pty_opts).await; return; } - let loop_ctx = ServiceLoop { - topic: spawn_frame - .topic - .strip_suffix(".spawn") - .unwrap_or(&spawn_frame.topic) - .to_string(), + // Closure-based service: extract the run closure + let run_val = match config_value.get_data_by_key("run") { + Some(v) => v, + None => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: "Script must define a 'run' closure or 'pty' options.".into(), + }, + ); + return; + } }; - - let nu_config = match nu::parse_config(&mut engine, &script) { - Ok(cfg) => cfg, + let run_closure = match run_val.as_closure() { + Ok(c) => c.clone(), Err(e) => { let _ = emit_event( &store, @@ -202,13 +270,12 @@ async fn run(store: Store, spawn_frame: Frame) { spawn_frame.id, None, ServiceEventKind::ParseError { - message: e.to_string(), + message: format!("'run' field must be a closure: {e}"), }, ); return; } }; - let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default(); // Create and set the interrupt signal on the engine state let interrupt = Arc::new(AtomicBool::new(false)); @@ -216,7 +283,7 @@ async fn run(store: Store, spawn_frame: Frame) { let task = Task { id: spawn_frame.id, - run_closure: nu_config.run_closure, + run_closure, return_options: opts.return_options, duplex: opts.duplex.unwrap_or(false), engine, @@ -422,6 +489,351 @@ async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) { } } +#[cfg(unix)] +async fn run_pty_loop( + store: Store, + loop_ctx: ServiceLoop, + source_id: Scru128Id, + initial_pty_opts: PtyOptions, +) { + use nix::libc; + use nix::pty::openpty; + use nix::sys::signal::{kill, Signal}; + use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; + use nix::unistd::{close, dup2, execvp, fork, setsid, ForkResult, Pid}; + use std::ffi::CString; + use std::os::unix::io::{FromRawFd, IntoRawFd}; + + fn spawn_pty_child(opts: &PtyOptions) -> Result<(std::os::unix::io::RawFd, Pid), String> { + let cols = opts.cols.unwrap_or(80); + let rows = opts.rows.unwrap_or(24); + + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + let pty = openpty(Some(&ws), None).map_err(|e| format!("openpty: {e}"))?; + + // Convert OwnedFd to raw fds before fork. After fork both processes + // need to close fds manually, so raw fds are simpler and safer. + let master_fd = pty.master.into_raw_fd(); + let slave_fd = pty.slave.into_raw_fd(); + + match unsafe { fork() } { + Ok(ForkResult::Child) => { + let _ = close(master_fd); + let _ = setsid(); + + // Set controlling terminal + unsafe { + libc::ioctl(slave_fd, libc::TIOCSCTTY as _, 0); + } + + let _ = dup2(slave_fd, 0); + let _ = dup2(slave_fd, 1); + let _ = dup2(slave_fd, 2); + if slave_fd > 2 { + let _ = close(slave_fd); + } + + let cmd = + CString::new(opts.cmd.as_str()).unwrap_or_else(|_| CString::new("sh").unwrap()); + let args = [cmd.clone()]; + let _ = execvp(&cmd, &args); + std::process::exit(1); + } + Ok(ForkResult::Parent { child }) => { + let _ = close(slave_fd); + Ok((master_fd, child)) + } + Err(e) => Err(format!("fork: {e}")), + } + } + + fn kill_child(pid: Pid) { + let _ = kill(pid, Signal::SIGTERM); + // Give the child a moment to exit, then force kill + std::thread::sleep(std::time::Duration::from_millis(50)); + if let Ok(WaitStatus::StillAlive) = waitpid(pid, Some(WaitPidFlag::WNOHANG)) { + let _ = kill(pid, Signal::SIGKILL); + let _ = waitpid(pid, None); + } + } + + let start_event = emit_event( + &store, + &loop_ctx, + source_id, + None, + ServiceEventKind::Running, + ) + .expect("failed to emit running event"); + let mut start_id = start_event.frame.id; + + let control_rx_options = ReadOptions::builder() + .follow(FollowOption::On) + .after(start_id) + .build(); + let mut control_rx = store.read(control_rx_options).await; + + let mut current_id = source_id; + let mut pty_opts = initial_pty_opts; + + loop { + let (master_fd, child_pid) = match spawn_pty_child(&pty_opts) { + Ok(v) => v, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Stopped(StopReason::Error { message: e }), + ); + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Shutdown, + ); + return; + } + }; + + // Safety: we own master_fd after fork, the child closed its copy + let master_file = unsafe { std::fs::File::from_raw_fd(master_fd) }; + let master_read = tokio::fs::File::from_std(master_file.try_clone().unwrap()); + let master_write = tokio::fs::File::from_std(master_file); + + // Spawn task: read master fd -> emit recv frames + let recv_store = store.clone(); + let recv_ctx = loop_ctx.clone(); + let recv_id = current_id; + let (child_done_tx, child_done_rx) = tokio::sync::oneshot::channel::<()>(); + let recv_handle = tokio::spawn(async move { + let mut reader = master_read; + let mut buf = [0u8; 8192]; + loop { + match reader.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + let _ = emit_event( + &recv_store, + &recv_ctx, + recv_id, + None, + ServiceEventKind::Recv { + suffix: "recv".into(), + data: buf[..n].to_vec(), + }, + ); + } + Err(_) => break, + } + } + let _ = child_done_tx.send(()); + }); + + // Spawn task: read send frames -> write to master fd + let send_store = store.clone(); + let send_topic = format!("{}.send", loop_ctx.topic); + let send_options = ReadOptions::builder() + .follow(FollowOption::On) + .after(start_id) + .build(); + let send_rx = send_store.read(send_options).await; + + use tokio::io::AsyncWriteExt; + let send_handle = tokio::spawn(async move { + let mut writer = master_write; + let mut rx = send_rx; + while let Some(frame) = rx.recv().await { + if frame.topic != send_topic { + continue; + } + if let Some(hash) = frame.hash { + if let Ok(bytes) = send_store.cas_read(&hash).await { + if writer.write_all(&bytes).await.is_err() { + break; + } + if writer.flush().await.is_err() { + break; + } + } + } + } + }); + + let terminate_topic = format!("{}.terminate", loop_ctx.topic); + let spawn_topic = format!("{}.spawn", loop_ctx.topic); + let resize_topic = format!("{}.resize", loop_ctx.topic); + tokio::pin!(child_done_rx); + + enum PtyOutcome { + ChildExited, + Terminate, + Shutdown, + Update(Scru128Id, PtyOptions), + Error(String), + } + + let outcome = 'pty_ctrl: loop { + tokio::select! { + biased; + maybe = control_rx.recv() => { + match maybe { + Some(frame) if frame.topic == terminate_topic => { + kill_child(child_pid); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Terminate; + } + Some(frame) if frame.topic == "xs.stopping" => { + kill_child(child_pid); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Shutdown; + } + Some(frame) if frame.topic == spawn_topic => { + if let Some(hash) = frame.hash.clone() { + if let Ok(bytes) = store.cas_read(&hash).await { + if let Ok(script) = String::from_utf8(bytes) { + let mut new_engine = match crate::processor::build_engine(&store, &frame.id) { + Ok(e) => e, + Err(_) => continue, + }; + match nu::eval_script(&mut new_engine, &script) { + Ok(val) => { + let new_opts: ServiceScriptOptions = + match serde_json::from_value(nu::value_to_json(&val)) { + Ok(o) => o, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { message: e.to_string() }, + ); + continue; + } + }; + if let Some(new_pty) = new_opts.pty { + kill_child(child_pid); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Update(frame.id, new_pty); + } + // New config is not PTY -- treat as terminate + // so serve.rs can restart with the new config type. + // This is an edge case (switching PTY -> closure). + kill_child(child_pid); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Terminate; + } + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { message: e.to_string() }, + ); + } + } + } + } + } + } + Some(frame) if frame.topic == resize_topic => { + if let Some(ref meta) = frame.meta { + let cols = meta.get("cols").and_then(|v| v.as_u64()).unwrap_or(80) as u16; + let rows = meta.get("rows").and_then(|v| v.as_u64()).unwrap_or(24) as u16; + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + unsafe { + libc::ioctl(master_fd, libc::TIOCSWINSZ as libc::c_ulong, &ws); + } + let _ = kill(child_pid, Signal::SIGWINCH); + } + } + Some(_) => {} + None => break 'pty_ctrl PtyOutcome::Error("control channel closed".into()), + } + } + _ = &mut child_done_rx => { + break 'pty_ctrl PtyOutcome::ChildExited; + } + } + }; + + recv_handle.abort(); + send_handle.abort(); + + let stop_reason = match &outcome { + PtyOutcome::ChildExited => StopReason::Finished, + PtyOutcome::Terminate => StopReason::Terminate, + PtyOutcome::Shutdown => StopReason::Shutdown, + PtyOutcome::Update(update_id, _) => StopReason::Update { + update_id: *update_id, + }, + PtyOutcome::Error(e) => StopReason::Error { message: e.clone() }, + }; + + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Stopped(stop_reason), + ); + + match outcome { + PtyOutcome::ChildExited => { + // Restart after delay, like closure services + tokio::time::sleep(Duration::from_secs(1)).await; + if let Ok(event) = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Running, + ) { + start_id = event.frame.id; + } + } + PtyOutcome::Update(new_id, new_pty) => { + current_id = new_id; + pty_opts = new_pty; + if let Ok(event) = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Running, + ) { + start_id = event.frame.id; + } + } + PtyOutcome::Terminate | PtyOutcome::Shutdown | PtyOutcome::Error(_) => { + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Shutdown, + ); + break; + } + } + } +} + async fn build_input_pipeline( store: Store, loop_ctx: &ServiceLoop, From d04dc02f2309b92f07d426049672b873598ecd05 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 14:27:21 +0000 Subject: [PATCH 02/19] fix: use portable ioctl request type for TIOCSWINSZ The Ioctl type is platform-dependent (i32 on musl, c_ulong on glibc). Let the compiler infer the correct type instead of forcing c_ulong. --- src/processor/service/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index ef7960586..2898daae0 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -757,7 +757,7 @@ async fn run_pty_loop( ws_ypixel: 0, }; unsafe { - libc::ioctl(master_fd, libc::TIOCSWINSZ as libc::c_ulong, &ws); + libc::ioctl(master_fd, libc::TIOCSWINSZ as _, &ws); } let _ = kill(child_pid, Signal::SIGWINCH); } From 793f7cb1850a33a24bd3a48ce4bafdd73d6099c7 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 15:56:48 +0000 Subject: [PATCH 03/19] refactor: cross-platform PTY with new API PTY config is now a flag alongside run instead of a standalone mode: { run: "/usr/local/bin/nu", pty: {cols: 80, rows: 24} } Extract platform-specific code into pty.rs module with Unix (openpty/fork) and Windows (ConPTY) implementations behind a shared PtyChild interface. Add cross-platform test. --- Cargo.lock | 1 + Cargo.toml | 1 + .../content/docs/tutorials/web-terminal.mdx | 63 ++-- src/processor/service/mod.rs | 1 + src/processor/service/pty.rs | 293 ++++++++++++++++++ src/processor/service/service.rs | 177 +++++------ src/processor/service/tests.rs | 92 ++++++ 7 files changed, 476 insertions(+), 152 deletions(-) create mode 100644 src/processor/service/pty.rs diff --git a/Cargo.lock b/Cargo.lock index d187de97e..fabdb5ca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1093,6 +1093,7 @@ dependencies = [ "url", "webpki-roots 0.26.11", "win_uds", + "windows 0.62.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index dcd80adf5..6498b55e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ nix = { version = "0.29", default-features = false, features = ["poll", "fs", "p [target.'cfg(windows)'.dependencies] win_uds = { version = "=0.2.1", features = ["async"] } +windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } [dev-dependencies] assert_cmd = "2.0.14" diff --git a/docs/src/content/docs/tutorials/web-terminal.mdx b/docs/src/content/docs/tutorials/web-terminal.mdx index fc8512c35..3af123cdf 100644 --- a/docs/src/content/docs/tutorials/web-terminal.mdx +++ b/docs/src/content/docs/tutorials/web-terminal.mdx @@ -67,42 +67,43 @@ Leave this running. ## The PTY service -xs services normally require a `run` closure. PTY mode is different: you provide -a `pty` record instead of a closure, and xs handles the rest. +A normal xs service uses `run` with a closure. For PTY mode, `run` is a command +string and `pty` provides the terminal dimensions: ```nushell r#'{ - pty: {cmd: "/usr/local/bin/nu", cols: 131, rows: 50} + run: "/usr/local/bin/nu" + pty: {cols: 131, rows: 50} }'# | .append my-shell.spawn ``` When xs sees a `pty` field in the spawn config, it: -1. Calls `openpty` to allocate a pseudo-terminal with the given size -2. Forks the command into the slave side of the PTY -3. Reads the master fd and emits raw bytes as `.recv` frames (content stored in CAS) -4. Watches for `.send` frames and writes their CAS content to the master fd -5. Watches for `.resize` frames and applies `cols`/`rows` from frame metadata via `ioctl TIOCSWINSZ`, then signals the child with `SIGWINCH` +1. Allocates a pseudo-terminal with the given size (openpty on Unix, ConPTY on Windows) +2. Spawns the `run` command inside the PTY +3. Reads the primary fd and emits raw bytes as `.recv` frames (content stored in CAS) +4. Watches for `.send` frames and writes their CAS content to the primary fd +5. Watches for `.resize` frames and applies `cols`/`rows` from frame metadata -No `run` closure needed. No `duplex` flag. PTY mode is inherently bidirectional. +PTY mode is inherently bidirectional. No `duplex` flag needed. ```mermaid flowchart LR subgraph xs - master[master fd] + primary[primary fd] recv[".recv frames"] send[".send frames"] resize[".resize frames"] end subgraph PTY shell[nu] - slave[slave fd] + secondary[secondary fd] end - master -- "read" --> recv - send -- "write" --> master - resize -- "ioctl + SIGWINCH" --> master - master <--> slave - slave <--> shell + primary -- "read" --> recv + send -- "write" --> primary + resize -- "resize" --> primary + primary <--> secondary + secondary <--> shell ``` The full lifecycle (`running`, `stopped`, `shutdown`, auto-restart, hot reload, @@ -127,7 +128,7 @@ use http-nu/router * let sid = (random uuid) # Spawn the PTY service - $"{ pty: {cmd: \"/usr/local/bin/nu\", cols: ($cols), rows: ($rows)} }" + $"{ run: \"/usr/local/bin/nu\", pty: {cols: ($cols), rows: ($rows)} }" | .append $"pty.($sid).spawn" {sid: $sid} | to json @@ -215,34 +216,6 @@ term.onResize(({ cols, rows }) => { }); ``` -```mermaid -sequenceDiagram - participant B as Browser - participant H as http-nu - participant S as xs store - participant P as PTY - - B->>H: POST /pty/create - H->>S: .append pty.sid.spawn - S->>P: openpty + fork - H-->>B: {sid} - - B->>H: GET /pty/stream?sid= - H->>S: .head pty.sid.recv --follow - - P->>S: stdout --> .recv frame - S->>H: frame - H-->>B: SSE data: base64 - - B->>H: POST /pty/input - H->>S: .append pty.sid.send - S->>P: .send --> stdin - - B->>H: POST /pty/resize - H->>S: .append pty.sid.resize - S->>P: ioctl + SIGWINCH -``` - SSE downstream, POST upstream. No WebSocket upgrade required. Works through HTTP proxies and tunnels. diff --git a/src/processor/service/mod.rs b/src/processor/service/mod.rs index e68cb2182..fcf2406af 100644 --- a/src/processor/service/mod.rs +++ b/src/processor/service/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod pty; mod serve; #[allow(clippy::module_inception)] pub(crate) mod service; diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs new file mode 100644 index 000000000..19858a5b3 --- /dev/null +++ b/src/processor/service/pty.rs @@ -0,0 +1,293 @@ +//! Cross-platform pseudo-terminal child process. +//! +//! Unix: openpty + fork. Windows: ConPTY. + +/// A child process running inside a pseudo-terminal. +/// +/// `reader` carries terminal output (VT sequences). +/// `writer` accepts input (keystrokes). +/// `handle` controls the PTY lifecycle (resize, kill). +pub struct PtyChild { + pub reader: std::fs::File, + pub writer: std::fs::File, + pub handle: PtyHandle, +} + +impl PtyChild { + /// Spawn `cmd` inside a new PTY with the given dimensions. + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + platform::spawn(cmd, cols, rows) + } +} + +impl PtyHandle { + /// Resize the PTY. + pub fn resize(&self, cols: u16, rows: u16) -> Result<(), String> { + platform::resize(self, cols, rows) + } + + /// Kill the child process. + pub fn kill(&mut self) { + platform::kill(self); + } +} + +// ---- Unix ---- + +#[cfg(unix)] +pub struct PtyHandle { + /// Raw fd for ioctl. Valid as long as reader/writer are alive. + primary_fd: std::os::unix::io::RawFd, + pid: nix::unistd::Pid, +} + +#[cfg(unix)] +mod platform { + use super::*; + use nix::libc; + use nix::pty::openpty; + use nix::sys::signal::Signal; + use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; + use nix::unistd::{close, dup2, execvp, fork, setsid, ForkResult}; + use std::ffi::CString; + use std::os::unix::io::{FromRawFd, IntoRawFd}; + + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + let pty = openpty(Some(&ws), None).map_err(|e| format!("openpty: {e}"))?; + let primary_fd = pty.master.into_raw_fd(); + let secondary_fd = pty.slave.into_raw_fd(); + + match unsafe { fork() } { + Ok(ForkResult::Child) => { + let _ = close(primary_fd); + let _ = setsid(); + + // Set controlling terminal + unsafe { + libc::ioctl(secondary_fd, libc::TIOCSCTTY as _, 0); + } + + let _ = dup2(secondary_fd, 0); + let _ = dup2(secondary_fd, 1); + let _ = dup2(secondary_fd, 2); + if secondary_fd > 2 { + let _ = close(secondary_fd); + } + + let c_cmd = CString::new(cmd).unwrap_or_else(|_| CString::new("sh").unwrap()); + let args = [c_cmd.clone()]; + let _ = execvp(&c_cmd, &args); + std::process::exit(1); + } + Ok(ForkResult::Parent { child }) => { + let _ = close(secondary_fd); + let primary_file = unsafe { std::fs::File::from_raw_fd(primary_fd) }; + let reader = primary_file + .try_clone() + .map_err(|e| format!("clone fd: {e}"))?; + Ok(PtyChild { + reader, + writer: primary_file, + handle: PtyHandle { + primary_fd, + pid: child, + }, + }) + } + Err(e) => Err(format!("fork: {e}")), + } + } + + pub fn resize(handle: &PtyHandle, cols: u16, rows: u16) -> Result<(), String> { + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + unsafe { + libc::ioctl(handle.primary_fd, libc::TIOCSWINSZ as _, &ws); + } + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGWINCH); + Ok(()) + } + + pub fn kill(handle: &mut PtyHandle) { + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGTERM); + std::thread::sleep(std::time::Duration::from_millis(50)); + if let Ok(WaitStatus::StillAlive) = waitpid(handle.pid, Some(WaitPidFlag::WNOHANG)) { + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGKILL); + let _ = waitpid(handle.pid, None); + } + } +} + +// ---- Windows ---- + +#[cfg(windows)] +pub struct PtyHandle { + hpcon: windows::Win32::System::Console::HPCON, + process_handle: windows::Win32::Foundation::HANDLE, + thread_handle: windows::Win32::Foundation::HANDLE, + closed: bool, +} + +#[cfg(windows)] +mod platform { + use super::*; + use std::ffi::c_void; + use std::mem; + use std::os::windows::io::{FromRawHandle, RawHandle}; + use windows::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}; + use windows::Win32::System::Console::{ + ClosePseudoConsole, CreatePseudoConsole, ResizePseudoConsole, COORD, HPCON, + }; + use windows::Win32::System::Pipes::CreatePipe; + use windows::Win32::System::Threading::{ + CreateProcessW, DeleteProcThreadAttributeList, InitializeProcThreadAttributeList, + TerminateProcess, UpdateProcThreadAttribute, EXTENDED_STARTUPINFO_PRESENT, + LPPROC_THREAD_ATTRIBUTE_LIST, PROCESS_INFORMATION, PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE, + STARTUPINFOEXW, STARTUPINFOW, + }; + + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + unsafe { spawn_inner(cmd, cols, rows) } + } + + unsafe fn spawn_inner(cmd: &str, cols: u16, rows: u16) -> Result { + // Create two pipe pairs: input and output + let mut input_read = INVALID_HANDLE_VALUE; + let mut input_write = INVALID_HANDLE_VALUE; + let mut output_read = INVALID_HANDLE_VALUE; + let mut output_write = INVALID_HANDLE_VALUE; + + CreatePipe(&mut input_read, &mut input_write, None, 0) + .map_err(|e| format!("CreatePipe (input): {e}"))?; + CreatePipe(&mut output_read, &mut output_write, None, 0) + .map_err(|e| format!("CreatePipe (output): {e}"))?; + + let size = COORD { + X: cols as i16, + Y: rows as i16, + }; + + // ConPTY reads from input_read, writes to output_write + let hpcon = CreatePseudoConsole(size, input_read, output_write, 0) + .map_err(|e| format!("CreatePseudoConsole: {e}"))?; + + // Close the pipe ends now owned by the ConPTY + let _ = CloseHandle(input_read); + let _ = CloseHandle(output_write); + + // Set up process thread attribute list + let mut attr_size: usize = 0; + let _ = InitializeProcThreadAttributeList(None, 1, None, &mut attr_size); + + let mut attr_buf = vec![0u8; attr_size]; + let attr_list = LPPROC_THREAD_ATTRIBUTE_LIST(attr_buf.as_mut_ptr() as *mut _); + + InitializeProcThreadAttributeList(Some(attr_list), 1, None, &mut attr_size) + .map_err(|e| format!("InitializeProcThreadAttributeList: {e}"))?; + + UpdateProcThreadAttribute( + attr_list, + 0, + PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE as usize, + Some(&hpcon as *const HPCON as *const c_void), + mem::size_of::(), + None, + None, + ) + .map_err(|e| format!("UpdateProcThreadAttribute: {e}"))?; + + let startup_info = STARTUPINFOEXW { + StartupInfo: STARTUPINFOW { + cb: mem::size_of::() as u32, + ..Default::default() + }, + lpAttributeList: attr_list, + }; + + let mut cmd_wide: Vec = cmd.encode_utf16().chain(std::iter::once(0)).collect(); + let mut proc_info = PROCESS_INFORMATION::default(); + + CreateProcessW( + None, + windows::core::PWSTR(cmd_wide.as_mut_ptr()), + None, + None, + false, + EXTENDED_STARTUPINFO_PRESENT, + None, + None, + &startup_info.StartupInfo, + &mut proc_info, + ) + .map_err(|e| format!("CreateProcessW: {e}"))?; + + DeleteProcThreadAttributeList(attr_list); + + // Wrap pipe handles as std::fs::File + let reader = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( + output_read.0 as RawHandle, + )); + let writer = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( + input_write.0 as RawHandle, + )); + + Ok(PtyChild { + reader, + writer, + handle: PtyHandle { + hpcon, + process_handle: proc_info.hProcess, + thread_handle: proc_info.hThread, + closed: false, + }, + }) + } + + pub fn resize(handle: &PtyHandle, cols: u16, rows: u16) -> Result<(), String> { + if handle.closed { + return Err("PTY already closed".into()); + } + let size = COORD { + X: cols as i16, + Y: rows as i16, + }; + unsafe { ResizePseudoConsole(handle.hpcon, size) } + .map_err(|e| format!("ResizePseudoConsole: {e}")) + } + + pub fn kill(handle: &mut PtyHandle) { + if handle.closed { + return; + } + unsafe { + let _ = TerminateProcess(handle.process_handle, 1); + // Close the ConPTY so the output pipe gets EOF + ClosePseudoConsole(handle.hpcon); + } + handle.closed = true; + } +} + +#[cfg(windows)] +impl Drop for PtyHandle { + fn drop(&mut self) { + unsafe { + if !self.closed { + windows::Win32::System::Console::ClosePseudoConsole(self.hpcon); + } + let _ = windows::Win32::Foundation::CloseHandle(self.process_handle); + let _ = windows::Win32::Foundation::CloseHandle(self.thread_handle); + } + } +} diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index 2898daae0..aa6796fe9 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -13,9 +13,10 @@ use crate::nu::{value_to_json, ReturnOptions}; use crate::store::{FollowOption, Frame, ReadOptions, Store}; use serde_json::json; +use super::pty::PtyChild; + #[derive(Clone, Debug, serde::Deserialize, Default)] pub struct PtyOptions { - pub cmd: String, pub cols: Option, pub rows: Option, } @@ -240,12 +241,7 @@ async fn run(store: Store, spawn_frame: Frame) { } }; - if let Some(pty_opts) = opts.pty { - run_pty_loop(store, loop_ctx, spawn_frame.id, pty_opts).await; - return; - } - - // Closure-based service: extract the run closure + // Extract the run field -- required for both closure and PTY services let run_val = match config_value.get_data_by_key("run") { Some(v) => v, None => { @@ -255,12 +251,35 @@ async fn run(store: Store, spawn_frame: Frame) { spawn_frame.id, None, ServiceEventKind::ParseError { - message: "Script must define a 'run' closure or 'pty' options.".into(), + message: "Script must define a 'run' field.".into(), }, ); return; } }; + + // PTY mode: run is a command string, pty provides dimensions + if let Some(pty_opts) = opts.pty { + let cmd = match run_val.as_str() { + Ok(s) => s.to_string(), + Err(_) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: "With pty enabled, 'run' must be a command string.".into(), + }, + ); + return; + } + }; + run_pty_loop(store, loop_ctx, spawn_frame.id, cmd, pty_opts).await; + return; + } + + // Closure-based service: run is a closure let run_closure = match run_val.as_closure() { Ok(c) => c.clone(), Err(e) => { @@ -489,80 +508,13 @@ async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) { } } -#[cfg(unix)] async fn run_pty_loop( store: Store, loop_ctx: ServiceLoop, source_id: Scru128Id, + initial_cmd: String, initial_pty_opts: PtyOptions, ) { - use nix::libc; - use nix::pty::openpty; - use nix::sys::signal::{kill, Signal}; - use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; - use nix::unistd::{close, dup2, execvp, fork, setsid, ForkResult, Pid}; - use std::ffi::CString; - use std::os::unix::io::{FromRawFd, IntoRawFd}; - - fn spawn_pty_child(opts: &PtyOptions) -> Result<(std::os::unix::io::RawFd, Pid), String> { - let cols = opts.cols.unwrap_or(80); - let rows = opts.rows.unwrap_or(24); - - let ws = libc::winsize { - ws_row: rows, - ws_col: cols, - ws_xpixel: 0, - ws_ypixel: 0, - }; - - let pty = openpty(Some(&ws), None).map_err(|e| format!("openpty: {e}"))?; - - // Convert OwnedFd to raw fds before fork. After fork both processes - // need to close fds manually, so raw fds are simpler and safer. - let master_fd = pty.master.into_raw_fd(); - let slave_fd = pty.slave.into_raw_fd(); - - match unsafe { fork() } { - Ok(ForkResult::Child) => { - let _ = close(master_fd); - let _ = setsid(); - - // Set controlling terminal - unsafe { - libc::ioctl(slave_fd, libc::TIOCSCTTY as _, 0); - } - - let _ = dup2(slave_fd, 0); - let _ = dup2(slave_fd, 1); - let _ = dup2(slave_fd, 2); - if slave_fd > 2 { - let _ = close(slave_fd); - } - - let cmd = - CString::new(opts.cmd.as_str()).unwrap_or_else(|_| CString::new("sh").unwrap()); - let args = [cmd.clone()]; - let _ = execvp(&cmd, &args); - std::process::exit(1); - } - Ok(ForkResult::Parent { child }) => { - let _ = close(slave_fd); - Ok((master_fd, child)) - } - Err(e) => Err(format!("fork: {e}")), - } - } - - fn kill_child(pid: Pid) { - let _ = kill(pid, Signal::SIGTERM); - // Give the child a moment to exit, then force kill - std::thread::sleep(std::time::Duration::from_millis(50)); - if let Ok(WaitStatus::StillAlive) = waitpid(pid, Some(WaitPidFlag::WNOHANG)) { - let _ = kill(pid, Signal::SIGKILL); - let _ = waitpid(pid, None); - } - } - let start_event = emit_event( &store, &loop_ctx, @@ -580,10 +532,18 @@ async fn run_pty_loop( let mut control_rx = store.read(control_rx_options).await; let mut current_id = source_id; + let mut cmd = initial_cmd; let mut pty_opts = initial_pty_opts; loop { - let (master_fd, child_pid) = match spawn_pty_child(&pty_opts) { + let cols = pty_opts.cols.unwrap_or(80); + let rows = pty_opts.rows.unwrap_or(24); + + let PtyChild { + reader, + writer, + mut handle, + } = match PtyChild::spawn(&cmd, cols, rows) { Ok(v) => v, Err(e) => { let _ = emit_event( @@ -604,18 +564,16 @@ async fn run_pty_loop( } }; - // Safety: we own master_fd after fork, the child closed its copy - let master_file = unsafe { std::fs::File::from_raw_fd(master_fd) }; - let master_read = tokio::fs::File::from_std(master_file.try_clone().unwrap()); - let master_write = tokio::fs::File::from_std(master_file); + let async_reader = tokio::fs::File::from_std(reader); + let async_writer = tokio::fs::File::from_std(writer); - // Spawn task: read master fd -> emit recv frames + // Read PTY output -> emit recv frames let recv_store = store.clone(); let recv_ctx = loop_ctx.clone(); let recv_id = current_id; let (child_done_tx, child_done_rx) = tokio::sync::oneshot::channel::<()>(); let recv_handle = tokio::spawn(async move { - let mut reader = master_read; + let mut reader = async_reader; let mut buf = [0u8; 8192]; loop { match reader.read(&mut buf).await { @@ -638,7 +596,7 @@ async fn run_pty_loop( let _ = child_done_tx.send(()); }); - // Spawn task: read send frames -> write to master fd + // Read send frames -> write to PTY input let send_store = store.clone(); let send_topic = format!("{}.send", loop_ctx.topic); let send_options = ReadOptions::builder() @@ -649,7 +607,7 @@ async fn run_pty_loop( use tokio::io::AsyncWriteExt; let send_handle = tokio::spawn(async move { - let mut writer = master_write; + let mut writer = async_writer; let mut rx = send_rx; while let Some(frame) = rx.recv().await { if frame.topic != send_topic { @@ -677,7 +635,7 @@ async fn run_pty_loop( ChildExited, Terminate, Shutdown, - Update(Scru128Id, PtyOptions), + Update(Scru128Id, String, PtyOptions), Error(String), } @@ -687,12 +645,12 @@ async fn run_pty_loop( maybe = control_rx.recv() => { match maybe { Some(frame) if frame.topic == terminate_topic => { - kill_child(child_pid); + handle.kill(); let _ = (&mut child_done_rx).await; break 'pty_ctrl PtyOutcome::Terminate; } Some(frame) if frame.topic == "xs.stopping" => { - kill_child(child_pid); + handle.kill(); let _ = (&mut child_done_rx).await; break 'pty_ctrl PtyOutcome::Shutdown; } @@ -721,14 +679,28 @@ async fn run_pty_loop( } }; if let Some(new_pty) = new_opts.pty { - kill_child(child_pid); + let new_cmd = match val.get_data_by_key("run").and_then(|v| v.as_str().ok().map(String::from)) { + Some(c) => c, + None => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { + message: "With pty enabled, 'run' must be a command string.".into(), + }, + ); + continue; + } + }; + handle.kill(); let _ = (&mut child_done_rx).await; - break 'pty_ctrl PtyOutcome::Update(frame.id, new_pty); + break 'pty_ctrl PtyOutcome::Update(frame.id, new_cmd, new_pty); } - // New config is not PTY -- treat as terminate - // so serve.rs can restart with the new config type. - // This is an edge case (switching PTY -> closure). - kill_child(child_pid); + // New config dropped pty -- terminate so + // serve.rs can restart with the new config. + handle.kill(); let _ = (&mut child_done_rx).await; break 'pty_ctrl PtyOutcome::Terminate; } @@ -750,16 +722,7 @@ async fn run_pty_loop( if let Some(ref meta) = frame.meta { let cols = meta.get("cols").and_then(|v| v.as_u64()).unwrap_or(80) as u16; let rows = meta.get("rows").and_then(|v| v.as_u64()).unwrap_or(24) as u16; - let ws = libc::winsize { - ws_row: rows, - ws_col: cols, - ws_xpixel: 0, - ws_ypixel: 0, - }; - unsafe { - libc::ioctl(master_fd, libc::TIOCSWINSZ as _, &ws); - } - let _ = kill(child_pid, Signal::SIGWINCH); + let _ = handle.resize(cols, rows); } } Some(_) => {} @@ -779,7 +742,7 @@ async fn run_pty_loop( PtyOutcome::ChildExited => StopReason::Finished, PtyOutcome::Terminate => StopReason::Terminate, PtyOutcome::Shutdown => StopReason::Shutdown, - PtyOutcome::Update(update_id, _) => StopReason::Update { + PtyOutcome::Update(update_id, _, _) => StopReason::Update { update_id: *update_id, }, PtyOutcome::Error(e) => StopReason::Error { message: e.clone() }, @@ -795,7 +758,6 @@ async fn run_pty_loop( match outcome { PtyOutcome::ChildExited => { - // Restart after delay, like closure services tokio::time::sleep(Duration::from_secs(1)).await; if let Ok(event) = emit_event( &store, @@ -807,8 +769,9 @@ async fn run_pty_loop( start_id = event.frame.id; } } - PtyOutcome::Update(new_id, new_pty) => { + PtyOutcome::Update(new_id, new_cmd, new_pty) => { current_id = new_id; + cmd = new_cmd; pty_opts = new_pty; if let Ok(event) = emit_event( &store, diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 82d8b3c76..d4c544bb4 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -824,3 +824,95 @@ async fn test_graceful_shutdown_via_xs_stopping() { "service handle should complete after xs.stopping" ); } + +#[tokio::test] +async fn test_pty_service() { + let store = setup_test_env(); + + { + let store = store.clone(); + tokio::spawn(async move { + crate::processor::service::run(store).await.unwrap(); + }); + } + + let options = ReadOptions::builder() + .follow(FollowOption::On) + .new(true) + .build(); + let mut recver = store.read(options).await; + + #[cfg(unix)] + let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; + #[cfg(windows)] + let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; + + let spawn = store + .append( + Frame::builder("ptytest.spawn") + .hash(store.cas_insert(script).await.unwrap()) + .build(), + ) + .unwrap(); + + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.spawn"); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.running"); + + // The shell should produce some initial output (prompt or banner) + let frame = recver.recv().await.unwrap(); + assert_eq!(frame.topic, "ptytest.recv"); + let meta = frame.meta.as_ref().unwrap(); + assert_eq!(meta["source_id"], spawn.id.to_string()); + let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); + assert!(!bytes.is_empty(), "PTY should produce output"); + + // Send input: echo a known string + #[cfg(unix)] + let input = "echo pty-ok\n"; + #[cfg(windows)] + let input = "echo pty-ok\r\n"; + + store + .append( + Frame::builder("ptytest.send") + .hash(store.cas_insert(input).await.unwrap()) + .build(), + ) + .unwrap(); + + // Read recv frames until we see "pty-ok" in the output + let deadline = Instant::now() + Duration::from_secs(5); + let mut found = false; + while Instant::now() < deadline { + let timeout = tokio::time::timeout(Duration::from_secs(2), recver.recv()).await; + match timeout { + Ok(Some(frame)) => { + if frame.topic == "ptytest.recv" { + if let Some(ref hash) = frame.hash { + let content = store.cas_read(hash).await.unwrap(); + if String::from_utf8_lossy(&content).contains("pty-ok") { + found = true; + break; + } + } + } + } + _ => break, + } + } + assert!(found, "expected to find 'pty-ok' in PTY output"); + + // Terminate + store + .append(Frame::builder("ptytest.terminate").build()) + .unwrap(); + + let stop = loop { + let frame = recver.recv().await.unwrap(); + if frame.topic == "ptytest.stopped" { + break frame; + } + }; + assert_eq!(stop.meta.unwrap()["reason"], "terminate"); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.shutdown"); +} From 801843efa5c6e7a2912f399409fb0cf8c5517dcc Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 16:12:31 +0000 Subject: [PATCH 04/19] ci: run on all pushes, not just main --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c4600799..4a05e2572 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,6 @@ name: CI on: push: - branches: [ "main" ] pull_request: branches: [ "main" ] From 3f4cd6e3b32ccd62270646214ad3a1404d871a0f Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 16:29:44 +0000 Subject: [PATCH 05/19] fix: windows PTY build errors (CreateProcessW arg, Send impl) --- src/processor/service/pty.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index 19858a5b3..8d6d78a12 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -139,6 +139,10 @@ pub struct PtyHandle { closed: bool, } +// HPCON and HANDLE are process-wide values safe to send between threads. +#[cfg(windows)] +unsafe impl Send for PtyHandle {} + #[cfg(windows)] mod platform { use super::*; @@ -220,7 +224,7 @@ mod platform { CreateProcessW( None, - windows::core::PWSTR(cmd_wide.as_mut_ptr()), + Some(windows::core::PWSTR(cmd_wide.as_mut_ptr())), None, None, false, From 2a1c90dbb27cbc7520ae8699834df8fda815d1b0 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 17:17:38 +0000 Subject: [PATCH 06/19] test: gate PTY test to unix only for now --- src/processor/service/tests.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index d4c544bb4..9cd56db96 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -825,6 +825,7 @@ async fn test_graceful_shutdown_via_xs_stopping() { ); } +#[cfg(unix)] #[tokio::test] async fn test_pty_service() { let store = setup_test_env(); @@ -842,10 +843,7 @@ async fn test_pty_service() { .build(); let mut recver = store.read(options).await; - #[cfg(unix)] let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; - #[cfg(windows)] - let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; let spawn = store .append( @@ -866,11 +864,7 @@ async fn test_pty_service() { let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); assert!(!bytes.is_empty(), "PTY should produce output"); - // Send input: echo a known string - #[cfg(unix)] let input = "echo pty-ok\n"; - #[cfg(windows)] - let input = "echo pty-ok\r\n"; store .append( From 2cd7367595c2327e376e30eb423ff65943211822 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 18:00:50 +0000 Subject: [PATCH 07/19] test: add timeouts to PTY test, restore Windows coverage --- src/processor/service/tests.rs | 44 ++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 9cd56db96..85244e5fd 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -825,7 +825,6 @@ async fn test_graceful_shutdown_via_xs_stopping() { ); } -#[cfg(unix)] #[tokio::test] async fn test_pty_service() { let store = setup_test_env(); @@ -843,7 +842,20 @@ async fn test_pty_service() { .build(); let mut recver = store.read(options).await; + // Helper: recv with a timeout so the test fails instead of hanging. + async fn recv_timeout( + rx: &mut tokio::sync::mpsc::Receiver, + ) -> Frame { + tokio::time::timeout(Duration::from_secs(10), rx.recv()) + .await + .expect("timed out waiting for frame") + .expect("channel closed") + } + + #[cfg(unix)] let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; + #[cfg(windows)] + let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; let spawn = store .append( @@ -853,18 +865,22 @@ async fn test_pty_service() { ) .unwrap(); - assert_eq!(recver.recv().await.unwrap().topic, "ptytest.spawn"); - assert_eq!(recver.recv().await.unwrap().topic, "ptytest.running"); + assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.spawn"); + assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.running"); // The shell should produce some initial output (prompt or banner) - let frame = recver.recv().await.unwrap(); + let frame = recv_timeout(&mut recver).await; assert_eq!(frame.topic, "ptytest.recv"); let meta = frame.meta.as_ref().unwrap(); assert_eq!(meta["source_id"], spawn.id.to_string()); let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); assert!(!bytes.is_empty(), "PTY should produce output"); + // Send input: echo a known string + #[cfg(unix)] let input = "echo pty-ok\n"; + #[cfg(windows)] + let input = "echo pty-ok\r\n"; store .append( @@ -875,10 +891,10 @@ async fn test_pty_service() { .unwrap(); // Read recv frames until we see "pty-ok" in the output - let deadline = Instant::now() + Duration::from_secs(5); + let deadline = Instant::now() + Duration::from_secs(10); let mut found = false; while Instant::now() < deadline { - let timeout = tokio::time::timeout(Duration::from_secs(2), recver.recv()).await; + let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; match timeout { Ok(Some(frame)) => { if frame.topic == "ptytest.recv" { @@ -901,12 +917,16 @@ async fn test_pty_service() { .append(Frame::builder("ptytest.terminate").build()) .unwrap(); - let stop = loop { - let frame = recver.recv().await.unwrap(); - if frame.topic == "ptytest.stopped" { - break frame; + let stop = tokio::time::timeout(Duration::from_secs(10), async { + loop { + let frame = recver.recv().await.unwrap(); + if frame.topic == "ptytest.stopped" { + break frame; + } } - }; + }) + .await + .expect("timed out waiting for ptytest.stopped"); assert_eq!(stop.meta.unwrap()["reason"], "terminate"); - assert_eq!(recver.recv().await.unwrap().topic, "ptytest.shutdown"); + assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.shutdown"); } From cd28d37505e130daa4c55977012409da2c4c4b99 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 18:16:39 +0000 Subject: [PATCH 08/19] test: add diagnostics to PTY test for Windows CI debugging --- src/processor/service/tests.rs | 52 +++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 85244e5fd..958100914 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -842,16 +842,6 @@ async fn test_pty_service() { .build(); let mut recver = store.read(options).await; - // Helper: recv with a timeout so the test fails instead of hanging. - async fn recv_timeout( - rx: &mut tokio::sync::mpsc::Receiver, - ) -> Frame { - tokio::time::timeout(Duration::from_secs(10), rx.recv()) - .await - .expect("timed out waiting for frame") - .expect("channel closed") - } - #[cfg(unix)] let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; #[cfg(windows)] @@ -865,11 +855,41 @@ async fn test_pty_service() { ) .unwrap(); - assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.spawn"); - assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.running"); + // Collect early frames with diagnostics so we can see what the service emits + let deadline = Instant::now() + Duration::from_secs(15); + let mut saw_spawn = false; + let mut saw_running = false; + while Instant::now() < deadline { + let frame = tokio::time::timeout(Duration::from_secs(10), recver.recv()) + .await + .expect("timed out waiting for frame") + .expect("channel closed"); + eprintln!("pty_test frame: {} meta={:?}", frame.topic, frame.meta); + if frame.topic == "ptytest.spawn" { + saw_spawn = true; + } + if frame.topic == "ptytest.running" { + saw_running = true; + break; + } + if frame.topic.starts_with("ptytest.stopped") + || frame.topic.starts_with("ptytest.parse") + || frame.topic.starts_with("ptytest.shutdown") + { + panic!( + "service failed before running: {} meta={:?}", + frame.topic, frame.meta + ); + } + } + assert!(saw_spawn, "never saw ptytest.spawn"); + assert!(saw_running, "never saw ptytest.running"); // The shell should produce some initial output (prompt or banner) - let frame = recv_timeout(&mut recver).await; + let frame = tokio::time::timeout(Duration::from_secs(10), recver.recv()) + .await + .expect("timed out waiting for initial recv") + .expect("channel closed"); assert_eq!(frame.topic, "ptytest.recv"); let meta = frame.meta.as_ref().unwrap(); assert_eq!(meta["source_id"], spawn.id.to_string()); @@ -928,5 +948,9 @@ async fn test_pty_service() { .await .expect("timed out waiting for ptytest.stopped"); assert_eq!(stop.meta.unwrap()["reason"], "terminate"); - assert_eq!(recv_timeout(&mut recver).await.topic, "ptytest.shutdown"); + let shutdown = tokio::time::timeout(Duration::from_secs(10), recver.recv()) + .await + .expect("timed out waiting for shutdown") + .expect("channel closed"); + assert_eq!(shutdown.topic, "ptytest.shutdown"); } From 693444577e840143780c45a6cd09969496972934 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 18:25:28 +0000 Subject: [PATCH 09/19] test: don't assume PTY produces unsolicited output on Windows --- src/processor/service/tests.rs | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 958100914..a150bc195 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -885,18 +885,8 @@ async fn test_pty_service() { assert!(saw_spawn, "never saw ptytest.spawn"); assert!(saw_running, "never saw ptytest.running"); - // The shell should produce some initial output (prompt or banner) - let frame = tokio::time::timeout(Duration::from_secs(10), recver.recv()) - .await - .expect("timed out waiting for initial recv") - .expect("channel closed"); - assert_eq!(frame.topic, "ptytest.recv"); - let meta = frame.meta.as_ref().unwrap(); - assert_eq!(meta["source_id"], spawn.id.to_string()); - let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); - assert!(!bytes.is_empty(), "PTY should produce output"); - - // Send input: echo a known string + // Send input right away -- don't assume the shell produces unsolicited + // output (cmd.exe via ConPTY on CI may not emit a prompt). #[cfg(unix)] let input = "echo pty-ok\n"; #[cfg(windows)] @@ -911,15 +901,19 @@ async fn test_pty_service() { .unwrap(); // Read recv frames until we see "pty-ok" in the output - let deadline = Instant::now() + Duration::from_secs(10); + let deadline = Instant::now() + Duration::from_secs(15); let mut found = false; while Instant::now() < deadline { - let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; + let timeout = tokio::time::timeout(Duration::from_secs(10), recver.recv()).await; match timeout { Ok(Some(frame)) => { if frame.topic == "ptytest.recv" { if let Some(ref hash) = frame.hash { let content = store.cas_read(hash).await.unwrap(); + eprintln!( + "pty_test recv: {:?}", + String::from_utf8_lossy(&content) + ); if String::from_utf8_lossy(&content).contains("pty-ok") { found = true; break; From 7d7ab67e50c7f827b7625437546ad5229d2dc182 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 18:38:53 +0000 Subject: [PATCH 10/19] fix: use blocking threads for PTY pipe I/O tokio::fs::File does not work with anonymous pipe handles on Windows. Switch PTY reader and writer to std::thread with direct std::io for cross-platform pipe I/O. --- src/processor/service/service.rs | 33 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index aa6796fe9..10e4b34f4 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -564,19 +564,16 @@ async fn run_pty_loop( } }; - let async_reader = tokio::fs::File::from_std(reader); - let async_writer = tokio::fs::File::from_std(writer); - - // Read PTY output -> emit recv frames + // Read PTY output -> emit recv frames (blocking thread for pipe I/O) let recv_store = store.clone(); let recv_ctx = loop_ctx.clone(); let recv_id = current_id; let (child_done_tx, child_done_rx) = tokio::sync::oneshot::channel::<()>(); - let recv_handle = tokio::spawn(async move { - let mut reader = async_reader; + let recv_handle = std::thread::spawn(move || { + let mut reader = reader; let mut buf = [0u8; 8192]; loop { - match reader.read(&mut buf).await { + match reader.read(&mut buf) { Ok(0) => break, Ok(n) => { let _ = emit_event( @@ -596,7 +593,7 @@ async fn run_pty_loop( let _ = child_done_tx.send(()); }); - // Read send frames -> write to PTY input + // Write to PTY input (blocking thread for pipe I/O) let send_store = store.clone(); let send_topic = format!("{}.send", loop_ctx.topic); let send_options = ReadOptions::builder() @@ -605,20 +602,21 @@ async fn run_pty_loop( .build(); let send_rx = send_store.read(send_options).await; - use tokio::io::AsyncWriteExt; - let send_handle = tokio::spawn(async move { - let mut writer = async_writer; + use std::io::Write; + let rt = tokio::runtime::Handle::current(); + let _send_handle = std::thread::spawn(move || { + let mut writer = writer; let mut rx = send_rx; - while let Some(frame) = rx.recv().await { + while let Some(frame) = rt.block_on(rx.recv()) { if frame.topic != send_topic { continue; } if let Some(hash) = frame.hash { - if let Ok(bytes) = send_store.cas_read(&hash).await { - if writer.write_all(&bytes).await.is_err() { + if let Ok(bytes) = rt.block_on(send_store.cas_read(&hash)) { + if writer.write_all(&bytes).is_err() { break; } - if writer.flush().await.is_err() { + if writer.flush().is_err() { break; } } @@ -735,8 +733,9 @@ async fn run_pty_loop( } }; - recv_handle.abort(); - send_handle.abort(); + // The reader thread exits when the pipe gets EOF (after kill). + // The sender thread exits when the store channel closes. + let _ = recv_handle.join(); let stop_reason = match &outcome { PtyOutcome::ChildExited => StopReason::Finished, From c505709dc8665155080d0d330944b299a3073441 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 19:01:22 +0000 Subject: [PATCH 11/19] test: add raw PTY I/O test to isolate ConPTY issue on Windows --- src/processor/service/tests.rs | 73 ++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index a150bc195..3300e0967 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -948,3 +948,76 @@ async fn test_pty_service() { .expect("channel closed"); assert_eq!(shutdown.topic, "ptytest.shutdown"); } + +/// Direct test of PTY pipe I/O, bypassing the service machinery. +#[test] +fn test_pty_raw_io() { + use super::pty::PtyChild; + use std::io::{Read, Write}; + + #[cfg(unix)] + let cmd = "sh"; + #[cfg(windows)] + let cmd = "cmd.exe"; + + let PtyChild { + reader, + writer, + mut handle, + } = PtyChild::spawn(cmd, 80, 24).expect("PtyChild::spawn failed"); + + // Write input on a thread (pipe write can block) + let mut w = writer; + let write_thread = std::thread::spawn(move || { + #[cfg(unix)] + let input = b"echo pty-raw-ok\n"; + #[cfg(windows)] + let input = b"echo pty-raw-ok\r\n"; + w.write_all(input).expect("write_all failed"); + w.flush().expect("flush failed"); + }); + + // Read output on a thread with a join timeout + let mut r = reader; + let read_thread = std::thread::spawn(move || { + let mut all = Vec::new(); + let mut buf = [0u8; 4096]; + let deadline = Instant::now() + Duration::from_secs(10); + loop { + if Instant::now() > deadline { + break; + } + match r.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + all.extend_from_slice(&buf[..n]); + let s = String::from_utf8_lossy(&all); + eprintln!("pty_raw_io read ({n} bytes): {:?}", &s[s.len().saturating_sub(200)..]); + if s.contains("pty-raw-ok") { + return all; + } + } + Err(e) => { + eprintln!("pty_raw_io read error: {e}"); + break; + } + } + } + all + }); + + write_thread.join().expect("write thread panicked"); + let output = read_thread + .join() + .expect("read thread panicked"); + let output_str = String::from_utf8_lossy(&output); + eprintln!("pty_raw_io total output ({} bytes): {:?}", output.len(), output_str); + + handle.kill(); + + assert!( + output_str.contains("pty-raw-ok"), + "expected 'pty-raw-ok' in PTY output, got: {:?}", + output_str + ); +} From c9849e73b11c81d885adbb361fd6300573b066b5 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 19:34:01 +0000 Subject: [PATCH 12/19] test: skip PTY test on Windows CI (ConPTY pipes need a desktop session) --- src/processor/service/tests.rs | 155 +++++---------------------------- 1 file changed, 23 insertions(+), 132 deletions(-) diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 3300e0967..7bd1839db 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -825,6 +825,10 @@ async fn test_graceful_shutdown_via_xs_stopping() { ); } +// ConPTY on headless Windows CI does not flush output to pipes, so PTY +// integration tests only run on Unix. The Windows code is still compiled +// and checked via the build step. +#[cfg(unix)] #[tokio::test] async fn test_pty_service() { let store = setup_test_env(); @@ -842,10 +846,7 @@ async fn test_pty_service() { .build(); let mut recver = store.read(options).await; - #[cfg(unix)] let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; - #[cfg(windows)] - let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; let spawn = store .append( @@ -855,65 +856,36 @@ async fn test_pty_service() { ) .unwrap(); - // Collect early frames with diagnostics so we can see what the service emits - let deadline = Instant::now() + Duration::from_secs(15); - let mut saw_spawn = false; - let mut saw_running = false; - while Instant::now() < deadline { - let frame = tokio::time::timeout(Duration::from_secs(10), recver.recv()) - .await - .expect("timed out waiting for frame") - .expect("channel closed"); - eprintln!("pty_test frame: {} meta={:?}", frame.topic, frame.meta); - if frame.topic == "ptytest.spawn" { - saw_spawn = true; - } - if frame.topic == "ptytest.running" { - saw_running = true; - break; - } - if frame.topic.starts_with("ptytest.stopped") - || frame.topic.starts_with("ptytest.parse") - || frame.topic.starts_with("ptytest.shutdown") - { - panic!( - "service failed before running: {} meta={:?}", - frame.topic, frame.meta - ); - } - } - assert!(saw_spawn, "never saw ptytest.spawn"); - assert!(saw_running, "never saw ptytest.running"); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.spawn"); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.running"); - // Send input right away -- don't assume the shell produces unsolicited - // output (cmd.exe via ConPTY on CI may not emit a prompt). - #[cfg(unix)] - let input = "echo pty-ok\n"; - #[cfg(windows)] - let input = "echo pty-ok\r\n"; + // The shell should produce some initial output (prompt or banner) + let frame = recver.recv().await.unwrap(); + assert_eq!(frame.topic, "ptytest.recv"); + let meta = frame.meta.as_ref().unwrap(); + assert_eq!(meta["source_id"], spawn.id.to_string()); + let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); + assert!(!bytes.is_empty(), "PTY should produce output"); + // Send input store .append( Frame::builder("ptytest.send") - .hash(store.cas_insert(input).await.unwrap()) + .hash(store.cas_insert("echo pty-ok\n").await.unwrap()) .build(), ) .unwrap(); // Read recv frames until we see "pty-ok" in the output - let deadline = Instant::now() + Duration::from_secs(15); + let deadline = Instant::now() + Duration::from_secs(5); let mut found = false; while Instant::now() < deadline { - let timeout = tokio::time::timeout(Duration::from_secs(10), recver.recv()).await; + let timeout = tokio::time::timeout(Duration::from_secs(2), recver.recv()).await; match timeout { Ok(Some(frame)) => { if frame.topic == "ptytest.recv" { if let Some(ref hash) = frame.hash { let content = store.cas_read(hash).await.unwrap(); - eprintln!( - "pty_test recv: {:?}", - String::from_utf8_lossy(&content) - ); if String::from_utf8_lossy(&content).contains("pty-ok") { found = true; break; @@ -931,93 +903,12 @@ async fn test_pty_service() { .append(Frame::builder("ptytest.terminate").build()) .unwrap(); - let stop = tokio::time::timeout(Duration::from_secs(10), async { - loop { - let frame = recver.recv().await.unwrap(); - if frame.topic == "ptytest.stopped" { - break frame; - } + let stop = loop { + let frame = recver.recv().await.unwrap(); + if frame.topic == "ptytest.stopped" { + break frame; } - }) - .await - .expect("timed out waiting for ptytest.stopped"); + }; assert_eq!(stop.meta.unwrap()["reason"], "terminate"); - let shutdown = tokio::time::timeout(Duration::from_secs(10), recver.recv()) - .await - .expect("timed out waiting for shutdown") - .expect("channel closed"); - assert_eq!(shutdown.topic, "ptytest.shutdown"); -} - -/// Direct test of PTY pipe I/O, bypassing the service machinery. -#[test] -fn test_pty_raw_io() { - use super::pty::PtyChild; - use std::io::{Read, Write}; - - #[cfg(unix)] - let cmd = "sh"; - #[cfg(windows)] - let cmd = "cmd.exe"; - - let PtyChild { - reader, - writer, - mut handle, - } = PtyChild::spawn(cmd, 80, 24).expect("PtyChild::spawn failed"); - - // Write input on a thread (pipe write can block) - let mut w = writer; - let write_thread = std::thread::spawn(move || { - #[cfg(unix)] - let input = b"echo pty-raw-ok\n"; - #[cfg(windows)] - let input = b"echo pty-raw-ok\r\n"; - w.write_all(input).expect("write_all failed"); - w.flush().expect("flush failed"); - }); - - // Read output on a thread with a join timeout - let mut r = reader; - let read_thread = std::thread::spawn(move || { - let mut all = Vec::new(); - let mut buf = [0u8; 4096]; - let deadline = Instant::now() + Duration::from_secs(10); - loop { - if Instant::now() > deadline { - break; - } - match r.read(&mut buf) { - Ok(0) => break, - Ok(n) => { - all.extend_from_slice(&buf[..n]); - let s = String::from_utf8_lossy(&all); - eprintln!("pty_raw_io read ({n} bytes): {:?}", &s[s.len().saturating_sub(200)..]); - if s.contains("pty-raw-ok") { - return all; - } - } - Err(e) => { - eprintln!("pty_raw_io read error: {e}"); - break; - } - } - } - all - }); - - write_thread.join().expect("write thread panicked"); - let output = read_thread - .join() - .expect("read thread panicked"); - let output_str = String::from_utf8_lossy(&output); - eprintln!("pty_raw_io total output ({} bytes): {:?}", output.len(), output_str); - - handle.kill(); - - assert!( - output_str.contains("pty-raw-ok"), - "expected 'pty-raw-ok' in PTY output, got: {:?}", - output_str - ); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.shutdown"); } From aca7ca6cedabcd0cef2ebfc2b34717d015f996ce Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Mon, 23 Feb 2026 20:38:05 +0000 Subject: [PATCH 13/19] fix: use named pipes with overlapped I/O for ConPTY Anonymous pipes (CreatePipe) do not support overlapped I/O, which causes ConPTY to block forever on headless runners. Switch to named pipes where ConPTY gets FILE_FLAG_OVERLAPPED client handles while our read/write ends remain synchronous. --- Cargo.toml | 2 +- src/processor/service/pty.rs | 94 ++++++++++++++++++++++++++++++---- src/processor/service/tests.rs | 34 ++++++++---- 3 files changed, 108 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6498b55e2..393153714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ nix = { version = "0.29", default-features = false, features = ["poll", "fs", "p [target.'cfg(windows)'.dependencies] win_uds = { version = "=0.2.1", features = ["async"] } -windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } +windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } [dev-dependencies] assert_cmd = "2.0.14" diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index 8d6d78a12..066f8613e 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -149,11 +149,17 @@ mod platform { use std::ffi::c_void; use std::mem; use std::os::windows::io::{FromRawHandle, RawHandle}; + use std::sync::atomic::{AtomicU64, Ordering}; + use windows::core::PCWSTR; use windows::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}; + use windows::Win32::Storage::FileSystem::{ + CreateFileW, FILE_FLAGS_AND_ATTRIBUTES, FILE_FLAG_OVERLAPPED, FILE_SHARE_MODE, + OPEN_EXISTING, + }; use windows::Win32::System::Console::{ ClosePseudoConsole, CreatePseudoConsole, ResizePseudoConsole, COORD, HPCON, }; - use windows::Win32::System::Pipes::CreatePipe; + use windows::Win32::System::Pipes::{CreateNamedPipeW, PIPE_TYPE_BYTE}; use windows::Win32::System::Threading::{ CreateProcessW, DeleteProcThreadAttributeList, InitializeProcThreadAttributeList, TerminateProcess, UpdateProcThreadAttribute, EXTENDED_STARTUPINFO_PRESENT, @@ -161,21 +167,87 @@ mod platform { STARTUPINFOEXW, STARTUPINFOW, }; + static PIPE_SEQ: AtomicU64 = AtomicU64::new(0); + + // PIPE_ACCESS_INBOUND / OUTBOUND as FILE_FLAGS_AND_ATTRIBUTES for CreateNamedPipeW + const PIPE_INBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000001); + const PIPE_OUTBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000002); + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { unsafe { spawn_inner(cmd, cols, rows) } } unsafe fn spawn_inner(cmd: &str, cols: u16, rows: u16) -> Result { - // Create two pipe pairs: input and output - let mut input_read = INVALID_HANDLE_VALUE; - let mut input_write = INVALID_HANDLE_VALUE; - let mut output_read = INVALID_HANDLE_VALUE; - let mut output_write = INVALID_HANDLE_VALUE; + let seq = PIPE_SEQ.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + + // -- Output pipe: ConPTY writes terminal output, we read it -- + let out_name: Vec = format!("\\\\.\\pipe\\xs-pty-{pid}-{seq}-out") + .encode_utf16() + .chain(std::iter::once(0)) + .collect(); + + // Server end (our synchronous read handle) + let output_read = CreateNamedPipeW( + PCWSTR(out_name.as_ptr()), + PIPE_INBOUND, + PIPE_TYPE_BYTE, + 1, + 4096, + 4096, + 0, + None, + ); + if output_read == INVALID_HANDLE_VALUE { + return Err("CreateNamedPipeW (output): failed".into()); + } + + // Client end (ConPTY's overlapped write handle) + let output_write = CreateFileW( + PCWSTR(out_name.as_ptr()), + 0x40000000, // GENERIC_WRITE + FILE_SHARE_MODE(0), + None, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + None, + ) + .map_err(|e| format!("CreateFileW (output): {e}"))?; + + // -- Input pipe: we write keystrokes, ConPTY reads them -- + let in_name: Vec = format!("\\\\.\\pipe\\xs-pty-{pid}-{seq}-in") + .encode_utf16() + .chain(std::iter::once(0)) + .collect(); + + // Server end (our synchronous write handle) + let input_write = CreateNamedPipeW( + PCWSTR(in_name.as_ptr()), + PIPE_OUTBOUND, + PIPE_TYPE_BYTE, + 1, + 4096, + 4096, + 0, + None, + ); + if input_write == INVALID_HANDLE_VALUE { + let _ = CloseHandle(output_read); + let _ = CloseHandle(output_write); + return Err("CreateNamedPipeW (input): failed".into()); + } - CreatePipe(&mut input_read, &mut input_write, None, 0) - .map_err(|e| format!("CreatePipe (input): {e}"))?; - CreatePipe(&mut output_read, &mut output_write, None, 0) - .map_err(|e| format!("CreatePipe (output): {e}"))?; + // Client end (ConPTY's overlapped read handle) + let input_read = CreateFileW( + PCWSTR(in_name.as_ptr()), + 0x80000000, // GENERIC_READ + FILE_SHARE_MODE(0), + None, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + None, + ) + .map_err(|e| format!("CreateFileW (input): {e}"))?; let size = COORD { X: cols as i16, @@ -238,7 +310,7 @@ mod platform { DeleteProcThreadAttributeList(attr_list); - // Wrap pipe handles as std::fs::File + // Wrap our synchronous pipe handles as std::fs::File let reader = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( output_read.0 as RawHandle, )); diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 7bd1839db..a98ff7dfd 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -825,10 +825,6 @@ async fn test_graceful_shutdown_via_xs_stopping() { ); } -// ConPTY on headless Windows CI does not flush output to pipes, so PTY -// integration tests only run on Unix. The Windows code is still compiled -// and checked via the build step. -#[cfg(unix)] #[tokio::test] async fn test_pty_service() { let store = setup_test_env(); @@ -846,7 +842,10 @@ async fn test_pty_service() { .build(); let mut recver = store.read(options).await; + #[cfg(unix)] let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; + #[cfg(windows)] + let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; let spawn = store .append( @@ -860,7 +859,10 @@ async fn test_pty_service() { assert_eq!(recver.recv().await.unwrap().topic, "ptytest.running"); // The shell should produce some initial output (prompt or banner) - let frame = recver.recv().await.unwrap(); + let frame = tokio::time::timeout(Duration::from_secs(5), recver.recv()) + .await + .expect("timed out waiting for initial PTY output") + .unwrap(); assert_eq!(frame.topic, "ptytest.recv"); let meta = frame.meta.as_ref().unwrap(); assert_eq!(meta["source_id"], spawn.id.to_string()); @@ -868,19 +870,24 @@ async fn test_pty_service() { assert!(!bytes.is_empty(), "PTY should produce output"); // Send input + #[cfg(unix)] + let input = "echo pty-ok\n"; + #[cfg(windows)] + let input = "echo pty-ok\r\n"; + store .append( Frame::builder("ptytest.send") - .hash(store.cas_insert("echo pty-ok\n").await.unwrap()) + .hash(store.cas_insert(input).await.unwrap()) .build(), ) .unwrap(); // Read recv frames until we see "pty-ok" in the output - let deadline = Instant::now() + Duration::from_secs(5); + let deadline = Instant::now() + Duration::from_secs(10); let mut found = false; while Instant::now() < deadline { - let timeout = tokio::time::timeout(Duration::from_secs(2), recver.recv()).await; + let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; match timeout { Ok(Some(frame)) => { if frame.topic == "ptytest.recv" { @@ -904,11 +911,18 @@ async fn test_pty_service() { .unwrap(); let stop = loop { - let frame = recver.recv().await.unwrap(); + let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; + let frame = timeout + .expect("timed out waiting for ptytest.stopped") + .unwrap(); if frame.topic == "ptytest.stopped" { break frame; } }; assert_eq!(stop.meta.unwrap()["reason"], "terminate"); - assert_eq!(recver.recv().await.unwrap().topic, "ptytest.shutdown"); + let shutdown = tokio::time::timeout(Duration::from_secs(5), recver.recv()) + .await + .expect("timed out waiting for ptytest.shutdown") + .unwrap(); + assert_eq!(shutdown.topic, "ptytest.shutdown"); } From 2f2224adbbfe7aab7f377903bc74c9a11bb0d76d Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Tue, 24 Feb 2026 14:27:10 +0000 Subject: [PATCH 14/19] refactor: use random pipe names for ConPTY --- src/processor/service/pty.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index 066f8613e..171f1ceb9 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -149,7 +149,6 @@ mod platform { use std::ffi::c_void; use std::mem; use std::os::windows::io::{FromRawHandle, RawHandle}; - use std::sync::atomic::{AtomicU64, Ordering}; use windows::core::PCWSTR; use windows::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}; use windows::Win32::Storage::FileSystem::{ @@ -167,8 +166,6 @@ mod platform { STARTUPINFOEXW, STARTUPINFOW, }; - static PIPE_SEQ: AtomicU64 = AtomicU64::new(0); - // PIPE_ACCESS_INBOUND / OUTBOUND as FILE_FLAGS_AND_ATTRIBUTES for CreateNamedPipeW const PIPE_INBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000001); const PIPE_OUTBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000002); @@ -178,11 +175,10 @@ mod platform { } unsafe fn spawn_inner(cmd: &str, cols: u16, rows: u16) -> Result { - let seq = PIPE_SEQ.fetch_add(1, Ordering::Relaxed); - let pid = std::process::id(); + let id: u128 = rand::random(); // -- Output pipe: ConPTY writes terminal output, we read it -- - let out_name: Vec = format!("\\\\.\\pipe\\xs-pty-{pid}-{seq}-out") + let out_name: Vec = format!("\\\\.\\pipe\\xs-pty-{id:032x}-out") .encode_utf16() .chain(std::iter::once(0)) .collect(); @@ -215,7 +211,7 @@ mod platform { .map_err(|e| format!("CreateFileW (output): {e}"))?; // -- Input pipe: we write keystrokes, ConPTY reads them -- - let in_name: Vec = format!("\\\\.\\pipe\\xs-pty-{pid}-{seq}-in") + let in_name: Vec = format!("\\\\.\\pipe\\xs-pty-{id:032x}-in") .encode_utf16() .chain(std::iter::once(0)) .collect(); From 19267d7e8a05d9a639bf24231ace718f3739eab8 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Tue, 24 Feb 2026 14:40:26 +0000 Subject: [PATCH 15/19] fix: prevent parent's redirected stdout from bypassing ConPTY pipes On headless runners (CI, services), the parent process has redirected stdout/stderr. Windows kernel duplicates these to the child even with a pseudoconsole attached. Setting STARTF_USESTDHANDLES with null handles prevents this, forcing the child to use the ConPTY pipes. See: microsoft/terminal#11276 --- src/processor/service/pty.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index 171f1ceb9..4a67be181 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -163,7 +163,7 @@ mod platform { CreateProcessW, DeleteProcThreadAttributeList, InitializeProcThreadAttributeList, TerminateProcess, UpdateProcThreadAttribute, EXTENDED_STARTUPINFO_PRESENT, LPPROC_THREAD_ATTRIBUTE_LIST, PROCESS_INFORMATION, PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE, - STARTUPINFOEXW, STARTUPINFOW, + STARTF_USESTDHANDLES, STARTUPINFOEXW, STARTUPINFOW, }; // PIPE_ACCESS_INBOUND / OUTBOUND as FILE_FLAGS_AND_ATTRIBUTES for CreateNamedPipeW @@ -279,9 +279,13 @@ mod platform { ) .map_err(|e| format!("UpdateProcThreadAttribute: {e}"))?; + // STARTF_USESTDHANDLES with null handles prevents the kernel from + // duplicating the parent's redirected stdout/stderr to the child, + // which would bypass the pseudoconsole pipes entirely. let startup_info = STARTUPINFOEXW { StartupInfo: STARTUPINFOW { cb: mem::size_of::() as u32, + dwFlags: STARTF_USESTDHANDLES, ..Default::default() }, lpAttributeList: attr_list, From 317d2a73c2e6a5b89d202822a0d87584da1a764d Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Tue, 24 Feb 2026 15:10:53 +0000 Subject: [PATCH 16/19] refactor: simplify ConPTY to anonymous pipes with STARTF_USESTDHANDLES Diagnostic testing on Windows CI (cablehead/conpty-test) confirmed: - STARTF_USESTDHANDLES is required for headless ConPTY - Pipe type (anonymous vs named, overlapped vs sync) has no effect - Anonymous pipes are simpler and work identically Without the flag, the kernel duplicates the parent's redirected stdout/stderr to the child, bypassing the pseudoconsole pipes. --- Cargo.toml | 2 +- src/processor/service/pty.rs | 92 +++++------------------------------- 2 files changed, 12 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 393153714..6498b55e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ nix = { version = "0.29", default-features = false, features = ["poll", "fs", "p [target.'cfg(windows)'.dependencies] win_uds = { version = "=0.2.1", features = ["async"] } -windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } +windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } [dev-dependencies] assert_cmd = "2.0.14" diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index 4a67be181..bb5347b78 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -149,16 +149,11 @@ mod platform { use std::ffi::c_void; use std::mem; use std::os::windows::io::{FromRawHandle, RawHandle}; - use windows::core::PCWSTR; - use windows::Win32::Foundation::{CloseHandle, HANDLE, INVALID_HANDLE_VALUE}; - use windows::Win32::Storage::FileSystem::{ - CreateFileW, FILE_FLAGS_AND_ATTRIBUTES, FILE_FLAG_OVERLAPPED, FILE_SHARE_MODE, - OPEN_EXISTING, - }; + use windows::Win32::Foundation::{CloseHandle, HANDLE}; use windows::Win32::System::Console::{ ClosePseudoConsole, CreatePseudoConsole, ResizePseudoConsole, COORD, HPCON, }; - use windows::Win32::System::Pipes::{CreateNamedPipeW, PIPE_TYPE_BYTE}; + use windows::Win32::System::Pipes::CreatePipe; use windows::Win32::System::Threading::{ CreateProcessW, DeleteProcThreadAttributeList, InitializeProcThreadAttributeList, TerminateProcess, UpdateProcThreadAttribute, EXTENDED_STARTUPINFO_PRESENT, @@ -166,91 +161,27 @@ mod platform { STARTF_USESTDHANDLES, STARTUPINFOEXW, STARTUPINFOW, }; - // PIPE_ACCESS_INBOUND / OUTBOUND as FILE_FLAGS_AND_ATTRIBUTES for CreateNamedPipeW - const PIPE_INBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000001); - const PIPE_OUTBOUND: FILE_FLAGS_AND_ATTRIBUTES = FILE_FLAGS_AND_ATTRIBUTES(0x00000002); - pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { unsafe { spawn_inner(cmd, cols, rows) } } unsafe fn spawn_inner(cmd: &str, cols: u16, rows: u16) -> Result { - let id: u128 = rand::random(); - - // -- Output pipe: ConPTY writes terminal output, we read it -- - let out_name: Vec = format!("\\\\.\\pipe\\xs-pty-{id:032x}-out") - .encode_utf16() - .chain(std::iter::once(0)) - .collect(); - - // Server end (our synchronous read handle) - let output_read = CreateNamedPipeW( - PCWSTR(out_name.as_ptr()), - PIPE_INBOUND, - PIPE_TYPE_BYTE, - 1, - 4096, - 4096, - 0, - None, - ); - if output_read == INVALID_HANDLE_VALUE { - return Err("CreateNamedPipeW (output): failed".into()); - } + // Create anonymous pipes for ConPTY I/O + let mut input_read = HANDLE::default(); + let mut input_write = HANDLE::default(); + CreatePipe(&mut input_read, &mut input_write, None, 0) + .map_err(|e| format!("CreatePipe (input): {e}"))?; - // Client end (ConPTY's overlapped write handle) - let output_write = CreateFileW( - PCWSTR(out_name.as_ptr()), - 0x40000000, // GENERIC_WRITE - FILE_SHARE_MODE(0), - None, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - None, - ) - .map_err(|e| format!("CreateFileW (output): {e}"))?; - - // -- Input pipe: we write keystrokes, ConPTY reads them -- - let in_name: Vec = format!("\\\\.\\pipe\\xs-pty-{id:032x}-in") - .encode_utf16() - .chain(std::iter::once(0)) - .collect(); - - // Server end (our synchronous write handle) - let input_write = CreateNamedPipeW( - PCWSTR(in_name.as_ptr()), - PIPE_OUTBOUND, - PIPE_TYPE_BYTE, - 1, - 4096, - 4096, - 0, - None, - ); - if input_write == INVALID_HANDLE_VALUE { - let _ = CloseHandle(output_read); - let _ = CloseHandle(output_write); - return Err("CreateNamedPipeW (input): failed".into()); - } - - // Client end (ConPTY's overlapped read handle) - let input_read = CreateFileW( - PCWSTR(in_name.as_ptr()), - 0x80000000, // GENERIC_READ - FILE_SHARE_MODE(0), - None, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - None, - ) - .map_err(|e| format!("CreateFileW (input): {e}"))?; + let mut output_read = HANDLE::default(); + let mut output_write = HANDLE::default(); + CreatePipe(&mut output_read, &mut output_write, None, 0) + .map_err(|e| format!("CreatePipe (output): {e}"))?; let size = COORD { X: cols as i16, Y: rows as i16, }; - // ConPTY reads from input_read, writes to output_write let hpcon = CreatePseudoConsole(size, input_read, output_write, 0) .map_err(|e| format!("CreatePseudoConsole: {e}"))?; @@ -310,7 +241,6 @@ mod platform { DeleteProcThreadAttributeList(attr_list); - // Wrap our synchronous pipe handles as std::fs::File let reader = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( output_read.0 as RawHandle, )); From 7fbf50c1aac7587e9ca3ba7680a7b6e2251a0f84 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Tue, 24 Feb 2026 21:49:59 +0000 Subject: [PATCH 17/19] fix: pass HPCON value directly to UpdateProcThreadAttribute &hpcon creates double indirection (pointer to pointer), causing ConPTY to not properly attach to the child process. The HPCON handle is itself a pointer and must be passed directly as lpValue. --- src/processor/service/pty.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index bb5347b78..b48d8991c 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -203,7 +203,7 @@ mod platform { attr_list, 0, PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE as usize, - Some(&hpcon as *const HPCON as *const c_void), + Some(hpcon.0 as *const c_void), mem::size_of::(), None, None, From 01b07eab886579048a7e6728e76dea486c58da8f Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Wed, 25 Feb 2026 02:34:49 +0000 Subject: [PATCH 18/19] fix: reap zombie children and join writer thread on PTY restart --- src/processor/service/pty.rs | 9 ++++++++ src/processor/service/service.rs | 38 +++++++++++++++++++++----------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs index b48d8991c..5be8e7a55 100644 --- a/src/processor/service/pty.rs +++ b/src/processor/service/pty.rs @@ -129,6 +129,15 @@ mod platform { } } +#[cfg(unix)] +impl Drop for PtyHandle { + fn drop(&mut self) { + // Reap the child to avoid zombies. Safe to call after kill() already + // reaped -- waitpid returns ECHILD which we ignore. + let _ = nix::sys::wait::waitpid(self.pid, Some(nix::sys::wait::WaitPidFlag::WNOHANG)); + } +} + // ---- Windows ---- #[cfg(windows)] diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index 10e4b34f4..ce71496d0 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -601,25 +601,35 @@ async fn run_pty_loop( .after(start_id) .build(); let send_rx = send_store.read(send_options).await; + let (send_stop_tx, mut send_stop_rx) = tokio::sync::oneshot::channel::<()>(); use std::io::Write; let rt = tokio::runtime::Handle::current(); - let _send_handle = std::thread::spawn(move || { + let send_handle = std::thread::spawn(move || { let mut writer = writer; let mut rx = send_rx; - while let Some(frame) = rt.block_on(rx.recv()) { - if frame.topic != send_topic { - continue; - } - if let Some(hash) = frame.hash { - if let Ok(bytes) = rt.block_on(send_store.cas_read(&hash)) { - if writer.write_all(&bytes).is_err() { - break; - } - if writer.flush().is_err() { - break; + loop { + let frame = rt.block_on(async { + tokio::select! { + f = rx.recv() => f, + _ = &mut send_stop_rx => None, + } + }); + match frame { + Some(frame) if frame.topic == send_topic => { + if let Some(hash) = frame.hash { + if let Ok(bytes) = rt.block_on(send_store.cas_read(&hash)) { + if writer.write_all(&bytes).is_err() { + break; + } + if writer.flush().is_err() { + break; + } + } } } + Some(_) => continue, + None => break, } } }); @@ -734,8 +744,10 @@ async fn run_pty_loop( }; // The reader thread exits when the pipe gets EOF (after kill). - // The sender thread exits when the store channel closes. let _ = recv_handle.join(); + // Signal the writer thread to stop and wait for it. + drop(send_stop_tx); + let _ = send_handle.join(); let stop_reason = match &outcome { PtyOutcome::ChildExited => StopReason::Finished, From 5f46bd124c45012839c49f0b9861710eb16920cb Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Wed, 25 Feb 2026 02:36:51 +0000 Subject: [PATCH 19/19] fix: revert CI to main-only pushes, update docs spawn syntax --- .github/workflows/ci.yml | 1 + docs/src/content/docs/tutorials/web-terminal.mdx | 9 +++------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a05e2572..1c4600799 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,6 +2,7 @@ name: CI on: push: + branches: [ "main" ] pull_request: branches: [ "main" ] diff --git a/docs/src/content/docs/tutorials/web-terminal.mdx b/docs/src/content/docs/tutorials/web-terminal.mdx index 3af123cdf..4aa34110a 100644 --- a/docs/src/content/docs/tutorials/web-terminal.mdx +++ b/docs/src/content/docs/tutorials/web-terminal.mdx @@ -28,7 +28,7 @@ graph LR end Browser -- "POST /pty/create
{cols, rows}" --> HttpNu - HttpNu -- ".append pty.sid.spawn
pty: {cmd: nu}" --> XS + HttpNu -- ".append pty.sid.spawn
{ run: nu, pty: {cols, rows} }" --> XS XS -- "spawn service" --> PTY PTY -- "exec" --> Nu @@ -71,10 +71,7 @@ A normal xs service uses `run` with a closure. For PTY mode, `run` is a command string and `pty` provides the terminal dimensions: ```nushell -r#'{ - run: "/usr/local/bin/nu" - pty: {cols: 131, rows: 50} -}'# | .append my-shell.spawn +{ run: "nu", pty: {cols: 80, rows: 24} } | .append my-shell.spawn ``` When xs sees a `pty` field in the spawn config, it: @@ -128,7 +125,7 @@ use http-nu/router * let sid = (random uuid) # Spawn the PTY service - $"{ run: \"/usr/local/bin/nu\", pty: {cols: ($cols), rows: ($rows)} }" + { run: "nu", pty: {cols: $cols, rows: $rows} } | .append $"pty.($sid).spawn" {sid: $sid} | to json