diff --git a/Cargo.lock b/Cargo.lock index d187de97e..fabdb5ca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1093,6 +1093,7 @@ dependencies = [ "url", "webpki-roots 0.26.11", "win_uds", + "windows 0.62.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 802117f88..6498b55e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,10 +79,11 @@ rand = "0.8" data-encoding = "2.6" [target.'cfg(unix)'.dependencies] -nix = { version = "0.29", default-features = false, features = ["poll", "fs"] } +nix = { version = "0.29", default-features = false, features = ["poll", "fs", "process", "signal", "term"] } [target.'cfg(windows)'.dependencies] win_uds = { version = "=0.2.1", features = ["async"] } +windows = { version = "0.62", features = ["Win32_Foundation", "Win32_Security", "Win32_System_Console", "Win32_System_Pipes", "Win32_System_Threading"] } [dev-dependencies] assert_cmd = "2.0.14" diff --git a/docs/src/content/docs/tutorials/web-terminal.mdx b/docs/src/content/docs/tutorials/web-terminal.mdx new file mode 100644 index 000000000..4aa34110a --- /dev/null +++ b/docs/src/content/docs/tutorials/web-terminal.mdx @@ -0,0 +1,282 @@ +--- +title: Web Terminal +description: Connect ghostty-web to an xs-managed PTY session over HTTP, using http-nu as the bridge. +sidebar: + order: 5 +--- + +import { Aside } from '@astrojs/starlight/components'; +import { Link } from '../../../utils/links'; + +This tutorial wires together three pieces to put a real shell in the browser: + +- **xs** allocates and manages a pseudo-terminal via its PTY service +- **http-nu** exposes the PTY over four HTTP endpoints +- **ghostty-web** renders the terminal in a canvas element using Ghostty's VT100 engine compiled to WASM + +Here is the full picture: + +```mermaid +graph LR + Browser["browser
ghostty-web
(WASM + Canvas)"] + + subgraph server + HttpNu["http-nu"] + XS["xs store"] + PTY["PTY service
(fork + openpty)"] + Nu["nu"] + end + + Browser -- "POST /pty/create
{cols, rows}" --> HttpNu + HttpNu -- ".append pty.sid.spawn
{ run: nu, pty: {cols, rows} }" --> XS + XS -- "spawn service" --> PTY + PTY -- "exec" --> Nu + + Nu -- "stdout" --> PTY + PTY -- ".recv frames" --> XS + XS -- ".cat --follow" --> HttpNu + HttpNu -- "GET /pty/stream
SSE base64" --> Browser + + Browser -- "POST /pty/input
raw text" --> HttpNu + HttpNu -- ".append pty.sid.send" --> XS + XS -- "send frames" --> PTY + PTY -- "stdin" --> Nu + + Browser -- "POST /pty/resize
{cols, rows}" --> HttpNu + HttpNu -- ".append pty.sid.resize" --> XS + XS -- "ioctl TIOCSWINSZ
+ SIGWINCH" --> PTY +``` + +The rest of this tutorial builds each layer from the inside out: PTY, then +HTTP, then browser. + +## Prerequisites + +- xs installed and on your PATH (see Installation) +- http-nu installed with `--store` support +- ghostty-web built (`bun run build`) with dist assets available +- A terminal running with `use xs.nu *` + +## Start a store + +```bash withOutput +xs serve ./store +``` + +Leave this running. + +## The PTY service + +A normal xs service uses `run` with a closure. For PTY mode, `run` is a command +string and `pty` provides the terminal dimensions: + +```nushell +{ run: "nu", pty: {cols: 80, rows: 24} } | .append my-shell.spawn +``` + +When xs sees a `pty` field in the spawn config, it: + +1. Allocates a pseudo-terminal with the given size (openpty on Unix, ConPTY on Windows) +2. Spawns the `run` command inside the PTY +3. Reads the primary fd and emits raw bytes as `.recv` frames (content stored in CAS) +4. Watches for `.send` frames and writes their CAS content to the primary fd +5. Watches for `.resize` frames and applies `cols`/`rows` from frame metadata + +PTY mode is inherently bidirectional. No `duplex` flag needed. + +```mermaid +flowchart LR + subgraph xs + primary[primary fd] + recv[".recv frames"] + send[".send frames"] + resize[".resize frames"] + end + subgraph PTY + shell[nu] + secondary[secondary fd] + end + primary -- "read" --> recv + send -- "write" --> primary + resize -- "resize" --> primary + primary <--> secondary + secondary <--> shell +``` + +The full lifecycle (`running`, `stopped`, `shutdown`, auto-restart, hot reload, +terminate) works the same as closure-based services. + +## The http-nu handler + +The handler script bridges four HTTP endpoints to the PTY service topics. +Save this as `serve.nu`: + +```nushell +use http-nu/router * + +{|req| + dispatch $req [ + + # Create a new PTY session + (route {method: "POST", path: "/pty/create"} {|req ctx| + let body = ($in | from json) + let cols = ($body.cols? | default 80) + let rows = ($body.rows? | default 24) + let sid = (random uuid) + + # Spawn the PTY service + { run: "nu", pty: {cols: $cols, rows: $rows} } + | .append $"pty.($sid).spawn" + + {sid: $sid} | to json + }) + + # Stream PTY output as SSE + (route {method: "GET", path: "/pty/stream"} {|req ctx| + let sid = $req.query.sid + .head $"pty.($sid).recv" --follow + | each {|frame| + let bytes = (.cas $frame.hash) + {data: ($bytes | encode base64)} + } + | to sse + }) + + # Send keyboard input to the PTY + (route {method: "POST", path: "/pty/input"} {|req ctx| + let sid = $req.query.sid + $in | .append $"pty.($sid).send" + null | metadata set --merge {'http.response': {status: 204}} + }) + + # Resize the PTY + (route {method: "POST", path: "/pty/resize"} {|req ctx| + let body = ($in | from json) + let sid = $req.query.sid + .append $"pty.($sid).resize" --meta {cols: $body.cols, rows: $body.rows} + null | metadata set --merge {'http.response': {status: 204}} + }) + ] +} +``` + +Each endpoint maps to a PTY service topic: + +| Endpoint | Store topic | Direction | +| --- | --- | --- | +| `POST /pty/create` | `pty..spawn` | client -> xs | +| `GET /pty/stream` | `pty..recv` | xs -> client (SSE) | +| `POST /pty/input` | `pty..send` | client -> xs | +| `POST /pty/resize` | `pty..resize` | client -> xs | + +The stream endpoint uses `.head --follow` to tail new recv frames as +they arrive, base64-encodes the CAS content, and pipes through `to sse`. The +client receives a standard `text/event-stream` where each `data:` line carries +base64-encoded terminal output. + +## The client + +ghostty-web compiles Ghostty's VT100 parser and renderer to a 416KB WASM binary. +TypeScript handles canvas rendering, keyboard input, and clipboard. The terminal +itself is transport-agnostic: `term.write()` pushes bytes in, `term.onData()` +captures keystrokes out. + +The demo client (`sse-client.html`) connects the four endpoints: + +```javascript +// 1. Create session +const { sid } = await fetch('/pty/create', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ cols: term.cols, rows: term.rows }), +}).then(r => r.json()); + +// 2. Stream output via SSE +const source = new EventSource('/pty/stream?sid=' + sid); +source.onmessage = (e) => { + const bytes = Uint8Array.from(atob(e.data), c => c.charCodeAt(0)); + term.write(bytes); +}; + +// 3. Send keystrokes via POST +term.onData((data) => { + fetch('/pty/input?sid=' + sid, { method: 'POST', body: data }); +}); + +// 4. Send resize via POST +term.onResize(({ cols, rows }) => { + fetch('/pty/resize?sid=' + sid, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ cols, rows }), + }); +}); +``` + +SSE downstream, POST upstream. No WebSocket upgrade required. Works through +HTTP proxies and tunnels. + +## Try it + +Start everything in three terminals. + +Terminal 1 -- the store (already running from above): + +```bash withOutput +xs serve ./store +``` + +Terminal 2 -- http-nu, pointing at the store and serving ghostty-web assets: + +```bash withOutput +http-nu :3001 --store ./store ./serve.nu +``` + +Terminal 3 -- open the browser: + +```bash withOutput +open http://localhost:3001 +``` + + + +You should see a nushell prompt rendered in the browser. Type commands, watch +output stream back. Every keystroke flows through xs as a `pty..send` +frame; every chunk of terminal output flows back as a `pty..recv` frame. + +## Resize + +Resize the browser window. ghostty-web's `FitAddon` recalculates cols and rows, +fires `term.onResize`, and the client POSTs to `/pty/resize`. http-nu appends a +`pty..resize` frame with `{cols, rows}` in metadata. xs applies the new +dimensions via `ioctl TIOCSWINSZ` and sends `SIGWINCH` to the child process. +The shell redraws to fit. + +## Terminate + +End a PTY session from the store side: + +```nushell +.append pty..terminate +``` + +xs kills the child process (SIGTERM, then SIGKILL if needed), emits +`pty..stopped` with `meta.reason` set to `terminate`, then +`pty..shutdown`. The SSE stream ends and ghostty-web shows a session-ended +message. + +## Recap + +| Component | Role | +| --- | --- | +| xs | Allocates the PTY, manages the child process, streams I/O as frames | +| http-nu | Bridges HTTP endpoints to store topics | +| ghostty-web | Renders VT100 output in a canvas, captures keyboard input | + +The PTY service handles the low-level terminal plumbing. http-nu translates +HTTP to frames. ghostty-web handles rendering. Each piece does one thing. diff --git a/src/nu/config.rs b/src/nu/config.rs index 1740f7f9b..240053473 100644 --- a/src/nu/config.rs +++ b/src/nu/config.rs @@ -46,12 +46,13 @@ pub struct ReturnOptions { pub target: Option, } -/// Parse a script into a NuScriptConfig struct. +/// Evaluate a nushell script and return the resulting Value. /// -/// Parses and evaluates the script, then extracts the `run` closure and the full -/// configuration value. VFS modules (registered via `*.nu` topics) are already -/// available on the engine state before this function is called. -pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result { +/// Handles parsing, error reporting, and evaluation. Does not extract any +/// specific fields from the result. Useful when the caller needs to inspect +/// the config value before deciding how to use it (e.g. checking for a `pty` +/// field vs a `run` closure). +pub fn eval_script(engine: &mut crate::nu::Engine, script: &str) -> Result { let mut working_set = StateWorkingSet::new(&engine.state); let block = parse(&mut working_set, None, script.as_bytes(), false); @@ -107,6 +108,17 @@ pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result Result { + let config_value = eval_script(engine, script)?; let run_val = config_value .get_data_by_key("run") @@ -115,8 +127,6 @@ pub fn parse_config(engine: &mut crate::nu::Engine, script: &str) -> Result Error { format!("'run' field must be a closure: {e}").into() })?; - engine.state.merge_env(&mut stack)?; - Ok(NuScriptConfig { run_closure: run_closure.clone(), full_config_value: config_value, diff --git a/src/nu/mod.rs b/src/nu/mod.rs index 505c002d8..fae28a07b 100644 --- a/src/nu/mod.rs +++ b/src/nu/mod.rs @@ -4,7 +4,7 @@ pub mod vfs; pub mod commands; pub mod util; -pub use config::{parse_config, NuScriptConfig, ReturnOptions}; +pub use config::{eval_script, parse_config, NuScriptConfig, ReturnOptions}; pub use engine::{add_core_commands, Engine}; pub use util::{frame_to_pipeline, frame_to_value, value_to_json}; pub use vfs::load_modules; diff --git a/src/processor/service/mod.rs b/src/processor/service/mod.rs index e68cb2182..fcf2406af 100644 --- a/src/processor/service/mod.rs +++ b/src/processor/service/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod pty; mod serve; #[allow(clippy::module_inception)] pub(crate) mod service; diff --git a/src/processor/service/pty.rs b/src/processor/service/pty.rs new file mode 100644 index 000000000..5be8e7a55 --- /dev/null +++ b/src/processor/service/pty.rs @@ -0,0 +1,308 @@ +//! Cross-platform pseudo-terminal child process. +//! +//! Unix: openpty + fork. Windows: ConPTY. + +/// A child process running inside a pseudo-terminal. +/// +/// `reader` carries terminal output (VT sequences). +/// `writer` accepts input (keystrokes). +/// `handle` controls the PTY lifecycle (resize, kill). +pub struct PtyChild { + pub reader: std::fs::File, + pub writer: std::fs::File, + pub handle: PtyHandle, +} + +impl PtyChild { + /// Spawn `cmd` inside a new PTY with the given dimensions. + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + platform::spawn(cmd, cols, rows) + } +} + +impl PtyHandle { + /// Resize the PTY. + pub fn resize(&self, cols: u16, rows: u16) -> Result<(), String> { + platform::resize(self, cols, rows) + } + + /// Kill the child process. + pub fn kill(&mut self) { + platform::kill(self); + } +} + +// ---- Unix ---- + +#[cfg(unix)] +pub struct PtyHandle { + /// Raw fd for ioctl. Valid as long as reader/writer are alive. + primary_fd: std::os::unix::io::RawFd, + pid: nix::unistd::Pid, +} + +#[cfg(unix)] +mod platform { + use super::*; + use nix::libc; + use nix::pty::openpty; + use nix::sys::signal::Signal; + use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; + use nix::unistd::{close, dup2, execvp, fork, setsid, ForkResult}; + use std::ffi::CString; + use std::os::unix::io::{FromRawFd, IntoRawFd}; + + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + + let pty = openpty(Some(&ws), None).map_err(|e| format!("openpty: {e}"))?; + let primary_fd = pty.master.into_raw_fd(); + let secondary_fd = pty.slave.into_raw_fd(); + + match unsafe { fork() } { + Ok(ForkResult::Child) => { + let _ = close(primary_fd); + let _ = setsid(); + + // Set controlling terminal + unsafe { + libc::ioctl(secondary_fd, libc::TIOCSCTTY as _, 0); + } + + let _ = dup2(secondary_fd, 0); + let _ = dup2(secondary_fd, 1); + let _ = dup2(secondary_fd, 2); + if secondary_fd > 2 { + let _ = close(secondary_fd); + } + + let c_cmd = CString::new(cmd).unwrap_or_else(|_| CString::new("sh").unwrap()); + let args = [c_cmd.clone()]; + let _ = execvp(&c_cmd, &args); + std::process::exit(1); + } + Ok(ForkResult::Parent { child }) => { + let _ = close(secondary_fd); + let primary_file = unsafe { std::fs::File::from_raw_fd(primary_fd) }; + let reader = primary_file + .try_clone() + .map_err(|e| format!("clone fd: {e}"))?; + Ok(PtyChild { + reader, + writer: primary_file, + handle: PtyHandle { + primary_fd, + pid: child, + }, + }) + } + Err(e) => Err(format!("fork: {e}")), + } + } + + pub fn resize(handle: &PtyHandle, cols: u16, rows: u16) -> Result<(), String> { + let ws = libc::winsize { + ws_row: rows, + ws_col: cols, + ws_xpixel: 0, + ws_ypixel: 0, + }; + unsafe { + libc::ioctl(handle.primary_fd, libc::TIOCSWINSZ as _, &ws); + } + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGWINCH); + Ok(()) + } + + pub fn kill(handle: &mut PtyHandle) { + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGTERM); + std::thread::sleep(std::time::Duration::from_millis(50)); + if let Ok(WaitStatus::StillAlive) = waitpid(handle.pid, Some(WaitPidFlag::WNOHANG)) { + let _ = nix::sys::signal::kill(handle.pid, Signal::SIGKILL); + let _ = waitpid(handle.pid, None); + } + } +} + +#[cfg(unix)] +impl Drop for PtyHandle { + fn drop(&mut self) { + // Reap the child to avoid zombies. Safe to call after kill() already + // reaped -- waitpid returns ECHILD which we ignore. + let _ = nix::sys::wait::waitpid(self.pid, Some(nix::sys::wait::WaitPidFlag::WNOHANG)); + } +} + +// ---- Windows ---- + +#[cfg(windows)] +pub struct PtyHandle { + hpcon: windows::Win32::System::Console::HPCON, + process_handle: windows::Win32::Foundation::HANDLE, + thread_handle: windows::Win32::Foundation::HANDLE, + closed: bool, +} + +// HPCON and HANDLE are process-wide values safe to send between threads. +#[cfg(windows)] +unsafe impl Send for PtyHandle {} + +#[cfg(windows)] +mod platform { + use super::*; + use std::ffi::c_void; + use std::mem; + use std::os::windows::io::{FromRawHandle, RawHandle}; + use windows::Win32::Foundation::{CloseHandle, HANDLE}; + use windows::Win32::System::Console::{ + ClosePseudoConsole, CreatePseudoConsole, ResizePseudoConsole, COORD, HPCON, + }; + use windows::Win32::System::Pipes::CreatePipe; + use windows::Win32::System::Threading::{ + CreateProcessW, DeleteProcThreadAttributeList, InitializeProcThreadAttributeList, + TerminateProcess, UpdateProcThreadAttribute, EXTENDED_STARTUPINFO_PRESENT, + LPPROC_THREAD_ATTRIBUTE_LIST, PROCESS_INFORMATION, PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE, + STARTF_USESTDHANDLES, STARTUPINFOEXW, STARTUPINFOW, + }; + + pub fn spawn(cmd: &str, cols: u16, rows: u16) -> Result { + unsafe { spawn_inner(cmd, cols, rows) } + } + + unsafe fn spawn_inner(cmd: &str, cols: u16, rows: u16) -> Result { + // Create anonymous pipes for ConPTY I/O + let mut input_read = HANDLE::default(); + let mut input_write = HANDLE::default(); + CreatePipe(&mut input_read, &mut input_write, None, 0) + .map_err(|e| format!("CreatePipe (input): {e}"))?; + + let mut output_read = HANDLE::default(); + let mut output_write = HANDLE::default(); + CreatePipe(&mut output_read, &mut output_write, None, 0) + .map_err(|e| format!("CreatePipe (output): {e}"))?; + + let size = COORD { + X: cols as i16, + Y: rows as i16, + }; + + let hpcon = CreatePseudoConsole(size, input_read, output_write, 0) + .map_err(|e| format!("CreatePseudoConsole: {e}"))?; + + // Close the pipe ends now owned by the ConPTY + let _ = CloseHandle(input_read); + let _ = CloseHandle(output_write); + + // Set up process thread attribute list + let mut attr_size: usize = 0; + let _ = InitializeProcThreadAttributeList(None, 1, None, &mut attr_size); + + let mut attr_buf = vec![0u8; attr_size]; + let attr_list = LPPROC_THREAD_ATTRIBUTE_LIST(attr_buf.as_mut_ptr() as *mut _); + + InitializeProcThreadAttributeList(Some(attr_list), 1, None, &mut attr_size) + .map_err(|e| format!("InitializeProcThreadAttributeList: {e}"))?; + + UpdateProcThreadAttribute( + attr_list, + 0, + PROC_THREAD_ATTRIBUTE_PSEUDOCONSOLE as usize, + Some(hpcon.0 as *const c_void), + mem::size_of::(), + None, + None, + ) + .map_err(|e| format!("UpdateProcThreadAttribute: {e}"))?; + + // STARTF_USESTDHANDLES with null handles prevents the kernel from + // duplicating the parent's redirected stdout/stderr to the child, + // which would bypass the pseudoconsole pipes entirely. + let startup_info = STARTUPINFOEXW { + StartupInfo: STARTUPINFOW { + cb: mem::size_of::() as u32, + dwFlags: STARTF_USESTDHANDLES, + ..Default::default() + }, + lpAttributeList: attr_list, + }; + + let mut cmd_wide: Vec = cmd.encode_utf16().chain(std::iter::once(0)).collect(); + let mut proc_info = PROCESS_INFORMATION::default(); + + CreateProcessW( + None, + Some(windows::core::PWSTR(cmd_wide.as_mut_ptr())), + None, + None, + false, + EXTENDED_STARTUPINFO_PRESENT, + None, + None, + &startup_info.StartupInfo, + &mut proc_info, + ) + .map_err(|e| format!("CreateProcessW: {e}"))?; + + DeleteProcThreadAttributeList(attr_list); + + let reader = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( + output_read.0 as RawHandle, + )); + let writer = std::fs::File::from(std::os::windows::io::OwnedHandle::from_raw_handle( + input_write.0 as RawHandle, + )); + + Ok(PtyChild { + reader, + writer, + handle: PtyHandle { + hpcon, + process_handle: proc_info.hProcess, + thread_handle: proc_info.hThread, + closed: false, + }, + }) + } + + pub fn resize(handle: &PtyHandle, cols: u16, rows: u16) -> Result<(), String> { + if handle.closed { + return Err("PTY already closed".into()); + } + let size = COORD { + X: cols as i16, + Y: rows as i16, + }; + unsafe { ResizePseudoConsole(handle.hpcon, size) } + .map_err(|e| format!("ResizePseudoConsole: {e}")) + } + + pub fn kill(handle: &mut PtyHandle) { + if handle.closed { + return; + } + unsafe { + let _ = TerminateProcess(handle.process_handle, 1); + // Close the ConPTY so the output pipe gets EOF + ClosePseudoConsole(handle.hpcon); + } + handle.closed = true; + } +} + +#[cfg(windows)] +impl Drop for PtyHandle { + fn drop(&mut self) { + unsafe { + if !self.closed { + windows::Win32::System::Console::ClosePseudoConsole(self.hpcon); + } + let _ = windows::Win32::Foundation::CloseHandle(self.process_handle); + let _ = windows::Win32::Foundation::CloseHandle(self.thread_handle); + } + } +} diff --git a/src/processor/service/service.rs b/src/processor/service/service.rs index 436a05252..ce71496d0 100644 --- a/src/processor/service/service.rs +++ b/src/processor/service/service.rs @@ -13,10 +13,19 @@ use crate::nu::{value_to_json, ReturnOptions}; use crate::store::{FollowOption, Frame, ReadOptions, Store}; use serde_json::json; +use super::pty::PtyChild; + +#[derive(Clone, Debug, serde::Deserialize, Default)] +pub struct PtyOptions { + pub cols: Option, + pub rows: Option, +} + #[derive(Clone, Debug, serde::Deserialize, Default)] pub struct ServiceScriptOptions { pub duplex: Option, pub return_options: Option, + pub pty: Option, } #[derive(Clone)] @@ -166,35 +175,113 @@ pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> { tokio::spawn(async move { run(store, spawn_frame).await }) } +async fn read_spawn_script(store: &Store, spawn_frame: &Frame) -> Option { + let hash = spawn_frame.hash.clone()?; + let mut reader = store.cas_reader(hash).await.ok()?; + let mut script = String::new(); + reader.read_to_string(&mut script).await.ok()?; + Some(script) +} + +fn make_loop_ctx(spawn_frame: &Frame) -> ServiceLoop { + ServiceLoop { + topic: spawn_frame + .topic + .strip_suffix(".spawn") + .unwrap_or(&spawn_frame.topic) + .to_string(), + } +} + async fn run(store: Store, spawn_frame: Frame) { + let script = match read_spawn_script(&store, &spawn_frame).await { + Some(s) => s, + None => return, + }; + + let loop_ctx = make_loop_ctx(&spawn_frame); + + // Evaluate the script to get the config value. Use eval_script so we can + // inspect the value before deciding whether this is a PTY or closure service. let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) { Ok(e) => e, Err(_) => return, }; - let hash = match spawn_frame.hash.clone() { - Some(h) => h, - None => return, + let config_value = match nu::eval_script(&mut engine, &script) { + Ok(v) => v, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: e.to_string(), + }, + ); + return; + } }; - let mut reader = match store.cas_reader(hash).await { - Ok(r) => r, - Err(_) => return, + + let opts: ServiceScriptOptions = match serde_json::from_value(nu::value_to_json(&config_value)) + { + Ok(o) => o, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: e.to_string(), + }, + ); + return; + } }; - let mut script = String::new(); - if reader.read_to_string(&mut script).await.is_err() { - return; - } - let loop_ctx = ServiceLoop { - topic: spawn_frame - .topic - .strip_suffix(".spawn") - .unwrap_or(&spawn_frame.topic) - .to_string(), + // Extract the run field -- required for both closure and PTY services + let run_val = match config_value.get_data_by_key("run") { + Some(v) => v, + None => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: "Script must define a 'run' field.".into(), + }, + ); + return; + } }; - let nu_config = match nu::parse_config(&mut engine, &script) { - Ok(cfg) => cfg, + // PTY mode: run is a command string, pty provides dimensions + if let Some(pty_opts) = opts.pty { + let cmd = match run_val.as_str() { + Ok(s) => s.to_string(), + Err(_) => { + let _ = emit_event( + &store, + &loop_ctx, + spawn_frame.id, + None, + ServiceEventKind::ParseError { + message: "With pty enabled, 'run' must be a command string.".into(), + }, + ); + return; + } + }; + run_pty_loop(store, loop_ctx, spawn_frame.id, cmd, pty_opts).await; + return; + } + + // Closure-based service: run is a closure + let run_closure = match run_val.as_closure() { + Ok(c) => c.clone(), Err(e) => { let _ = emit_event( &store, @@ -202,13 +289,12 @@ async fn run(store: Store, spawn_frame: Frame) { spawn_frame.id, None, ServiceEventKind::ParseError { - message: e.to_string(), + message: format!("'run' field must be a closure: {e}"), }, ); return; } }; - let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default(); // Create and set the interrupt signal on the engine state let interrupt = Arc::new(AtomicBool::new(false)); @@ -216,7 +302,7 @@ async fn run(store: Store, spawn_frame: Frame) { let task = Task { id: spawn_frame.id, - run_closure: nu_config.run_closure, + run_closure, return_options: opts.return_options, duplex: opts.duplex.unwrap_or(false), engine, @@ -422,6 +508,306 @@ async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) { } } +async fn run_pty_loop( + store: Store, + loop_ctx: ServiceLoop, + source_id: Scru128Id, + initial_cmd: String, + initial_pty_opts: PtyOptions, +) { + let start_event = emit_event( + &store, + &loop_ctx, + source_id, + None, + ServiceEventKind::Running, + ) + .expect("failed to emit running event"); + let mut start_id = start_event.frame.id; + + let control_rx_options = ReadOptions::builder() + .follow(FollowOption::On) + .after(start_id) + .build(); + let mut control_rx = store.read(control_rx_options).await; + + let mut current_id = source_id; + let mut cmd = initial_cmd; + let mut pty_opts = initial_pty_opts; + + loop { + let cols = pty_opts.cols.unwrap_or(80); + let rows = pty_opts.rows.unwrap_or(24); + + let PtyChild { + reader, + writer, + mut handle, + } = match PtyChild::spawn(&cmd, cols, rows) { + Ok(v) => v, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Stopped(StopReason::Error { message: e }), + ); + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Shutdown, + ); + return; + } + }; + + // Read PTY output -> emit recv frames (blocking thread for pipe I/O) + let recv_store = store.clone(); + let recv_ctx = loop_ctx.clone(); + let recv_id = current_id; + let (child_done_tx, child_done_rx) = tokio::sync::oneshot::channel::<()>(); + let recv_handle = std::thread::spawn(move || { + let mut reader = reader; + let mut buf = [0u8; 8192]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + let _ = emit_event( + &recv_store, + &recv_ctx, + recv_id, + None, + ServiceEventKind::Recv { + suffix: "recv".into(), + data: buf[..n].to_vec(), + }, + ); + } + Err(_) => break, + } + } + let _ = child_done_tx.send(()); + }); + + // Write to PTY input (blocking thread for pipe I/O) + let send_store = store.clone(); + let send_topic = format!("{}.send", loop_ctx.topic); + let send_options = ReadOptions::builder() + .follow(FollowOption::On) + .after(start_id) + .build(); + let send_rx = send_store.read(send_options).await; + let (send_stop_tx, mut send_stop_rx) = tokio::sync::oneshot::channel::<()>(); + + use std::io::Write; + let rt = tokio::runtime::Handle::current(); + let send_handle = std::thread::spawn(move || { + let mut writer = writer; + let mut rx = send_rx; + loop { + let frame = rt.block_on(async { + tokio::select! { + f = rx.recv() => f, + _ = &mut send_stop_rx => None, + } + }); + match frame { + Some(frame) if frame.topic == send_topic => { + if let Some(hash) = frame.hash { + if let Ok(bytes) = rt.block_on(send_store.cas_read(&hash)) { + if writer.write_all(&bytes).is_err() { + break; + } + if writer.flush().is_err() { + break; + } + } + } + } + Some(_) => continue, + None => break, + } + } + }); + + let terminate_topic = format!("{}.terminate", loop_ctx.topic); + let spawn_topic = format!("{}.spawn", loop_ctx.topic); + let resize_topic = format!("{}.resize", loop_ctx.topic); + tokio::pin!(child_done_rx); + + enum PtyOutcome { + ChildExited, + Terminate, + Shutdown, + Update(Scru128Id, String, PtyOptions), + Error(String), + } + + let outcome = 'pty_ctrl: loop { + tokio::select! { + biased; + maybe = control_rx.recv() => { + match maybe { + Some(frame) if frame.topic == terminate_topic => { + handle.kill(); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Terminate; + } + Some(frame) if frame.topic == "xs.stopping" => { + handle.kill(); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Shutdown; + } + Some(frame) if frame.topic == spawn_topic => { + if let Some(hash) = frame.hash.clone() { + if let Ok(bytes) = store.cas_read(&hash).await { + if let Ok(script) = String::from_utf8(bytes) { + let mut new_engine = match crate::processor::build_engine(&store, &frame.id) { + Ok(e) => e, + Err(_) => continue, + }; + match nu::eval_script(&mut new_engine, &script) { + Ok(val) => { + let new_opts: ServiceScriptOptions = + match serde_json::from_value(nu::value_to_json(&val)) { + Ok(o) => o, + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { message: e.to_string() }, + ); + continue; + } + }; + if let Some(new_pty) = new_opts.pty { + let new_cmd = match val.get_data_by_key("run").and_then(|v| v.as_str().ok().map(String::from)) { + Some(c) => c, + None => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { + message: "With pty enabled, 'run' must be a command string.".into(), + }, + ); + continue; + } + }; + handle.kill(); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Update(frame.id, new_cmd, new_pty); + } + // New config dropped pty -- terminate so + // serve.rs can restart with the new config. + handle.kill(); + let _ = (&mut child_done_rx).await; + break 'pty_ctrl PtyOutcome::Terminate; + } + Err(e) => { + let _ = emit_event( + &store, + &loop_ctx, + frame.id, + None, + ServiceEventKind::ParseError { message: e.to_string() }, + ); + } + } + } + } + } + } + Some(frame) if frame.topic == resize_topic => { + if let Some(ref meta) = frame.meta { + let cols = meta.get("cols").and_then(|v| v.as_u64()).unwrap_or(80) as u16; + let rows = meta.get("rows").and_then(|v| v.as_u64()).unwrap_or(24) as u16; + let _ = handle.resize(cols, rows); + } + } + Some(_) => {} + None => break 'pty_ctrl PtyOutcome::Error("control channel closed".into()), + } + } + _ = &mut child_done_rx => { + break 'pty_ctrl PtyOutcome::ChildExited; + } + } + }; + + // The reader thread exits when the pipe gets EOF (after kill). + let _ = recv_handle.join(); + // Signal the writer thread to stop and wait for it. + drop(send_stop_tx); + let _ = send_handle.join(); + + let stop_reason = match &outcome { + PtyOutcome::ChildExited => StopReason::Finished, + PtyOutcome::Terminate => StopReason::Terminate, + PtyOutcome::Shutdown => StopReason::Shutdown, + PtyOutcome::Update(update_id, _, _) => StopReason::Update { + update_id: *update_id, + }, + PtyOutcome::Error(e) => StopReason::Error { message: e.clone() }, + }; + + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Stopped(stop_reason), + ); + + match outcome { + PtyOutcome::ChildExited => { + tokio::time::sleep(Duration::from_secs(1)).await; + if let Ok(event) = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Running, + ) { + start_id = event.frame.id; + } + } + PtyOutcome::Update(new_id, new_cmd, new_pty) => { + current_id = new_id; + cmd = new_cmd; + pty_opts = new_pty; + if let Ok(event) = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Running, + ) { + start_id = event.frame.id; + } + } + PtyOutcome::Terminate | PtyOutcome::Shutdown | PtyOutcome::Error(_) => { + let _ = emit_event( + &store, + &loop_ctx, + current_id, + None, + ServiceEventKind::Shutdown, + ); + break; + } + } + } +} + async fn build_input_pipeline( store: Store, loop_ctx: &ServiceLoop, diff --git a/src/processor/service/tests.rs b/src/processor/service/tests.rs index 82d8b3c76..a98ff7dfd 100644 --- a/src/processor/service/tests.rs +++ b/src/processor/service/tests.rs @@ -824,3 +824,105 @@ async fn test_graceful_shutdown_via_xs_stopping() { "service handle should complete after xs.stopping" ); } + +#[tokio::test] +async fn test_pty_service() { + let store = setup_test_env(); + + { + let store = store.clone(); + tokio::spawn(async move { + crate::processor::service::run(store).await.unwrap(); + }); + } + + let options = ReadOptions::builder() + .follow(FollowOption::On) + .new(true) + .build(); + let mut recver = store.read(options).await; + + #[cfg(unix)] + let script = r#"{ run: "sh", pty: {cols: 80, rows: 24} }"#; + #[cfg(windows)] + let script = r#"{ run: "cmd.exe", pty: {cols: 80, rows: 24} }"#; + + let spawn = store + .append( + Frame::builder("ptytest.spawn") + .hash(store.cas_insert(script).await.unwrap()) + .build(), + ) + .unwrap(); + + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.spawn"); + assert_eq!(recver.recv().await.unwrap().topic, "ptytest.running"); + + // The shell should produce some initial output (prompt or banner) + let frame = tokio::time::timeout(Duration::from_secs(5), recver.recv()) + .await + .expect("timed out waiting for initial PTY output") + .unwrap(); + assert_eq!(frame.topic, "ptytest.recv"); + let meta = frame.meta.as_ref().unwrap(); + assert_eq!(meta["source_id"], spawn.id.to_string()); + let bytes = store.cas_read(&frame.hash.unwrap()).await.unwrap(); + assert!(!bytes.is_empty(), "PTY should produce output"); + + // Send input + #[cfg(unix)] + let input = "echo pty-ok\n"; + #[cfg(windows)] + let input = "echo pty-ok\r\n"; + + store + .append( + Frame::builder("ptytest.send") + .hash(store.cas_insert(input).await.unwrap()) + .build(), + ) + .unwrap(); + + // Read recv frames until we see "pty-ok" in the output + let deadline = Instant::now() + Duration::from_secs(10); + let mut found = false; + while Instant::now() < deadline { + let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; + match timeout { + Ok(Some(frame)) => { + if frame.topic == "ptytest.recv" { + if let Some(ref hash) = frame.hash { + let content = store.cas_read(hash).await.unwrap(); + if String::from_utf8_lossy(&content).contains("pty-ok") { + found = true; + break; + } + } + } + } + _ => break, + } + } + assert!(found, "expected to find 'pty-ok' in PTY output"); + + // Terminate + store + .append(Frame::builder("ptytest.terminate").build()) + .unwrap(); + + let stop = loop { + let timeout = tokio::time::timeout(Duration::from_secs(5), recver.recv()).await; + let frame = timeout + .expect("timed out waiting for ptytest.stopped") + .unwrap(); + if frame.topic == "ptytest.stopped" { + break frame; + } + }; + assert_eq!(stop.meta.unwrap()["reason"], "terminate"); + let shutdown = tokio::time::timeout(Duration::from_secs(5), recver.recv()) + .await + .expect("timed out waiting for ptytest.shutdown") + .unwrap(); + assert_eq!(shutdown.topic, "ptytest.shutdown"); +}