From 7586ff41546363a01fecb0c7b82ad91a6e8c25a9 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 16:59:33 -0500 Subject: [PATCH 1/8] fix(jobs): spawn jobs in own process group so kills reach descendants Each job's `sh` now leads its own process group (process_group(0) on unix), and kill signals the negative pgid via `kill -- -`. Without this, killing only the shell's pid orphaned any children it spawned (e.g. `cmd &`), leaving them running after a kill. Test `kill_reaches_descendants` proves a backgrounded grandchild is reaped by the kill; verified it fails without the process-group change. Co-Authored-By: Claude --- src/jobs.rs | 74 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 5976b36..2bd1474 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -25,7 +25,9 @@ pub enum JobState { struct Job { cmd: String, log_path: PathBuf, - pid: Option, + /// Process group to signal on kill. The child leads its own group, so this + /// equals its pid (see `run`). `None` only if the OS withheld a pid. + pgid: Option, state: Arc>, started: tokio::time::Instant, } @@ -106,9 +108,13 @@ impl JobStore { if let Some(dir) = cwd { command.current_dir(dir); } + // Own process group (child becomes leader, so pgid == pid). Lets `kill` + // signal the whole tree the command spawns, not just `sh` itself. + #[cfg(unix)] + command.process_group(0); let mut child = command.spawn()?; - let pid = child.id(); + let pgid = child.id(); let (tx, rx) = watch::channel(false); let state = Arc::new(Mutex::new(JobState::Running)); @@ -132,7 +138,7 @@ impl JobStore { let job = Arc::new(Job { cmd, log_path: log_path.clone(), - pid, + pgid, state: state.clone(), started: tokio::time::Instant::now(), }); @@ -187,15 +193,18 @@ impl JobStore { out } - /// Kill a running job by pid. Returns false if unknown id. + /// Kill a running job's whole process group. Returns false if unknown id. 
pub async fn kill(&self, id: &str) -> bool { let Some(job) = self.jobs.lock().await.get(id).cloned() else { return false; }; - if let Some(pid) = job.pid { - // ponytail: shell out to `kill`; pid reuse is a non-issue at our scale. + if let Some(pgid) = job.pgid { + // Negative pid targets the process group, so descendants the command + // spawned die too — not just `sh`. `--` keeps `kill` from reading the + // leading `-` as an option. ponytail: pid reuse is a non-issue here. let _ = tokio::process::Command::new("kill") - .arg(pid.to_string()) + .arg("--") + .arg(format!("-{pgid}")) .status() .await; } @@ -306,6 +315,57 @@ mod tests { panic!("job never finished"); } + /// `kill -0` probes liveness without delivering a signal. + #[cfg(unix)] + async fn alive(pid: &str) -> bool { + tokio::process::Command::new("kill") + .arg("-0") + .arg(pid) + .status() + .await + .map(|s| s.success()) + .unwrap_or(false) + } + + #[cfg(unix)] + #[tokio::test] + async fn kill_reaches_descendants() { + let store = store(Duration::from_millis(100)); + // `sh` backgrounds a long sleep, prints its pid, then waits on it. Job + // control is off in `sh -c`, so the child shares the shell's group. + let r = store + .run("sleep 300 & echo \"pid:$!\"; wait".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + + // Pull the descendant's pid out of the log. + let mut child_pid = None; + for _ in 0..50 { + let (_s, page) = store.poll(&id, 0, None).await.unwrap(); + if let Some(line) = page.lines.iter().find_map(|l| l.strip_prefix("pid:")) { + child_pid = Some(line.trim().to_string()); + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let child_pid = child_pid.expect("never saw the child pid"); + assert!(alive(&child_pid).await, "descendant should be running"); + + assert!(store.kill(&id).await); + + // Group kill must reap the descendant, not just `sh`. 
+ for _ in 0..50 { + if !alive(&child_pid).await { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("descendant survived the kill"); + } + #[tokio::test] async fn poll_paginates() { let store = store(Duration::from_secs(5)); From dfd09e51b43103f4c60b245c6cba55918b6440a4 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:05:08 -0500 Subject: [PATCH 2/8] fix(jobs): escalate kill TERM->KILL; return accurate status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - kill() now SIGTERMs the process group, then SIGKILLs after a grace period if it hasn't exited (escalation via signal_group helper). - Return false when the job is not Running (unknown id or already finished) — nothing to signal; previously always returned true. - Wait out the grace event-driven on the job's completion watch flag (exited_within) instead of polling. - Tests: unknown id, finished job, and TERM-ignored escalation to KILL. Co-Authored-By: Claude --- src/jobs.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 10 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 2bd1474..76ea188 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -29,6 +29,9 @@ struct Job { /// equals its pid (see `run`). `None` only if the OS withheld a pid. pgid: Option, state: Arc>, + /// Flips to `true` when the process exits; lets `kill` wait out its grace + /// period instead of polling. + done: watch::Receiver, started: tokio::time::Instant, } @@ -67,6 +70,8 @@ pub struct JobStore { const DEFAULT_PAGE: usize = 200; /// Jobs (and their logs) older than this are reaped hourly. const RETENTION: Duration = Duration::from_secs(24 * 3600); +/// Grace between `SIGTERM` and `SIGKILL` when killing a job's process group. 
+const KILL_GRACE: Duration = Duration::from_secs(2); impl JobStore { pub fn new(dir: PathBuf, inline_timeout: Duration) -> std::io::Result { @@ -140,6 +145,7 @@ impl JobStore { log_path: log_path.clone(), pgid, state: state.clone(), + done: rx.clone(), started: tokio::time::Instant::now(), }); self.jobs.lock().await.insert(id.clone(), job); @@ -193,25 +199,50 @@ impl JobStore { out } - /// Kill a running job's whole process group. Returns false if unknown id. + /// Kill a running job by signalling its whole process group: `SIGTERM`, then + /// `SIGKILL` if it outlasts a short grace. Returns `false` when the id is + /// unknown or the job already finished — nothing to signal in either case. pub async fn kill(&self, id: &str) -> bool { let Some(job) = self.jobs.lock().await.get(id).cloned() else { return false; }; - if let Some(pgid) = job.pgid { - // Negative pid targets the process group, so descendants the command - // spawned die too — not just `sh`. `--` keeps `kill` from reading the - // leading `-` as an option. ponytail: pid reuse is a non-issue here. - let _ = tokio::process::Command::new("kill") - .arg("--") - .arg(format!("-{pgid}")) - .status() - .await; + if !matches!(*job.state.lock().await, JobState::Running) { + return false; + } + let Some(pgid) = job.pgid else { + return false; + }; + signal_group(pgid, "TERM").await; + // Give the group a chance to exit on TERM; force it with KILL otherwise. + if !exited_within(job.done.clone(), KILL_GRACE).await { + signal_group(pgid, "KILL").await; } true } } +/// Send `signal` (`"TERM"`, `"KILL"`, …) to process group `pgid`. The negative +/// pid targets the whole group so descendants die too, not just `sh`; `--` stops +/// `kill` reading it as an option. ponytail: pid reuse is a non-issue here. 
+async fn signal_group(pgid: u32, signal: &str) { + let _ = tokio::process::Command::new("kill") + .arg(format!("-{signal}")) + .arg("--") + .arg(format!("-{pgid}")) + .status() + .await; +} + +/// Wait up to `grace` for the job to exit, watching its completion flag rather +/// than polling. Returns true if it exited in time, false if the grace elapsed. +async fn exited_within(mut done: watch::Receiver, grace: Duration) -> bool { + // The waiter flips the flag to true exactly once, on exit. A receiver error + // means the sender dropped, which only happens after that same exit. + tokio::time::timeout(grace, done.wait_for(|&exited| exited)) + .await + .is_ok() +} + /// Read lines `[cursor, cursor+limit)` from a log file. Re-reads the whole file /// each call — fine for typical logs; seek by byte offset if they get huge. async fn read_page(path: &std::path::Path, cursor: usize, limit: usize) -> Page { @@ -366,6 +397,56 @@ mod tests { panic!("descendant survived the kill"); } + #[tokio::test] + async fn kill_unknown_id_returns_false() { + assert!(!store(Duration::from_secs(5)).kill("nope").await); + } + + #[tokio::test] + async fn kill_finished_job_returns_false() { + let store = store(Duration::from_secs(5)); + // Runs inline, so it has already exited by the time `run` returns. + let r = store + .run("echo bye".into(), None, None, false) + .await + .unwrap(); + assert!(matches!(r, RunResult::Inline { .. })); + assert!(!store.kill("j1").await); + } + + #[cfg(unix)] + #[tokio::test] + async fn kill_escalates_to_sigkill_when_term_ignored() { + let store = store(Duration::from_millis(100)); + // The shell traps (ignores) TERM, and the ignore disposition is inherited + // by its children, so only KILL can reap the group. 
+ let r = store + .run( + "trap '' TERM; while true; do sleep 1; done".into(), + None, + None, + true, + ) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + tokio::time::sleep(Duration::from_millis(200)).await; + + assert!(store.kill(&id).await); + + // TERM is ignored; the post-grace KILL must still bring it down. + for _ in 0..100 { + let (state, _) = store.poll(&id, 0, None).await.unwrap(); + if matches!(state, JobState::Exited { .. }) { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!("job survived TERM->KILL escalation"); + } + #[tokio::test] async fn poll_paginates() { let store = store(Duration::from_secs(5)); From f49fe048dd2e75a82d64a13be4abecf58fa4f4e9 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:13:51 -0500 Subject: [PATCH 3/8] fix: reaper kills still-Running jobs before eviction The hourly reaper evicted aged-out jobs (map.remove + log delete) but never signalled a still-`Running` job's process group, orphaning the whole process tree. Now it kills the group (TERM->KILL escalation) before eviction, reusing the same `kill_job` helper as `job(action="kill")`. mod.rs crossed 300 LOC, so per the module convention the reaper + signal helpers move to `src/jobs/reaper.rs`. `reap_once(jobs, retention)` is extracted with an injectable retention so the new test drives a real eviction with zero retention and asserts the descendant is reaped. Co-Authored-By: Claude Opus 4.8 --- CLAUDE.md | 6 +- src/{jobs.rs => jobs/mod.rs} | 117 +++++++++++++++-------------------- src/jobs/reaper.rs | 88 ++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 69 deletions(-) rename src/{jobs.rs => jobs/mod.rs} (82%) create mode 100644 src/jobs/reaper.rs diff --git a/CLAUDE.md b/CLAUDE.md index 678b16f..9d5d205 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,7 +53,8 @@ Keep this accurate — it's the navigation aid. 
| `src/config.rs` | env + TOML file config; fails fast if auth creds missing | | `src/auth.rs` | HTTP Basic auth middleware | | `src/oauth/` | minimal OAuth 2.1 server: discovery metadata, dynamic client registration, authorize + token with PKCE, bearer validation | -| `src/jobs.rs` | job engine: run a command, return inline if fast (<2s) else a job id (or immediately when `bg`); output streams to a per-job log file, polled paginated; hourly reaper drops jobs >24h old | +| `src/jobs/mod.rs` | job engine: run a command, return inline if fast (<2s) else a job id (or immediately when `bg`); output streams to a per-job log file, polled paginated | +| `src/jobs/reaper.rs` | hourly reaper drops jobs >24h old (killing any still-`Running` group first); process-group kill helpers (TERM→KILL escalation), shared with `job(action="kill")` | | `src/tools/mod.rs` | MCP tool surface (`#[tool_router]`/`#[tool]` from rmcp): 3 tools (`bash`/`job`/`file`) dispatching on `action`. Thin adapters over jobs + files | | `src/tools/files.rs` | file operations (`tokio::fs`; `ls`/`find`/`grep` shelled out) | @@ -125,7 +126,8 @@ Non-negotiable: SOLID, SRP, tested code. 
The bar: idiomatic, boring, readable Ru | Config / env / required creds | `src/config.rs` | | HTTP Basic auth | `src/auth.rs` | | OAuth 2.1 (discovery, registration, PKCE, bearer) | `src/oauth/` | -| Running commands, backgrounding, job logs | `src/jobs.rs` | +| Running commands, backgrounding, job logs | `src/jobs/mod.rs` | +| Reaper eviction + process-group kill helpers | `src/jobs/reaper.rs` | | Tool definitions / MCP surface | `src/tools/mod.rs` | | File operations | `src/tools/files.rs` | diff --git a/src/jobs.rs b/src/jobs/mod.rs similarity index 82% rename from src/jobs.rs rename to src/jobs/mod.rs index 76ea188..54d46df 100644 --- a/src/jobs.rs +++ b/src/jobs/mod.rs @@ -14,6 +14,10 @@ use std::{ use tokio::sync::{Mutex, watch}; +mod reaper; + +use reaper::{kill_job, spawn_reaper}; + #[derive(Debug, Clone, serde::Serialize)] #[serde(tag = "status", rename_all = "snake_case")] pub enum JobState { @@ -68,10 +72,6 @@ pub struct JobStore { } const DEFAULT_PAGE: usize = 200; -/// Jobs (and their logs) older than this are reaped hourly. -const RETENTION: Duration = Duration::from_secs(24 * 3600); -/// Grace between `SIGTERM` and `SIGKILL` when killing a job's process group. -const KILL_GRACE: Duration = Duration::from_secs(2); impl JobStore { pub fn new(dir: PathBuf, inline_timeout: Duration) -> std::io::Result { @@ -199,50 +199,16 @@ impl JobStore { out } - /// Kill a running job by signalling its whole process group: `SIGTERM`, then - /// `SIGKILL` if it outlasts a short grace. Returns `false` when the id is - /// unknown or the job already finished — nothing to signal in either case. + /// Kill a running job by signalling its whole process group. Returns `false` + /// when the id is unknown or the job already finished — nothing to signal. 
pub async fn kill(&self, id: &str) -> bool { let Some(job) = self.jobs.lock().await.get(id).cloned() else { return false; }; - if !matches!(*job.state.lock().await, JobState::Running) { - return false; - } - let Some(pgid) = job.pgid else { - return false; - }; - signal_group(pgid, "TERM").await; - // Give the group a chance to exit on TERM; force it with KILL otherwise. - if !exited_within(job.done.clone(), KILL_GRACE).await { - signal_group(pgid, "KILL").await; - } - true + kill_job(&job).await } } -/// Send `signal` (`"TERM"`, `"KILL"`, …) to process group `pgid`. The negative -/// pid targets the whole group so descendants die too, not just `sh`; `--` stops -/// `kill` reading it as an option. ponytail: pid reuse is a non-issue here. -async fn signal_group(pgid: u32, signal: &str) { - let _ = tokio::process::Command::new("kill") - .arg(format!("-{signal}")) - .arg("--") - .arg(format!("-{pgid}")) - .status() - .await; -} - -/// Wait up to `grace` for the job to exit, watching its completion flag rather -/// than polling. Returns true if it exited in time, false if the grace elapsed. -async fn exited_within(mut done: watch::Receiver, grace: Duration) -> bool { - // The waiter flips the flag to true exactly once, on exit. A receiver error - // means the sender dropped, which only happens after that same exit. - tokio::time::timeout(grace, done.wait_for(|&exited| exited)) - .await - .is_ok() -} - /// Read lines `[cursor, cursor+limit)` from a log file. Re-reads the whole file /// each call — fine for typical logs; seek by byte offset if they get huge. async fn read_page(path: &std::path::Path, cursor: usize, limit: usize) -> Page { @@ -264,32 +230,6 @@ async fn read_page(path: &std::path::Path, cursor: usize, limit: usize) -> Page } } -/// Hourly: drop jobs (and their log files) older than `RETENTION` so history -/// doesn't grow without bound. ponytail: time-based only; a busy box could still -/// hold ≤24h of jobs in memory — add a count cap if that ever bites. 
-fn spawn_reaper(jobs: Arc>>>) { - tokio::spawn(async move { - let mut tick = tokio::time::interval(Duration::from_secs(3600)); - loop { - tick.tick().await; - let now = tokio::time::Instant::now(); - let mut map = jobs.lock().await; - let stale: Vec<(String, PathBuf)> = map - .iter() - .filter(|(_, j)| now.duration_since(j.started) > RETENTION) - .map(|(id, j)| (id.clone(), j.log_path.clone())) - .collect(); - for (id, _) in &stale { - map.remove(id); - } - drop(map); - for (_, path) in stale { - let _ = tokio::fs::remove_file(path).await; - } - } - }); -} - #[cfg(test)] mod tests { use super::*; @@ -447,6 +387,49 @@ mod tests { panic!("job survived TERM->KILL escalation"); } + #[cfg(unix)] + #[tokio::test] + async fn reaper_kills_running_job_before_eviction() { + let store = store(Duration::from_millis(100)); + // Backgrounded descendant; print its pid so we can probe it post-eviction. + let r = store + .run("sleep 300 & echo \"pid:$!\"; wait".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + + let mut child_pid = None; + for _ in 0..50 { + let (_s, page) = store.poll(&id, 0, None).await.unwrap(); + if let Some(line) = page.lines.iter().find_map(|l| l.strip_prefix("pid:")) { + child_pid = Some(line.trim().to_string()); + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let child_pid = child_pid.expect("never saw the child pid"); + assert!(alive(&child_pid).await, "descendant should be running"); + + // Retention zero => the just-started job is already stale. + reaper::reap_once(&store.jobs, Duration::ZERO).await; + + // Evicted from the map (poll can't find it)... + assert!( + store.poll(&id, 0, None).await.is_none(), + "job should be evicted" + ); + // ...and its process group reaped, not orphaned. 
+ for _ in 0..50 { + if !alive(&child_pid).await { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("descendant survived eviction"); + } + #[tokio::test] async fn poll_paginates() { let store = store(Duration::from_secs(5)); diff --git a/src/jobs/reaper.rs b/src/jobs/reaper.rs new file mode 100644 index 0000000..cbeac89 --- /dev/null +++ b/src/jobs/reaper.rs @@ -0,0 +1,88 @@ +//! Reaper + kill signalling: evict aged-out jobs and terminate process groups. +//! +//! A job leads its own process group (so its pgid equals its pid; see +//! `JobStore::run`), which lets a single signal to the negative pid reach the +//! whole tree the command spawned, not just `sh` itself. +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use tokio::sync::{Mutex, watch}; + +use super::{Job, JobState}; + +/// Jobs (and their logs) older than this are reaped hourly. +const RETENTION: Duration = Duration::from_secs(24 * 3600); +/// Grace between `SIGTERM` and `SIGKILL` when killing a job's process group. +const KILL_GRACE: Duration = Duration::from_secs(2); + +/// Signal a job's process group dead: `SIGTERM`, then `SIGKILL` if it outlasts a +/// short grace. Returns `true` if it signalled a running job, `false` if there +/// was nothing to kill (already finished, or the OS withheld its pid). +pub(super) async fn kill_job(job: &Job) -> bool { + if !matches!(*job.state.lock().await, JobState::Running) { + return false; + } + let Some(pgid) = job.pgid else { + return false; + }; + signal_group(pgid, "TERM").await; + // Give the group a chance to exit on TERM; force it with KILL otherwise. + if !exited_within(job.done.clone(), KILL_GRACE).await { + signal_group(pgid, "KILL").await; + } + true +} + +/// Send `signal` (`"TERM"`, `"KILL"`, …) to process group `pgid`. The negative +/// pid targets the whole group so descendants die too, not just `sh`; `--` stops +/// `kill` reading it as an option. ponytail: pid reuse is a non-issue here. 
+async fn signal_group(pgid: u32, signal: &str) { + let _ = tokio::process::Command::new("kill") + .arg(format!("-{signal}")) + .arg("--") + .arg(format!("-{pgid}")) + .status() + .await; +} + +/// Wait up to `grace` for the job to exit, watching its completion flag rather +/// than polling. Returns true if it exited in time, false if the grace elapsed. +async fn exited_within(mut done: watch::Receiver<bool>, grace: Duration) -> bool { + // The waiter flips the flag to true exactly once, on exit. A receiver error + // means the sender dropped, which only happens after that same exit. + tokio::time::timeout(grace, done.wait_for(|&exited| exited)) + .await + .is_ok() +} + +/// Hourly: drop jobs (and their log files) older than `RETENTION` so history +/// doesn't grow without bound. ponytail: time-based only; a busy box could still +/// hold ≤24h of jobs in memory — add a count cap if that ever bites. +pub(super) fn spawn_reaper(jobs: Arc<Mutex<HashMap<String, Arc<Job>>>>) { + tokio::spawn(async move { + let mut tick = tokio::time::interval(Duration::from_secs(3600)); + loop { + tick.tick().await; + reap_once(&jobs, RETENTION).await; + } + }); +} + +/// One reaping pass: evict every job older than `retention`. A still-`Running` +/// job is killed first, so eviction never orphans its process group. 
+pub(super) async fn reap_once(jobs: &Mutex<HashMap<String, Arc<Job>>>, retention: Duration) { + let now = tokio::time::Instant::now(); + let mut map = jobs.lock().await; + let stale: Vec<(String, Arc<Job>)> = map + .iter() + .filter(|(_, j)| now.duration_since(j.started) > retention) + .map(|(id, j)| (id.clone(), j.clone())) + .collect(); + for (id, _) in &stale { + map.remove(id); + } + drop(map); + for (_, job) in &stale { + kill_job(job).await; + let _ = tokio::fs::remove_file(&job.log_path).await; + } +} From ac64ed019d192477bdfb55c3ab6af87867b46f76 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:15:04 -0500 Subject: [PATCH 4/8] test(jobs): add kill_terminates_child_process and list_reports_all_jobs - kill_terminates_child_process: bg sleep 1000, kill, assert state leaves Running - list_reports_all_jobs: two jobs, assert both appear sorted in list() - kill_unknown_id_returns_false already existed; no duplicate added Co-Authored-By: Claude --- src/jobs/mod.rs | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 54d46df..b8c0a37 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -430,6 +430,58 @@ mod tests { panic!("descendant survived eviction"); } + #[cfg(unix)] + #[tokio::test] + async fn kill_terminates_child_process() { + let store = store(Duration::from_millis(100)); + let r = store + .run("sleep 1000".into(), None, None, true) + .await + .unwrap(); + let RunResult::Backgrounded { id } = r else { + panic!("bg should background"); + }; + // Give the process a moment to start before we try to kill it. + tokio::time::sleep(Duration::from_millis(50)).await; + + assert!( + store.kill(&id).await, + "kill should return true for a running job" + ); + + // After kill the state must transition away from Running. 
+ for _ in 0..50 { + let (state, _) = store.poll(&id, 0, None).await.unwrap(); + if !matches!(state, JobState::Running) { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + panic!("job remained Running after kill"); + } + + #[tokio::test] + async fn list_reports_all_jobs() { + let store = store(Duration::from_secs(5)); + // Two distinct commands — one fast (inline), one long (backgrounded). + store + .run("echo alpha".into(), None, None, false) + .await + .unwrap(); + store + .run("echo beta".into(), None, None, false) + .await + .unwrap(); + + let jobs = store.list().await; + assert_eq!(jobs.len(), 2, "expected two jobs, got {}", jobs.len()); + let cmds: Vec<&str> = jobs.iter().map(|j| j.cmd.as_str()).collect(); + assert!(cmds.contains(&"echo alpha"), "missing 'echo alpha'"); + assert!(cmds.contains(&"echo beta"), "missing 'echo beta'"); + // IDs must be sorted so the list is deterministic. + assert!(jobs[0].id < jobs[1].id, "list should be sorted by id"); + } + #[tokio::test] async fn poll_paginates() { let store = store(Duration::from_secs(5)); From 8d30efc9066b3203d62629fac31eff71da2d1bf0 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:15:05 -0500 Subject: [PATCH 5/8] chore: add CodeRabbit config tailored to mcp-ssh Adapted from the developerz-ai house template (db-mcp-gateway), scoped to mcp-ssh's modules and CLAUDE.md NEVER/Conventions rules. Beta features on (early_access), concise path instructions. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .coderabbit.yaml | 164 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 .coderabbit.yaml diff --git a/.coderabbit.yaml b/.coderabbit.yaml new file mode 100644 index 0000000..8161916 --- /dev/null +++ b/.coderabbit.yaml @@ -0,0 +1,164 @@ +# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json + +# mcp-ssh — CodeRabbit configuration +# +# Single Rust binary: an MCP server giving an AI agent remote shell + file +# access to ONE host, over authenticated MCP Streamable HTTP at /mcp. +# Authority: CLAUDE.md (Conventions + NEVER) is absolute. Cite the rule. + +language: en-US +early_access: true +enable_free_tier: true + +tone_instructions: "mcp-ssh: Rust 2024, tokio, axum, rmcp. CLAUDE.md NEVER rules are absolute — no password/token in logs/errors/responses, never run as root, never weaken auth, never add a 4th MCP tool (parametrize bash/job/file). Concrete diffs, skip nits." 
+ +reviews: + profile: assertive + request_changes_workflow: true + high_level_summary: true + high_level_summary_placeholder: "@coderabbitai summary" + auto_title_placeholder: "@coderabbitai" + poem: false + review_status: true + collapse_walkthrough: false + sequence_diagrams: true + estimate_code_review_effort: true + suggested_reviewers: true + auto_assign_reviewers: false + abort_on_close: true + + auto_review: + enabled: true + auto_incremental_review: true + ignore_title_keywords: + - "WIP" + - "DRAFT" + - "DO NOT REVIEW" + drafts: false + base_branches: + - main + + tools: + shellcheck: { enabled: true } + markdownlint: { enabled: true } + yamllint: { enabled: true } + actionlint: { enabled: true } + hadolint: { enabled: true } + gitleaks: { enabled: true } + github-checks: { enabled: true, timeout_ms: 900000 } + ruff: { enabled: false } + biome: { enabled: false } + eslint: { enabled: false } + rubocop: { enabled: false } + + path_instructions: + - path: "**/*" + instructions: | + Review only changed files and code directly affected. Behaviour + changes must update docs/ in the same PR. Flag missing tests for new + public behaviour. + + # Rust source — CLAUDE.md Conventions + - path: "src/**/*.rs" + instructions: | + Enforce CLAUDE.md Conventions: + - No unwrap/expect outside main.rs and #[cfg(test)]; panic in the + request path crashes every client. + - Errors typed with thiserror; anyhow only at the main.rs boundary. + - Async end-to-end. No std::sync::Mutex on the request path (use + tokio::sync), never held across .await. No block_on; spawn_blocking + for blocking I/O. + - Newtype over bare primitives with meaning (JobId, not String). + Make illegal states unrepresentable. + - tracing spans (request_id, tool), never println!. + - Files <=300 LOC, one responsibility. clippy -D warnings + rustfmt + clean is the floor. + + # Auth + OAuth — security review required + - path: "src/auth.rs" + instructions: | + SECURITY REVIEW. HTTP Basic middleware. 
Never log/return the + password. Flag any path that bypasses or weakens auth. + - path: "src/oauth/**" + instructions: | + SECURITY REVIEW. OAuth 2.1: discovery, dynamic registration, PKCE on + authorize+token, bearer validation. No token material in logs/errors. + + # Config — fail fast, secrets + - path: "src/config.rs" + instructions: | + Validated at boot — missing auth creds fail at startup, never at + request time. MCP_SSH_ALLOWED_HOSTS must be required. Secrets never + logged. + + # Tools — constant 3-tool surface + - path: "src/tools/**" + instructions: | + Exactly 3 tools (bash/job/file) dispatching on action. NEW capability + = a new param/action, never a 4th tool. Thin adapters — no business + logic in the dispatch arm. + + # Jobs — backgrounding, log pagination, process-group kill + - path: "src/jobs/**" + instructions: | + Output streams to a per-job log file; poll paginates (cursor/limit) + so chatty commands never flood agent context. Jobs spawn in their own + process group so kills reach descendants. Kill escalates TERM->KILL. + Reaper drops jobs >24h, killing any still-Running group first. + + # Integration tests — real server over HTTP + - path: "tests/**/*.rs" + instructions: | + Boot the server and issue real MCP requests over HTTP. New behaviour + ships with the test that proves it. + + # CI + - path: ".github/workflows/**" + instructions: | + Runner: blacksmith-2vcpu-ubuntu-2404. Pin actions to major versions. + Required jobs: fmt, clippy (-D warnings), test. + + # Dockerfile + - path: "Dockerfile" + instructions: | + Multi-stage build. Non-root user — never run as root. No build + secrets baked in. + + - path: "**/*.md" + instructions: | + Lead with the rule, tables over prose, file paths over descriptions, + no filler. Valid links, accurate code examples. 
+ + path_filters: + - "!target/**" + - "!**/target/**" + - "!**/*.lock" + - "!Cargo.lock" + - "!**/*.log" + - "!.env*" + - "!**/.DS_Store" + +chat: + auto_reply: true + art: false + +knowledge_base: + opt_out: false + + web_search: + enabled: true + + code_guidelines: + enabled: true + filePatterns: + - "**/CLAUDE.md" + - "docs/**" + + learnings: + scope: auto + + issues: + scope: auto + + pull_requests: + scope: auto From 09dcbfedfaf622de78e47c56f37afbccbbc50778 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:25:15 -0500 Subject: [PATCH 6/8] =?UTF-8?q?fix(jobs):=20address=20PR=20#2=20review=20?= =?UTF-8?q?=E2=80=94=20kill=20correctness=20+=20newtype?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - reaper: signal_group returns delivery status; kill_job reports false when TERM/KILL never reach the group - reaper: reap_once kills stale jobs before evicting them, so a still-Running group is never orphaned mid-grace - jobs: wrap pgid in a private ProcessGroupId newtype - test: list_reports_all_jobs now exercises the backgrounded path - coderabbit: stop filtering Cargo.lock/lockfiles out of review Co-Authored-By: Claude --- .coderabbit.yaml | 2 -- src/jobs/mod.rs | 13 +++++++++---- src/jobs/reaper.rs | 44 ++++++++++++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/.coderabbit.yaml b/.coderabbit.yaml index 8161916..ed71dc9 100644 --- a/.coderabbit.yaml +++ b/.coderabbit.yaml @@ -132,8 +132,6 @@ reviews: path_filters: - "!target/**" - "!**/target/**" - - "!**/*.lock" - - "!Cargo.lock" - "!**/*.log" - "!.env*" - "!**/.DS_Store" diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index b8c0a37..8f3ade0 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -26,12 +26,17 @@ pub enum JobState { Failed { error: String }, } +/// Process group id to signal on kill. Wrapping the raw pid keeps this +/// lifecycle/security boundary from being confused with any other `u32`. 
+#[derive(Debug, Clone, Copy)] +struct ProcessGroupId(u32); + struct Job { cmd: String, log_path: PathBuf, /// Process group to signal on kill. The child leads its own group, so this /// equals its pid (see `run`). `None` only if the OS withheld a pid. - pgid: Option<u32>, + pgid: Option<ProcessGroupId>, state: Arc<Mutex<JobState>>, /// Flips to `true` when the process exits; lets `kill` wait out its grace /// period instead of polling. @@ -119,7 +124,7 @@ impl JobStore { command.process_group(0); let mut child = command.spawn()?; - let pgid = child.id(); + let pgid = child.id().map(ProcessGroupId); let (tx, rx) = watch::channel(false); let state = Arc::new(Mutex::new(JobState::Running)); @@ -463,13 +468,13 @@ mod tests { #[tokio::test] async fn list_reports_all_jobs() { let store = store(Duration::from_secs(5)); - // Two distinct commands — one fast (inline), one long (backgrounded). + // Two distinct commands — one inline, one explicitly backgrounded. store .run("echo alpha".into(), None, None, false) .await .unwrap(); store - .run("echo beta".into(), None, None, false) + .run("echo beta".into(), None, None, true) .await .unwrap(); diff --git a/src/jobs/reaper.rs b/src/jobs/reaper.rs index cbeac89..dc09012 100644 --- a/src/jobs/reaper.rs +++ b/src/jobs/reaper.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{Mutex, watch}; -use super::{Job, JobState}; +use super::{Job, JobState, ProcessGroupId}; /// Jobs (and their logs) older than this are reaped hourly. const RETENTION: Duration = Duration::from_secs(24 * 3600); @@ -16,7 +16,8 @@ const KILL_GRACE: Duration = Duration::from_secs(2); /// Signal a job's process group dead: `SIGTERM`, then `SIGKILL` if it outlasts a /// short grace. Returns `true` if it signalled a running job, `false` if there -/// was nothing to kill (already finished, or the OS withheld its pid). +/// was nothing to kill (already finished, the OS withheld its pid) or the signal +/// could not be delivered. 
pub(super) async fn kill_job(job: &Job) -> bool { if !matches!(*job.state.lock().await, JobState::Running) { return false; @@ -24,24 +25,34 @@ pub(super) async fn kill_job(job: &Job) -> bool { let Some(pgid) = job.pgid else { return false; }; - signal_group(pgid, "TERM").await; + if !signal_group(pgid, "TERM").await { + return false; + } // Give the group a chance to exit on TERM; force it with KILL otherwise. - if !exited_within(job.done.clone(), KILL_GRACE).await { - signal_group(pgid, "KILL").await; + if !exited_within(job.done.clone(), KILL_GRACE).await && !signal_group(pgid, "KILL").await { + return false; } true } /// Send `signal` (`"TERM"`, `"KILL"`, …) to process group `pgid`. The negative /// pid targets the whole group so descendants die too, not just `sh`; `--` stops -/// `kill` reading it as an option. ponytail: pid reuse is a non-issue here. -async fn signal_group(pgid: u32, signal: &str) { - let _ = tokio::process::Command::new("kill") +/// `kill` reading it as an option. Returns whether the signal was delivered. +/// ponytail: pid reuse is a non-issue here. +async fn signal_group(pgid: ProcessGroupId, signal: &str) -> bool { + match tokio::process::Command::new("kill") .arg(format!("-{signal}")) .arg("--") - .arg(format!("-{pgid}")) + .arg(format!("-{}", pgid.0)) .status() - .await; + .await + { + Ok(status) => status.success(), + Err(error) => { + tracing::warn!(%error, pgid = pgid.0, signal, "failed to signal process group"); + false + } + } } /// Wait up to `grace` for the job to exit, watching its completion flag rather @@ -71,18 +82,27 @@ pub(super) fn spawn_reaper(jobs: Arc>>>) { /// job is killed first, so eviction never orphans its process group. 
pub(super) async fn reap_once(jobs: &Mutex>>, retention: Duration) { let now = tokio::time::Instant::now(); - let mut map = jobs.lock().await; + let map = jobs.lock().await; let stale: Vec<(String, Arc)> = map .iter() .filter(|(_, j)| now.duration_since(j.started) > retention) .map(|(id, j)| (id.clone(), j.clone())) .collect(); + drop(map); + + // Kill first so a still-running group is never orphaned by eviction; the job + // stays in the map (pollable/killable) until its termination completes. + for (_, job) in &stale { + kill_job(job).await; + } + + let mut map = jobs.lock().await; for (id, _) in &stale { map.remove(id); } drop(map); + for (_, job) in &stale { - kill_job(job).await; let _ = tokio::fs::remove_file(&job.log_path).await; } } From 2c1ab8824963908a4dbc0a715bc7b7a0bb20fa13 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:30:43 -0500 Subject: [PATCH 7/8] fix(reaper): don't evict running jobs when kill fails reap_once removed every stale job and deleted its log even when kill_job failed on a still-Running job, orphaning the live process group and deleting its log out from under it. Only evict jobs that finished or whose group was successfully signalled; keep failed running jobs tracked for a later reap pass. Co-Authored-By: Claude --- src/jobs/mod.rs | 29 +++++++++++++++++++++++++++++ src/jobs/reaper.rs | 18 ++++++++++++------ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 8f3ade0..05dd1c0 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -435,6 +435,35 @@ mod tests { panic!("descendant survived eviction"); } + #[tokio::test] + async fn reaper_keeps_running_job_when_kill_fails() { + let store = store(Duration::from_millis(100)); + // A job with no pgid can't be signalled, so kill always fails while the + // job still reads as Running. Evicting it would orphan a live group and + // delete its log, so the reaper must keep it tracked for a later pass. 
+ let (_tx, rx) = watch::channel(false); + let log_path = store.dir.join("jfake.log"); + tokio::fs::write(&log_path, "running\n").await.unwrap(); + let job = Arc::new(Job { + cmd: "unkillable".into(), + log_path: log_path.clone(), + pgid: None, + state: Arc::new(Mutex::new(JobState::Running)), + done: rx, + started: tokio::time::Instant::now(), + }); + store.jobs.lock().await.insert("jfake".into(), job); + + // Retention zero => stale immediately; kill fails => must not be evicted. + reaper::reap_once(&store.jobs, Duration::ZERO).await; + + assert!( + store.poll("jfake", 0, None).await.is_some(), + "running job whose kill failed must stay tracked" + ); + assert!(log_path.exists(), "its log must not be deleted"); + } + #[cfg(unix)] #[tokio::test] async fn kill_terminates_child_process() { diff --git a/src/jobs/reaper.rs b/src/jobs/reaper.rs index dc09012..62bf51b 100644 --- a/src/jobs/reaper.rs +++ b/src/jobs/reaper.rs @@ -90,19 +90,25 @@ pub(super) async fn reap_once(jobs: &Mutex>>, retention .collect(); drop(map); - // Kill first so a still-running group is never orphaned by eviction; the job - // stays in the map (pollable/killable) until its termination completes. - for (_, job) in &stale { - kill_job(job).await; + // Kill first so a still-running group is never orphaned by eviction. Only + // evict jobs that finished or whose group we actually signalled; a running + // job whose kill failed stays tracked (pollable/killable) for a later pass. 
+ let mut removable: Vec<(String, Arc)> = Vec::new(); + for (id, job) in &stale { + if kill_job(job).await || !matches!(*job.state.lock().await, JobState::Running) { + removable.push((id.clone(), job.clone())); + } else { + tracing::warn!(id = %id, "stale running job not evicted: kill failed"); + } } let mut map = jobs.lock().await; - for (id, _) in &stale { + for (id, _) in &removable { map.remove(id); } drop(map); - for (_, job) in &stale { + for (_, job) in &removable { let _ = tokio::fs::remove_file(&job.log_path).await; } } From 525e009586fdc97bb574d01d397746bad6d28004 Mon Sep 17 00:00:00 2001 From: sebi Date: Fri, 26 Jun 2026 17:33:40 -0500 Subject: [PATCH 8/8] test: backdate started so reaper stale branch fires deterministically - reaper_keeps_running_job_when_kill_fails used Instant::now(), letting the test pass without exercising the kill-failure path under ZERO retention - Backdate started by 1s so duration_since > retention is guaranteed Co-Authored-By: Claude --- src/jobs/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 05dd1c0..381cc04 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -450,11 +450,11 @@ mod tests { pgid: None, state: Arc::new(Mutex::new(JobState::Running)), done: rx, - started: tokio::time::Instant::now(), + started: tokio::time::Instant::now() - Duration::from_secs(1), }); store.jobs.lock().await.insert("jfake".into(), job); - // Retention zero => stale immediately; kill fails => must not be evicted. + // The backdated job is stale; kill fails => must not be evicted. reaper::reap_once(&store.jobs, Duration::ZERO).await; assert!(