diff --git a/.coderabbit.yaml b/.coderabbit.yaml new file mode 100644 index 0000000..ed71dc9 --- /dev/null +++ b/.coderabbit.yaml @@ -0,0 +1,162 @@ +# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json + +# mcp-ssh — CodeRabbit configuration +# +# Single Rust binary: an MCP server giving an AI agent remote shell + file +# access to ONE host, over authenticated MCP Streamable HTTP at /mcp. +# Authority: CLAUDE.md (Conventions + NEVER) is absolute. Cite the rule. + +language: en-US +early_access: true +enable_free_tier: true + +tone_instructions: "mcp-ssh: Rust 2024, tokio, axum, rmcp. CLAUDE.md NEVER rules are absolute — no password/token in logs/errors/responses, never run as root, never weaken auth, never add a 4th MCP tool (parametrize bash/job/file). Concrete diffs, skip nits." + +reviews: + profile: assertive + request_changes_workflow: true + high_level_summary: true + high_level_summary_placeholder: "@coderabbitai summary" + auto_title_placeholder: "@coderabbitai" + poem: false + review_status: true + collapse_walkthrough: false + sequence_diagrams: true + estimate_code_review_effort: true + suggested_reviewers: true + auto_assign_reviewers: false + abort_on_close: true + + auto_review: + enabled: true + auto_incremental_review: true + ignore_title_keywords: + - "WIP" + - "DRAFT" + - "DO NOT REVIEW" + drafts: false + base_branches: + - main + + tools: + shellcheck: { enabled: true } + markdownlint: { enabled: true } + yamllint: { enabled: true } + actionlint: { enabled: true } + hadolint: { enabled: true } + gitleaks: { enabled: true } + github-checks: { enabled: true, timeout_ms: 900000 } + ruff: { enabled: false } + biome: { enabled: false } + eslint: { enabled: false } + rubocop: { enabled: false } + + path_instructions: + - path: "**/*" + instructions: | + Review only changed files and code directly affected. Behaviour + changes must update docs/ in the same PR. Flag missing tests for new + public behaviour. 
+ + # Rust source — CLAUDE.md Conventions + - path: "src/**/*.rs" + instructions: | + Enforce CLAUDE.md Conventions: + - No unwrap/expect outside main.rs and #[cfg(test)]; panic in the + request path crashes every client. + - Errors typed with thiserror; anyhow only at the main.rs boundary. + - Async end-to-end. No std::sync::Mutex on the request path (use + tokio::sync), never held across .await. No block_on; spawn_blocking + for blocking I/O. + - Newtype over bare primitives with meaning (JobId, not String). + Make illegal states unrepresentable. + - tracing spans (request_id, tool), never println!. + - Files <=300 LOC, one responsibility. clippy -D warnings + rustfmt + clean is the floor. + + # Auth + OAuth — security review required + - path: "src/auth.rs" + instructions: | + SECURITY REVIEW. HTTP Basic middleware. Never log/return the + password. Flag any path that bypasses or weakens auth. + - path: "src/oauth/**" + instructions: | + SECURITY REVIEW. OAuth 2.1: discovery, dynamic registration, PKCE on + authorize+token, bearer validation. No token material in logs/errors. + + # Config — fail fast, secrets + - path: "src/config.rs" + instructions: | + Validated at boot — missing auth creds fail at startup, never at + request time. MCP_SSH_ALLOWED_HOSTS must be required. Secrets never + logged. + + # Tools — constant 3-tool surface + - path: "src/tools/**" + instructions: | + Exactly 3 tools (bash/job/file) dispatching on action. NEW capability + = a new param/action, never a 4th tool. Thin adapters — no business + logic in the dispatch arm. + + # Jobs — backgrounding, log pagination, process-group kill + - path: "src/jobs/**" + instructions: | + Output streams to a per-job log file; poll paginates (cursor/limit) + so chatty commands never flood agent context. Jobs spawn in their own + process group so kills reach descendants. Kill escalates TERM->KILL. + Reaper drops jobs >24h, killing any still-Running group first. 
+ + # Integration tests — real server over HTTP + - path: "tests/**/*.rs" + instructions: | + Boot the server and issue real MCP requests over HTTP. New behaviour + ships with the test that proves it. + + # CI + - path: ".github/workflows/**" + instructions: | + Runner: blacksmith-2vcpu-ubuntu-2404. Pin actions to major versions. + Required jobs: fmt, clippy (-D warnings), test. + + # Dockerfile + - path: "Dockerfile" + instructions: | + Multi-stage build. Non-root user — never run as root. No build + secrets baked in. + + - path: "**/*.md" + instructions: | + Lead with the rule, tables over prose, file paths over descriptions, + no filler. Valid links, accurate code examples. + + path_filters: + - "!target/**" + - "!**/target/**" + - "!**/*.log" + - "!.env*" + - "!**/.DS_Store" + +chat: + auto_reply: true + art: false + +knowledge_base: + opt_out: false + + web_search: + enabled: true + + code_guidelines: + enabled: true + filePatterns: + - "**/CLAUDE.md" + - "docs/**" + + learnings: + scope: auto + + issues: + scope: auto + + pull_requests: + scope: auto diff --git a/CLAUDE.md b/CLAUDE.md index 678b16f..9d5d205 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,7 +53,8 @@ Keep this accurate — it's the navigation aid. 
| `src/config.rs` | env + TOML file config; fails fast if auth creds missing | | `src/auth.rs` | HTTP Basic auth middleware | | `src/oauth/` | minimal OAuth 2.1 server: discovery metadata, dynamic client registration, authorize + token with PKCE, bearer validation | -| `src/jobs.rs` | job engine: run a command, return inline if fast (<2s) else a job id (or immediately when `bg`); output streams to a per-job log file, polled paginated; hourly reaper drops jobs >24h old | +| `src/jobs/mod.rs` | job engine: run a command, return inline if fast (<2s) else a job id (or immediately when `bg`); output streams to a per-job log file, polled paginated | +| `src/jobs/reaper.rs` | hourly reaper drops jobs >24h old (killing any still-`Running` group first); process-group kill helpers (TERM→KILL escalation), shared with `job(action="kill")` | | `src/tools/mod.rs` | MCP tool surface (`#[tool_router]`/`#[tool]` from rmcp): 3 tools (`bash`/`job`/`file`) dispatching on `action`. Thin adapters over jobs + files | | `src/tools/files.rs` | file operations (`tokio::fs`; `ls`/`find`/`grep` shelled out) | @@ -125,7 +126,8 @@ Non-negotiable: SOLID, SRP, tested code. The bar: idiomatic, boring, readable Ru | Config / env / required creds | `src/config.rs` | | HTTP Basic auth | `src/auth.rs` | | OAuth 2.1 (discovery, registration, PKCE, bearer) | `src/oauth/` | -| Running commands, backgrounding, job logs | `src/jobs.rs` | +| Running commands, backgrounding, job logs | `src/jobs/mod.rs` | +| Reaper eviction + process-group kill helpers | `src/jobs/reaper.rs` | | Tool definitions / MCP surface | `src/tools/mod.rs` | | File operations | `src/tools/files.rs` | diff --git a/src/jobs.rs b/src/jobs.rs deleted file mode 100644 index 5976b36..0000000 --- a/src/jobs.rs +++ /dev/null @@ -1,327 +0,0 @@ -//! Job engine: run a command, return inline if it's fast, otherwise hand back -//! a job id the caller polls. Output streams to a per-job log file so polling -//! 
can paginate it without holding everything in memory. -use std::{ - collections::HashMap, - path::PathBuf, - process::Stdio, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - time::Duration, -}; - -use tokio::sync::{Mutex, watch}; - -#[derive(Debug, Clone, serde::Serialize)] -#[serde(tag = "status", rename_all = "snake_case")] -pub enum JobState { - Running, - Exited { code: i32 }, - Failed { error: String }, -} - -struct Job { - cmd: String, - log_path: PathBuf, - pid: Option, - state: Arc>, - started: tokio::time::Instant, -} - -/// Result of starting a command. -pub enum RunResult { - /// Finished within the inline window — output is ready now. - Inline { state: JobState, page: Page }, - /// Still running — poll this id. - Backgrounded { id: String }, -} - -/// One page of log lines plus a cursor to fetch the next page. -#[derive(Debug, serde::Serialize)] -pub struct Page { - pub lines: Vec, - pub next_cursor: usize, - pub total_lines: usize, - pub has_more: bool, -} - -#[derive(Debug, serde::Serialize)] -pub struct JobSummary { - pub id: String, - pub cmd: String, - pub state: JobState, -} - -#[derive(Clone)] -pub struct JobStore { - dir: PathBuf, - inline_timeout: Duration, - seq: Arc, - jobs: Arc>>>, -} - -const DEFAULT_PAGE: usize = 200; -/// Jobs (and their logs) older than this are reaped hourly. -const RETENTION: Duration = Duration::from_secs(24 * 3600); - -impl JobStore { - pub fn new(dir: PathBuf, inline_timeout: Duration) -> std::io::Result { - std::fs::create_dir_all(&dir)?; - let jobs = Arc::new(Mutex::new(HashMap::new())); - spawn_reaper(jobs.clone()); - Ok(Self { - dir, - inline_timeout, - seq: Arc::new(AtomicU64::new(1)), - jobs, - }) - } - - /// Spawn `cmd`. With `background`, return a job id immediately; otherwise wait - /// up to the inline window and return output if it finishes in time. 
- pub async fn run( - &self, - cmd: String, - cwd: Option, - timeout_secs: Option, - background: bool, - ) -> std::io::Result { - let id = format!("j{}", self.seq.fetch_add(1, Ordering::Relaxed)); - let log_path = self.dir.join(format!("{id}.log")); - - // ponytail: stdout+stderr merged into one log (terminal-style). Split into - // two files if a caller ever needs them apart. - let out = std::fs::File::create(&log_path)?; - let err = out.try_clone()?; - - let mut command = tokio::process::Command::new("sh"); - command - .arg("-c") - .arg(&cmd) - .stdin(Stdio::null()) - .stdout(Stdio::from(out)) - .stderr(Stdio::from(err)); - if let Some(dir) = cwd { - command.current_dir(dir); - } - - let mut child = command.spawn()?; - let pid = child.id(); - let (tx, rx) = watch::channel(false); - let state = Arc::new(Mutex::new(JobState::Running)); - - // Waiter owns the child so it can reap it; updates shared state on exit. - { - let state = state.clone(); - tokio::spawn(async move { - let result = match child.wait().await { - Ok(s) => JobState::Exited { - code: s.code().unwrap_or(-1), - }, - Err(e) => JobState::Failed { - error: e.to_string(), - }, - }; - *state.lock().await = result; - let _ = tx.send(true); - }); - } - - let job = Arc::new(Job { - cmd, - log_path: log_path.clone(), - pid, - state: state.clone(), - started: tokio::time::Instant::now(), - }); - self.jobs.lock().await.insert(id.clone(), job); - - // `bg: true` — don't wait, hand back the id straight away. - if background { - return Ok(RunResult::Backgrounded { id }); - } - - // Wait for completion or the inline window, whichever comes first. 
- let window = timeout_secs - .map(Duration::from_secs) - .unwrap_or(self.inline_timeout); - let mut done = rx; - let _ = tokio::time::timeout(window, done.changed()).await; - - let current = state.lock().await.clone(); - match current { - JobState::Running => Ok(RunResult::Backgrounded { id }), - finished => Ok(RunResult::Inline { - state: finished, - page: read_page(&log_path, 0, DEFAULT_PAGE).await, - }), - } - } - - /// Status + one page of a job's log. - pub async fn poll( - &self, - id: &str, - cursor: usize, - limit: Option, - ) -> Option<(JobState, Page)> { - let job = self.jobs.lock().await.get(id).cloned()?; - let state = job.state.lock().await.clone(); - let page = read_page(&job.log_path, cursor, limit.unwrap_or(DEFAULT_PAGE)).await; - Some((state, page)) - } - - pub async fn list(&self) -> Vec { - let jobs = self.jobs.lock().await; - let mut out = Vec::with_capacity(jobs.len()); - for (id, job) in jobs.iter() { - out.push(JobSummary { - id: id.clone(), - cmd: job.cmd.clone(), - state: job.state.lock().await.clone(), - }); - } - out.sort_by(|a, b| a.id.cmp(&b.id)); - out - } - - /// Kill a running job by pid. Returns false if unknown id. - pub async fn kill(&self, id: &str) -> bool { - let Some(job) = self.jobs.lock().await.get(id).cloned() else { - return false; - }; - if let Some(pid) = job.pid { - // ponytail: shell out to `kill`; pid reuse is a non-issue at our scale. - let _ = tokio::process::Command::new("kill") - .arg(pid.to_string()) - .status() - .await; - } - true - } -} - -/// Read lines `[cursor, cursor+limit)` from a log file. Re-reads the whole file -/// each call — fine for typical logs; seek by byte offset if they get huge. 
-async fn read_page(path: &std::path::Path, cursor: usize, limit: usize) -> Page { - let content = tokio::fs::read_to_string(path).await.unwrap_or_default(); - let all: Vec<&str> = content.lines().collect(); - let total = all.len(); - let end = (cursor + limit).min(total); - let lines = all - .get(cursor..end) - .unwrap_or(&[]) - .iter() - .map(|s| s.to_string()) - .collect(); - Page { - lines, - next_cursor: end, - total_lines: total, - has_more: end < total, - } -} - -/// Hourly: drop jobs (and their log files) older than `RETENTION` so history -/// doesn't grow without bound. ponytail: time-based only; a busy box could still -/// hold ≤24h of jobs in memory — add a count cap if that ever bites. -fn spawn_reaper(jobs: Arc>>>) { - tokio::spawn(async move { - let mut tick = tokio::time::interval(Duration::from_secs(3600)); - loop { - tick.tick().await; - let now = tokio::time::Instant::now(); - let mut map = jobs.lock().await; - let stale: Vec<(String, PathBuf)> = map - .iter() - .filter(|(_, j)| now.duration_since(j.started) > RETENTION) - .map(|(id, j)| (id.clone(), j.log_path.clone())) - .collect(); - for (id, _) in &stale { - map.remove(id); - } - drop(map); - for (_, path) in stale { - let _ = tokio::fs::remove_file(path).await; - } - } - }); -} - -#[cfg(test)] -mod tests { - use super::*; - - fn store(inline: Duration) -> JobStore { - let dir = tempfile::tempdir().unwrap().keep(); - JobStore::new(dir, inline).unwrap() - } - - #[tokio::test] - async fn fast_command_returns_inline() { - let r = store(Duration::from_secs(5)) - .run("echo hello".into(), None, None, false) - .await - .unwrap(); - match r { - RunResult::Inline { state, page } => { - assert!(matches!(state, JobState::Exited { code: 0 })); - assert!(page.lines.iter().any(|l| l.contains("hello"))); - } - RunResult::Backgrounded { .. 
} => panic!("fast command should be inline"), - } - } - - #[tokio::test] - async fn bg_flag_backgrounds_a_fast_command() { - // Even though `echo` is instant, bg=true must return an id without waiting. - let r = store(Duration::from_secs(5)) - .run("echo hi".into(), None, None, true) - .await - .unwrap(); - assert!(matches!(r, RunResult::Backgrounded { .. })); - } - - #[tokio::test] - async fn slow_command_backgrounds_then_completes() { - let store = store(Duration::from_millis(100)); - let r = store - .run("echo start; sleep 1; echo done".into(), None, None, false) - .await - .unwrap(); - let id = match r { - RunResult::Backgrounded { id } => id, - RunResult::Inline { .. } => panic!("slow command should background"), - }; - for _ in 0..50 { - let (state, page) = store.poll(&id, 0, None).await.unwrap(); - if matches!(state, JobState::Exited { .. }) { - assert!(page.lines.iter().any(|l| l.contains("done"))); - return; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - panic!("job never finished"); - } - - #[tokio::test] - async fn poll_paginates() { - let store = store(Duration::from_secs(5)); - let r = store - .run("seq 1 10".into(), None, None, false) - .await - .unwrap(); - // seq finishes inline; re-poll the job id to exercise pagination. - let id = match r { - RunResult::Inline { .. } => "j1".to_string(), - RunResult::Backgrounded { id } => id, - }; - let (_s, page) = store.poll(&id, 0, Some(3)).await.unwrap(); - assert_eq!(page.lines.len(), 3); - assert_eq!(page.next_cursor, 3); - assert!(page.has_more); - assert_eq!(page.total_lines, 10); - } -} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs new file mode 100644 index 0000000..381cc04 --- /dev/null +++ b/src/jobs/mod.rs @@ -0,0 +1,537 @@ +//! Job engine: run a command, return inline if it's fast, otherwise hand back +//! a job id the caller polls. Output streams to a per-job log file so polling +//! can paginate it without holding everything in memory. 
+use std::{ + collections::HashMap, + path::PathBuf, + process::Stdio, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; + +use tokio::sync::{Mutex, watch}; + +mod reaper; + +use reaper::{kill_job, spawn_reaper}; + +#[derive(Debug, Clone, serde::Serialize)] +#[serde(tag = "status", rename_all = "snake_case")] +pub enum JobState { + Running, + Exited { code: i32 }, + Failed { error: String }, +} + +/// Process group id to signal on kill. Wrapping the raw pid keeps this +/// lifecycle/security boundary from being confused with any other `u32`. +#[derive(Debug, Clone, Copy)] +struct ProcessGroupId(u32); + +struct Job { + cmd: String, + log_path: PathBuf, + /// Process group to signal on kill. The child leads its own group, so this + /// equals its pid (see `run`). `None` only if the OS withheld a pid. + pgid: Option, + state: Arc>, + /// Flips to `true` when the process exits; lets `kill` wait out its grace + /// period instead of polling. + done: watch::Receiver, + started: tokio::time::Instant, +} + +/// Result of starting a command. +pub enum RunResult { + /// Finished within the inline window — output is ready now. + Inline { state: JobState, page: Page }, + /// Still running — poll this id. + Backgrounded { id: String }, +} + +/// One page of log lines plus a cursor to fetch the next page. 
+#[derive(Debug, serde::Serialize)] +pub struct Page { + pub lines: Vec, + pub next_cursor: usize, + pub total_lines: usize, + pub has_more: bool, +} + +#[derive(Debug, serde::Serialize)] +pub struct JobSummary { + pub id: String, + pub cmd: String, + pub state: JobState, +} + +#[derive(Clone)] +pub struct JobStore { + dir: PathBuf, + inline_timeout: Duration, + seq: Arc, + jobs: Arc>>>, +} + +const DEFAULT_PAGE: usize = 200; + +impl JobStore { + pub fn new(dir: PathBuf, inline_timeout: Duration) -> std::io::Result { + std::fs::create_dir_all(&dir)?; + let jobs = Arc::new(Mutex::new(HashMap::new())); + spawn_reaper(jobs.clone()); + Ok(Self { + dir, + inline_timeout, + seq: Arc::new(AtomicU64::new(1)), + jobs, + }) + } + + /// Spawn `cmd`. With `background`, return a job id immediately; otherwise wait + /// up to the inline window and return output if it finishes in time. + pub async fn run( + &self, + cmd: String, + cwd: Option, + timeout_secs: Option, + background: bool, + ) -> std::io::Result { + let id = format!("j{}", self.seq.fetch_add(1, Ordering::Relaxed)); + let log_path = self.dir.join(format!("{id}.log")); + + // ponytail: stdout+stderr merged into one log (terminal-style). Split into + // two files if a caller ever needs them apart. + let out = std::fs::File::create(&log_path)?; + let err = out.try_clone()?; + + let mut command = tokio::process::Command::new("sh"); + command + .arg("-c") + .arg(&cmd) + .stdin(Stdio::null()) + .stdout(Stdio::from(out)) + .stderr(Stdio::from(err)); + if let Some(dir) = cwd { + command.current_dir(dir); + } + // Own process group (child becomes leader, so pgid == pid). Lets `kill` + // signal the whole tree the command spawns, not just `sh` itself. 
+ #[cfg(unix)] + command.process_group(0); + + let mut child = command.spawn()?; + let pgid = child.id().map(ProcessGroupId); + let (tx, rx) = watch::channel(false); + let state = Arc::new(Mutex::new(JobState::Running)); + + // Waiter owns the child so it can reap it; updates shared state on exit. + { + let state = state.clone(); + tokio::spawn(async move { + let result = match child.wait().await { + Ok(s) => JobState::Exited { + code: s.code().unwrap_or(-1), + }, + Err(e) => JobState::Failed { + error: e.to_string(), + }, + }; + *state.lock().await = result; + let _ = tx.send(true); + }); + } + + let job = Arc::new(Job { + cmd, + log_path: log_path.clone(), + pgid, + state: state.clone(), + done: rx.clone(), + started: tokio::time::Instant::now(), + }); + self.jobs.lock().await.insert(id.clone(), job); + + // `bg: true` — don't wait, hand back the id straight away. + if background { + return Ok(RunResult::Backgrounded { id }); + } + + // Wait for completion or the inline window, whichever comes first. + let window = timeout_secs + .map(Duration::from_secs) + .unwrap_or(self.inline_timeout); + let mut done = rx; + let _ = tokio::time::timeout(window, done.changed()).await; + + let current = state.lock().await.clone(); + match current { + JobState::Running => Ok(RunResult::Backgrounded { id }), + finished => Ok(RunResult::Inline { + state: finished, + page: read_page(&log_path, 0, DEFAULT_PAGE).await, + }), + } + } + + /// Status + one page of a job's log. 
+ pub async fn poll( + &self, + id: &str, + cursor: usize, + limit: Option, + ) -> Option<(JobState, Page)> { + let job = self.jobs.lock().await.get(id).cloned()?; + let state = job.state.lock().await.clone(); + let page = read_page(&job.log_path, cursor, limit.unwrap_or(DEFAULT_PAGE)).await; + Some((state, page)) + } + + pub async fn list(&self) -> Vec { + let jobs = self.jobs.lock().await; + let mut out = Vec::with_capacity(jobs.len()); + for (id, job) in jobs.iter() { + out.push(JobSummary { + id: id.clone(), + cmd: job.cmd.clone(), + state: job.state.lock().await.clone(), + }); + } + out.sort_by(|a, b| a.id.cmp(&b.id)); + out + } + + /// Kill a running job by signalling its whole process group. Returns `false` + /// when the id is unknown or the job already finished — nothing to signal. + pub async fn kill(&self, id: &str) -> bool { + let Some(job) = self.jobs.lock().await.get(id).cloned() else { + return false; + }; + kill_job(&job).await + } +} + +/// Read lines `[cursor, cursor+limit)` from a log file. Re-reads the whole file +/// each call — fine for typical logs; seek by byte offset if they get huge. 
///
/// The log is decoded lossily, so command output that is not valid UTF-8 still
/// paginates (invalid bytes become U+FFFD) instead of reading as an empty log.
/// A missing or unreadable file yields an empty page.
async fn read_page(path: &std::path::Path, cursor: usize, limit: usize) -> Page {
    let bytes = tokio::fs::read(path).await.unwrap_or_default();
    paginate(&String::from_utf8_lossy(&bytes), cursor, limit)
}

/// Slice `content` into the page starting at line `cursor` with at most `limit`
/// lines. Both values come from the caller, so they are clamped rather than
/// trusted: a `cursor` past the end yields an empty page whose `next_cursor` is
/// the line count, and no `cursor`/`limit` combination can overflow.
fn paginate(content: &str, cursor: usize, limit: usize) -> Page {
    let total = content.lines().count();
    let start = cursor.min(total);
    let lines: Vec<String> = content
        .lines()
        .skip(start)
        .take(limit)
        .map(str::to_owned)
        .collect();
    // `start + lines.len()` is at most `total`, so this cannot overflow.
    let end = start + lines.len();
    Page {
        lines,
        next_cursor: end,
        total_lines: total,
        has_more: end < total,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn store(inline: Duration) -> JobStore {
        let dir = tempfile::tempdir().unwrap().keep();
        JobStore::new(dir, inline).unwrap()
    }

    #[test]
    fn paginate_clamps_out_of_range_cursor_and_limit() {
        let page = paginate("a\nb\nc\n", 1, usize::MAX);
        assert_eq!(page.lines, ["b", "c"]);
        assert_eq!(page.next_cursor, 3);
        assert!(!page.has_more);

        let page = paginate("a\nb\nc\n", usize::MAX, 10);
        assert!(page.lines.is_empty());
        assert_eq!(page.next_cursor, 3);
        assert_eq!(page.total_lines, 3);
        assert!(!page.has_more);
    }

    #[tokio::test]
    async fn fast_command_returns_inline() {
        let r = store(Duration::from_secs(5))
            .run("echo hello".into(), None, None, false)
            .await
            .unwrap();
        match r {
            RunResult::Inline { state, page } => {
                assert!(matches!(state, JobState::Exited { code: 0 }));
                assert!(page.lines.iter().any(|l| l.contains("hello")));
            }
            RunResult::Backgrounded { .. } => panic!("fast command should be inline"),
        }
    }

    #[tokio::test]
    async fn bg_flag_backgrounds_a_fast_command() {
        // Even though `echo` is instant, bg=true must return an id without waiting.
        let r = store(Duration::from_secs(5))
            .run("echo hi".into(), None, None, true)
            .await
            .unwrap();
        assert!(matches!(r, RunResult::Backgrounded { .. }));
    }

    #[tokio::test]
    async fn slow_command_backgrounds_then_completes() {
        let store = store(Duration::from_millis(100));
        let r = store
            .run("echo start; sleep 1; echo done".into(), None, None, false)
            .await
            .unwrap();
        let id = match r {
            RunResult::Backgrounded { id } => id,
            RunResult::Inline { .. } => panic!("slow command should background"),
        };
        for _ in 0..50 {
            let (state, page) = store.poll(&id, 0, None).await.unwrap();
            if matches!(state, JobState::Exited { ..
}) { + assert!(page.lines.iter().any(|l| l.contains("done"))); + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!("job never finished"); + } + + /// `kill -0` probes liveness without delivering a signal. + #[cfg(unix)] + async fn alive(pid: &str) -> bool { + tokio::process::Command::new("kill") + .arg("-0") + .arg(pid) + .status() + .await + .map(|s| s.success()) + .unwrap_or(false) + } + + #[cfg(unix)] + #[tokio::test] + async fn kill_reaches_descendants() { + let store = store(Duration::from_millis(100)); + // `sh` backgrounds a long sleep, prints its pid, then waits on it. Job + // control is off in `sh -c`, so the child shares the shell's group. + let r = store + .run("sleep 300 & echo \"pid:$!\"; wait".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + + // Pull the descendant's pid out of the log. + let mut child_pid = None; + for _ in 0..50 { + let (_s, page) = store.poll(&id, 0, None).await.unwrap(); + if let Some(line) = page.lines.iter().find_map(|l| l.strip_prefix("pid:")) { + child_pid = Some(line.trim().to_string()); + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let child_pid = child_pid.expect("never saw the child pid"); + assert!(alive(&child_pid).await, "descendant should be running"); + + assert!(store.kill(&id).await); + + // Group kill must reap the descendant, not just `sh`. + for _ in 0..50 { + if !alive(&child_pid).await { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("descendant survived the kill"); + } + + #[tokio::test] + async fn kill_unknown_id_returns_false() { + assert!(!store(Duration::from_secs(5)).kill("nope").await); + } + + #[tokio::test] + async fn kill_finished_job_returns_false() { + let store = store(Duration::from_secs(5)); + // Runs inline, so it has already exited by the time `run` returns. 
+ let r = store + .run("echo bye".into(), None, None, false) + .await + .unwrap(); + assert!(matches!(r, RunResult::Inline { .. })); + assert!(!store.kill("j1").await); + } + + #[cfg(unix)] + #[tokio::test] + async fn kill_escalates_to_sigkill_when_term_ignored() { + let store = store(Duration::from_millis(100)); + // The shell traps (ignores) TERM, and the ignore disposition is inherited + // by its children, so only KILL can reap the group. + let r = store + .run( + "trap '' TERM; while true; do sleep 1; done".into(), + None, + None, + true, + ) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + tokio::time::sleep(Duration::from_millis(200)).await; + + assert!(store.kill(&id).await); + + // TERM is ignored; the post-grace KILL must still bring it down. + for _ in 0..100 { + let (state, _) = store.poll(&id, 0, None).await.unwrap(); + if matches!(state, JobState::Exited { .. }) { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!("job survived TERM->KILL escalation"); + } + + #[cfg(unix)] + #[tokio::test] + async fn reaper_kills_running_job_before_eviction() { + let store = store(Duration::from_millis(100)); + // Backgrounded descendant; print its pid so we can probe it post-eviction. + let r = store + .run("sleep 300 & echo \"pid:$!\"; wait".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + + let mut child_pid = None; + for _ in 0..50 { + let (_s, page) = store.poll(&id, 0, None).await.unwrap(); + if let Some(line) = page.lines.iter().find_map(|l| l.strip_prefix("pid:")) { + child_pid = Some(line.trim().to_string()); + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let child_pid = child_pid.expect("never saw the child pid"); + assert!(alive(&child_pid).await, "descendant should be running"); + + // Retention zero => the just-started job is already stale. 
+ reaper::reap_once(&store.jobs, Duration::ZERO).await; + + // Evicted from the map (poll can't find it)... + assert!( + store.poll(&id, 0, None).await.is_none(), + "job should be evicted" + ); + // ...and its process group reaped, not orphaned. + for _ in 0..50 { + if !alive(&child_pid).await { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("descendant survived eviction"); + } + + #[tokio::test] + async fn reaper_keeps_running_job_when_kill_fails() { + let store = store(Duration::from_millis(100)); + // A job with no pgid can't be signalled, so kill always fails while the + // job still reads as Running. Evicting it would orphan a live group and + // delete its log, so the reaper must keep it tracked for a later pass. + let (_tx, rx) = watch::channel(false); + let log_path = store.dir.join("jfake.log"); + tokio::fs::write(&log_path, "running\n").await.unwrap(); + let job = Arc::new(Job { + cmd: "unkillable".into(), + log_path: log_path.clone(), + pgid: None, + state: Arc::new(Mutex::new(JobState::Running)), + done: rx, + started: tokio::time::Instant::now() - Duration::from_secs(1), + }); + store.jobs.lock().await.insert("jfake".into(), job); + + // The backdated job is stale; kill fails => must not be evicted. + reaper::reap_once(&store.jobs, Duration::ZERO).await; + + assert!( + store.poll("jfake", 0, None).await.is_some(), + "running job whose kill failed must stay tracked" + ); + assert!(log_path.exists(), "its log must not be deleted"); + } + + #[cfg(unix)] + #[tokio::test] + async fn kill_terminates_child_process() { + let store = store(Duration::from_millis(100)); + let r = store + .run("sleep 1000".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + // Give the process a moment to start before we try to kill it. 
+ tokio::time::sleep(Duration::from_millis(50)).await; + + assert!( + store.kill(&id).await, + "kill should return true for a running job" + ); + + // After kill the state must transition away from Running. + for _ in 0..50 { + let (state, _) = store.poll(&id, 0, None).await.unwrap(); + if !matches!(state, JobState::Running) { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("job remained Running after kill"); + } + + #[tokio::test] + async fn list_reports_all_jobs() { + let store = store(Duration::from_secs(5)); + // Two distinct commands — one inline, one explicitly backgrounded. + store + .run("echo alpha".into(), None, None, false) + .await + .unwrap(); + store + .run("echo beta".into(), None, None, true) + .await + .unwrap(); + + let jobs = store.list().await; + assert_eq!(jobs.len(), 2, "expected two jobs, got {}", jobs.len()); + let cmds: Vec<&str> = jobs.iter().map(|j| j.cmd.as_str()).collect(); + assert!(cmds.contains(&"echo alpha"), "missing 'echo alpha'"); + assert!(cmds.contains(&"echo beta"), "missing 'echo beta'"); + // IDs must be sorted so the list is deterministic. + assert!(jobs[0].id < jobs[1].id, "list should be sorted by id"); + } + + #[tokio::test] + async fn poll_paginates() { + let store = store(Duration::from_secs(5)); + let r = store + .run("seq 1 10".into(), None, None, false) + .await + .unwrap(); + // seq finishes inline; re-poll the job id to exercise pagination. + let id = match r { + RunResult::Inline { .. } => "j1".to_string(), + RunResult::Backgrounded { id } => id, + }; + let (_s, page) = store.poll(&id, 0, Some(3)).await.unwrap(); + assert_eq!(page.lines.len(), 3); + assert_eq!(page.next_cursor, 3); + assert!(page.has_more); + assert_eq!(page.total_lines, 10); + } +} diff --git a/src/jobs/reaper.rs b/src/jobs/reaper.rs new file mode 100644 index 0000000..62bf51b --- /dev/null +++ b/src/jobs/reaper.rs @@ -0,0 +1,114 @@ +//! Reaper + kill signalling: evict aged-out jobs and terminate process groups. 
+//! +//! A job leads its own process group (so its pgid equals its pid; see +//! `JobStore::run`), which lets a single signal to the negative pid reach the +//! whole tree the command spawned, not just `sh` itself. +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use tokio::sync::{Mutex, watch}; + +use super::{Job, JobState, ProcessGroupId}; + +/// Jobs (and their logs) older than this are reaped hourly. +const RETENTION: Duration = Duration::from_secs(24 * 3600); +/// Grace between `SIGTERM` and `SIGKILL` when killing a job's process group. +const KILL_GRACE: Duration = Duration::from_secs(2); + +/// Signal a job's process group dead: `SIGTERM`, then `SIGKILL` if it outlasts a +/// short grace. Returns `true` if it signalled a running job, `false` if there +/// was nothing to kill (already finished, the OS withheld its pid) or the signal +/// could not be delivered. +pub(super) async fn kill_job(job: &Job) -> bool { + if !matches!(*job.state.lock().await, JobState::Running) { + return false; + } + let Some(pgid) = job.pgid else { + return false; + }; + if !signal_group(pgid, "TERM").await { + return false; + } + // Give the group a chance to exit on TERM; force it with KILL otherwise. + if !exited_within(job.done.clone(), KILL_GRACE).await && !signal_group(pgid, "KILL").await { + return false; + } + true +} + +/// Send `signal` (`"TERM"`, `"KILL"`, …) to process group `pgid`. The negative +/// pid targets the whole group so descendants die too, not just `sh`; `--` stops +/// `kill` reading it as an option. Returns whether the signal was delivered. +/// ponytail: pid reuse is a non-issue here. 
+async fn signal_group(pgid: ProcessGroupId, signal: &str) -> bool { + match tokio::process::Command::new("kill") + .arg(format!("-{signal}")) + .arg("--") + .arg(format!("-{}", pgid.0)) + .status() + .await + { + Ok(status) => status.success(), + Err(error) => { + tracing::warn!(%error, pgid = pgid.0, signal, "failed to signal process group"); + false + } + } +} + +/// Wait up to `grace` for the job to exit, watching its completion flag rather +/// than polling. Returns true if it exited in time, false if the grace elapsed. +async fn exited_within(mut done: watch::Receiver, grace: Duration) -> bool { + // The waiter flips the flag to true exactly once, on exit. A receiver error + // means the sender dropped, which only happens after that same exit. + tokio::time::timeout(grace, done.wait_for(|&exited| exited)) + .await + .is_ok() +} + +/// Hourly: drop jobs (and their log files) older than `RETENTION` so history +/// doesn't grow without bound. ponytail: time-based only; a busy box could still +/// hold ≤24h of jobs in memory — add a count cap if that ever bites. +pub(super) fn spawn_reaper(jobs: Arc>>>) { + tokio::spawn(async move { + let mut tick = tokio::time::interval(Duration::from_secs(3600)); + loop { + tick.tick().await; + reap_once(&jobs, RETENTION).await; + } + }); +} + +/// One reaping pass: evict every job older than `retention`. A still-`Running` +/// job is killed first, so eviction never orphans its process group. +pub(super) async fn reap_once(jobs: &Mutex>>, retention: Duration) { + let now = tokio::time::Instant::now(); + let map = jobs.lock().await; + let stale: Vec<(String, Arc)> = map + .iter() + .filter(|(_, j)| now.duration_since(j.started) > retention) + .map(|(id, j)| (id.clone(), j.clone())) + .collect(); + drop(map); + + // Kill first so a still-running group is never orphaned by eviction. 
Only + // evict jobs that finished or whose group we actually signalled; a running + // job whose kill failed stays tracked (pollable/killable) for a later pass. + let mut removable: Vec<(String, Arc)> = Vec::new(); + for (id, job) in &stale { + if kill_job(job).await || !matches!(*job.state.lock().await, JobState::Running) { + removable.push((id.clone(), job.clone())); + } else { + tracing::warn!(id = %id, "stale running job not evicted: kill failed"); + } + } + + let mut map = jobs.lock().await; + for (id, _) in &removable { + map.remove(id); + } + drop(map); + + for (_, job) in &removable { + let _ = tokio::fs::remove_file(&job.log_path).await; + } +}