diff --git a/README.md b/README.md index dd601ce..8df6fcf 100644 --- a/README.md +++ b/README.md @@ -75,56 +75,6 @@ Save this as `.harmont/pipeline.py` (or `.harmont/pipeline.ts`):
Python -```python -import harmont as hm - -@hm.pipeline("ci") -def ci() -> hm.Step: - return ( - hm.sh("echo 'hello from harmont'", label="hello") - .sh("uname -a", label="env") - ) -``` - -
- -
-TypeScript - -```typescript -import { sh, pipeline, type PipelineDefinition } from "harmont"; - -const pipelines: PipelineDefinition[] = [ - { - slug: "ci", - pipeline: pipeline( - sh("echo 'hello from harmont'", { label: "hello" }) - .sh("uname -a", { label: "env" }), - ), - }, -]; - -export default pipelines; -``` - -
- -### 2. Run it - -```sh -hm run ci -``` - -If the repo declares only one pipeline, the slug is optional - just `hm run`. - -### Real-world example - -For production pipelines, use typed toolchains - they generate test, lint, and -format steps from your project layout: - -
-Python - ```python import harmont as hm from harmont.python import PythonToolchain @@ -177,6 +127,14 @@ export default pipelines;
+### 2. Run it + +```sh +hm run ci +``` + +If the repo declares only one pipeline, the slug is optional - just `hm run`. + Browse the [example projects](./examples) for idiomatic pipelines in Rust, Go, Python, Java, C++, React, Next.js, and more. diff --git a/crates/hm-pipeline-ir/tests/e2e_fixtures.rs b/crates/hm-pipeline-ir/tests/e2e_fixtures.rs index 024ea87..74d839d 100644 --- a/crates/hm-pipeline-ir/tests/e2e_fixtures.rs +++ b/crates/hm-pipeline-ir/tests/e2e_fixtures.rs @@ -50,8 +50,6 @@ fn edge_kinds(g: &PipelineGraph) -> (usize, usize) { (builds_in, depends_on) } -// ---- Python fixtures ---- - #[test] fn python_monorepo_ci() { let g = load_fixture("python", "monorepo-ci"); @@ -115,8 +113,6 @@ fn python_kitchen_sink() { } } -// ---- TypeScript fixtures ---- - #[test] fn ts_monorepo_ci() { let g = load_fixture("ts", "monorepo-ci"); @@ -145,8 +141,6 @@ fn ts_kitchen_sink() { assert!(g.node_count() >= 12); } -// ---- Structural invariants on all fixtures ---- - #[test] fn all_fixtures_have_valid_structure() { for dsl in ["python", "ts"] { @@ -172,8 +166,6 @@ fn all_fixtures_have_valid_structure() { } } -// ---- Cross-DSL parity ---- - #[test] fn parity_node_count() { for scenario in SCENARIOS { diff --git a/crates/hm/src/cli/mod.rs b/crates/hm/src/cli/mod.rs index aace559..bc1c8d1 100644 --- a/crates/hm/src/cli/mod.rs +++ b/crates/hm/src/cli/mod.rs @@ -67,6 +67,8 @@ pub enum CacheCommand { Save(CacheSaveArgs), /// Restore harmont Docker images from a cache directory. Restore(CacheRestoreArgs), + /// Remove all cached workspaces and Docker images. + Clean, } #[derive(Debug, Clone, clap::Args)] @@ -92,6 +94,7 @@ pub async fn dispatch(command: Command, ctx: RunContext) -> Result { Command::Cache(cmd) => match cmd { CacheCommand::Save(args) => crate::commands::cache::handle_save(&args.dir).await, CacheCommand::Restore(args) => crate::commands::cache::handle_restore(&args.dir).await, + CacheCommand::Clean => crate::commands::cache::handle_clean().await, }, Command::Version => version::run().await.map(|()| 0), Command::Plugin(cmd) => plugin::run(cmd).await.map(|()| 0), diff --git a/crates/hm/src/commands/cache/clean.rs b/crates/hm/src/commands/cache/clean.rs new file mode 100644 index 0000000..86849fc --- /dev/null +++ b/crates/hm/src/commands/cache/clean.rs @@ -0,0 +1,100 @@ +use anyhow::Result; + +/// # Errors +/// Returns an error if workspace cache removal or Docker image listing fails. +pub async fn handle_clean() -> Result { + let mut cleaned = if let Some(ws_cache) = hm_util::dirs::harmont_workspace_cache_dir() + && ws_cache.exists() + { + let size = dir_size(&ws_cache); + std::fs::remove_dir_all(&ws_cache)?; + tracing::info!( + path = %ws_cache.display(), + "removed workspace cache ({})", + human_bytes(size), + ); + true + } else { + false + }; + + let docker = match crate::orchestrator::docker_client::DockerClient::connect() { + Ok(d) => match d.ping().await { + Ok(()) => Some(d), + Err(e) => { + tracing::warn!(%e, "Docker daemon unreachable — skipping image cleanup"); + None + } + }, + Err(e) => { + tracing::warn!(%e, "cannot connect to Docker — skipping image cleanup"); + None + } + }; + + if let Some(docker) = &docker { + let cache_images = docker.list_images_by_prefix("harmont-cache/").await?; + for tag in &cache_images { + if let Err(e) = docker.remove_image(tag).await { + tracing::warn!(image = %tag, %e, "failed to remove cached image"); + } else { + tracing::info!(image = %tag, "removed cached Docker image"); + cleaned = true; + } + } + + let ephemeral_images = docker + .list_images_by_prefix("harmont-local-ephemeral/") + .await?; + for tag in &ephemeral_images { + if let Err(e) = docker.remove_image(tag).await { + tracing::warn!(image = %tag, %e, "failed to remove ephemeral image"); + } else { + tracing::info!(image = %tag, "removed ephemeral Docker image"); + cleaned = true; + } + } + } + + if !cleaned { + tracing::info!("nothing to clean"); + } + + Ok(0) +} + +fn dir_size(path: &std::path::Path) -> u64 { + fn walk(p: &std::path::Path) -> u64 { + std::fs::read_dir(p) + .into_iter() + .flatten() + .filter_map(std::result::Result::ok) + .map(|e| { + let path = e.path(); + if path.is_dir() { + walk(&path) + } else { + e.metadata().map_or(0, |m| m.len()) + } + }) + .sum() + } + walk(path) +} + +#[allow( + clippy::cast_precision_loss, + reason = "human-readable display; sub-byte precision irrelevant" +)] +fn human_bytes(bytes: u64) -> String { + let b = bytes as f64; + if bytes < 1024 { + format!("{bytes}B") + } else if bytes < 1024 * 1024 { + format!("{:.1}KB", b / 1024.0) + } else if bytes < 1024 * 1024 * 1024 { + format!("{:.1}MB", b / (1024.0 * 1024.0)) + } else { + format!("{:.1}GB", b / (1024.0 * 1024.0 * 1024.0)) + } +} diff --git a/crates/hm/src/commands/cache/mod.rs b/crates/hm/src/commands/cache/mod.rs index c011ea7..d7174a6 100644 --- a/crates/hm/src/commands/cache/mod.rs +++ b/crates/hm/src/commands/cache/mod.rs @@ -1,6 +1,8 @@ +mod clean; pub mod manifest; mod restore; mod save; +pub use clean::handle_clean; pub use restore::handle_restore; pub use save::handle_save; diff --git a/crates/hm/src/orchestrator/archive.rs b/crates/hm/src/orchestrator/archive.rs index c6cd64c..f5662b5 100644 --- a/crates/hm/src/orchestrator/archive.rs +++ b/crates/hm/src/orchestrator/archive.rs @@ -42,6 +42,12 @@ impl ArchiveStore { .unwrap_or(0) } + /// Return a clone of the full archive bytes, or `None` if unknown. + #[must_use] + pub fn get_bytes(&self, id: ArchiveId) -> Option> { + self.archives.lock().ok()?.get(&id).cloned() + } + /// Read up to `max` bytes from offset `offset`. Returns empty /// when offset is beyond end, or when the archive is unknown. #[must_use] diff --git a/crates/hm/src/orchestrator/cache.rs b/crates/hm/src/orchestrator/cache.rs index 1e3208e..12c1d3c 100644 --- a/crates/hm/src/orchestrator/cache.rs +++ b/crates/hm/src/orchestrator/cache.rs @@ -1,16 +1,14 @@ //! Host-side cache decision. //! -//! Resolves a wire-typed [`CommandStep`] against the local COW -//! workspace cache directory and returns the wire-typed -//! [`CacheDecision`] consumed by step execution. +//! Resolves a wire-typed [`CommandStep`] against Docker image tags +//! to decide whether to skip execution (cache hit) or run + commit. //! //! Cache keys are computed by `harmont.keygen` at plan time and ride //! along the JSON in `cache.key`. -use std::path::{Path, PathBuf}; +use hm_plugin_protocol::CommandStep; -use anyhow::{Context, Result}; -use hm_plugin_protocol::{CacheDecision, CommandStep, SnapshotRef}; +use super::docker_client::DockerClient; fn sanitize_for_tag(s: &str) -> String { s.chars() @@ -24,123 +22,43 @@ fn sanitize_for_tag(s: &str) -> String { .collect() } -// --------------------------------------------------------------------------- -// COW workspace cache -// --------------------------------------------------------------------------- - -/// The outcome of a COW workspace cache lookup. -#[derive(Debug)] -pub struct CowCacheOutcome { - pub decision: CacheDecision, - pub cache_to: Option, - pub stale_dirs: Vec, -} - -/// Resolve the on-disk cache directory for a step's COW workspace. +/// Derive a deterministic Docker image tag for a cacheable step. /// /// Returns `None` when the step has no cache, a `"none"` policy, or no -/// cache key — matching the same guard logic as [`cache_image_tag`]. -/// -/// # Errors -/// Returns an error if the config directory cannot be resolved. -pub fn cow_cache_dir(step: &CommandStep) -> Result> { - let cache = match step.cache.as_ref() { - Some(c) if c.policy != "none" => c, - _ => return Ok(None), - }; - let Some(key) = cache.key.as_deref() else { - return Ok(None); - }; - let ws_cache = hm_util::dirs::harmont_workspace_cache_dir() - .ok_or_else(|| anyhow::anyhow!("cannot resolve ~/.harmont/cache/workspaces"))?; +/// cache key. +#[must_use] +pub fn stable_cache_tag(step: &CommandStep) -> Option { + let cache = step.cache.as_ref()?; + if cache.policy == "none" { + return None; + } + let key = cache.key.as_deref()?; let safe = sanitize_for_tag(&step.key); let short = &key[..key.len().min(16)]; - Ok(Some(ws_cache.join(safe).join(short))) + Some(format!("harmont-cache/{safe}:{short}")) } -/// Decide cache outcome for a step against the local COW workspace -/// cache directory. -/// -/// # Errors -/// Returns an error if the config directory cannot be resolved or the -/// stale directory listing fails. -pub fn decide_cow(step: &CommandStep) -> Result { - let Some(cache_dir) = cow_cache_dir(step)? else { - return Ok(CowCacheOutcome { - decision: CacheDecision::MissNoCommit, - cache_to: None, - stale_dirs: vec![], - }); +/// Remove Docker images for `step_key` that don't match `current_tag`. +pub async fn evict_stale_docker_tags( + docker: &DockerClient, + step_key: &str, + current_tag: Option<&str>, +) { + let safe = sanitize_for_tag(step_key); + let reference = format!("harmont-cache/{safe}"); + let tags = match docker.list_images_by_reference(&reference).await { + Ok(t) => t, + Err(e) => { + tracing::warn!(%e, "failed to list images for stale eviction"); + return; + } }; - if cache_dir.exists() { - Ok(CowCacheOutcome { - decision: CacheDecision::Hit { - tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), - }, - cache_to: None, - stale_dirs: vec![], - }) - } else { - let Some(step_cache_root) = cache_dir.parent() else { - return Ok(CowCacheOutcome { - decision: CacheDecision::MissBuildAs { - tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), - }, - cache_to: Some(cache_dir), - stale_dirs: vec![], - }); - }; - let stale = if step_cache_root.exists() { - std::fs::read_dir(step_cache_root)? - .filter_map(std::result::Result::ok) - .map(|e| e.path()) - .filter(|p| *p != cache_dir) - .collect() - } else { - vec![] - }; - Ok(CowCacheOutcome { - decision: CacheDecision::MissBuildAs { - tag: SnapshotRef::from(format!("cow:{}", cache_dir.display())), - }, - cache_to: Some(cache_dir), - stale_dirs: stale, - }) - } -} - -/// Persist a completed workspace directory into the COW cache. -/// -/// Creates intermediate directories and performs a COW clone. If the -/// cache directory already exists (e.g. a concurrent run beat us) the -/// function returns `Ok(())` without overwriting. -/// -/// # Errors -/// Returns an error if the parent directory cannot be created or the -/// COW clone fails. -pub fn persist_cow_cache(workspace_path: &Path, cache_dir: &Path) -> Result<()> { - if let Some(parent) = cache_dir.parent() { - std::fs::create_dir_all(parent)?; - } - if cache_dir.exists() { - return Ok(()); - } - match hm_util::cow::cow_clone_dir(workspace_path, cache_dir) { - Ok(()) => Ok(()), - Err(e) if cache_dir.exists() => { - tracing::debug!(%e, "concurrent run already populated cache"); - Ok(()) + for tag in tags { + if Some(tag.as_str()) == current_tag { + continue; } - Err(e) => Err(e).context("persist workspace to COW cache"), - } -} - -/// Remove stale COW cache directories left over from previous cache -/// keys. Failures are logged but never propagated. -pub fn evict_stale_cow_dirs(dirs: &[PathBuf]) { - for dir in dirs { - if let Err(e) = std::fs::remove_dir_all(dir) { - tracing::warn!(path = %dir.display(), %e, "failed to evict stale COW cache"); + if let Err(e) = docker.remove_image(&tag).await { + tracing::warn!(image = %tag, %e, "failed to remove stale cached Docker image"); } } } @@ -173,60 +91,30 @@ mod tests { } #[test] - fn cow_cache_dir_returns_path_for_cacheable_step() { + fn stable_cache_tag_for_cacheable_step() { let s = step(Some(Cache { policy: "ttl".into(), key: Some("0123456789abcdef0000".into()), })); - let dir = cow_cache_dir(&s).unwrap(); - assert!(dir.is_some(), "expected Some for cacheable step"); - let dir = dir.unwrap(); - assert!( - dir.ends_with("cache/workspaces/build/0123456789abcdef"), - "unexpected path: {}", - dir.display() + let tag = stable_cache_tag(&s); + assert_eq!( + tag, + Some("harmont-cache/build:0123456789abcdef".to_string()) ); } #[test] - fn cow_cache_dir_returns_none_for_no_cache() { + fn stable_cache_tag_none_for_uncacheable() { let s = step(None); - let dir = cow_cache_dir(&s).unwrap(); - assert!(dir.is_none()); + assert_eq!(stable_cache_tag(&s), None); } #[test] - fn cow_cache_dir_returns_none_for_policy_none() { + fn stable_cache_tag_none_for_policy_none() { let s = step(Some(Cache { policy: "none".into(), - key: Some("abcdef1234567890".into()), - })); - let dir = cow_cache_dir(&s).unwrap(); - assert!(dir.is_none()); - } - - #[test] - fn decide_cow_miss_no_commit_when_no_cache() { - let s = step(None); - let outcome = decide_cow(&s).unwrap(); - assert!(outcome.decision.is_miss_no_commit()); - assert!(outcome.cache_to.is_none()); - assert!(outcome.stale_dirs.is_empty()); - } - - #[test] - fn decide_cow_miss_build_as_for_new_key() { - // Use a unique key that will not exist on disk. - let s = step(Some(Cache { - policy: "ttl".into(), - key: Some("deadbeefcafebabe9999".into()), + key: Some("abc".into()), })); - let outcome = decide_cow(&s).unwrap(); - assert!( - outcome.decision.is_miss_build_as(), - "expected MissBuildAs, got {:?}", - outcome.decision - ); - assert!(outcome.cache_to.is_some()); + assert_eq!(stable_cache_tag(&s), None); } } diff --git a/crates/hm/src/orchestrator/docker_client.rs b/crates/hm/src/orchestrator/docker_client.rs index 208c9d0..970e1ea 100644 --- a/crates/hm/src/orchestrator/docker_client.rs +++ b/crates/hm/src/orchestrator/docker_client.rs @@ -16,7 +16,7 @@ use bollard::container::{ use bollard::exec::{CreateExecOptions, StartExecResults}; use bollard::image::{ CommitContainerOptions, CreateImageOptions, ImportImageOptions, ListImagesOptions, - RemoveImageOptions, + RemoveImageOptions, TagImageOptions, }; use bollard::models::HostConfig; use futures_util::StreamExt; @@ -408,6 +408,30 @@ impl DockerClient { Ok(()) } + /// Add an additional tag to an existing image. + /// + /// `source` is the existing image reference (tag or ID) and + /// `new_tag` is the desired `repo:tag` string. If `new_tag` + /// contains no `:`, the Docker-default tag `"latest"` is used. + /// + /// # Errors + /// + /// Returns [`HmError::Docker`] if `tag_image` fails (source image + /// not found, daemon I/O failure). + pub async fn tag_image(&self, source: &str, new_tag: &str) -> Result<()> { + let parts: Vec<&str> = new_tag.splitn(2, ':').collect(); + let (repo, tag) = match parts.as_slice() { + [r, v] => (*r, *v), + [r] => (*r, "latest"), + _ => unreachable!("splitn(2) yields one or two parts for non-empty input"), + }; + self.inner + .tag_image(source, Some(TagImageOptions { repo, tag })) + .await + .map_err(|e| HmError::Docker(format!("tag_image '{source}' -> '{new_tag}': {e}")))?; + Ok(()) + } + /// Export a Docker image to a tar file on disk. /// /// Streams the image layer data from the daemon and writes it to diff --git a/crates/hm/src/orchestrator/mod.rs b/crates/hm/src/orchestrator/mod.rs index f6fffcd..7c23600 100644 --- a/crates/hm/src/orchestrator/mod.rs +++ b/crates/hm/src/orchestrator/mod.rs @@ -14,6 +14,5 @@ pub mod output_subscriber; pub mod scheduler; pub mod signal; pub mod source; -pub mod workspace; pub use scheduler::run; diff --git a/crates/hm/src/orchestrator/scheduler.rs b/crates/hm/src/orchestrator/scheduler.rs index cfbecd4..4e503ce 100644 --- a/crates/hm/src/orchestrator/scheduler.rs +++ b/crates/hm/src/orchestrator/scheduler.rs @@ -89,17 +89,6 @@ pub async fn run( // Build the source archive once. let archive_bytes = build_archive_bytes(&repo_root).context("build source archive")?; - - // Extract the source archive into a temporary directory and create - // a workspace manager for per-step COW clones. This must happen - // before `archives.register()` which consumes the bytes. - let run_dir = std::env::temp_dir().join(format!("harmont-run-{run_id}")); - let workspace = { - let mgr = super::workspace::WorkspaceManager::from_archive(run_dir, &archive_bytes) - .context("init COW workspace")?; - Arc::new(std::sync::Mutex::new(mgr)) - }; - let archive_id = archives.register(archive_bytes); let run_ctx = RunContext { @@ -107,7 +96,6 @@ pub async fn run( event_bus: bus.clone(), archives: archives.clone(), cancel: cancel.clone(), - workspace: workspace.clone(), }; let parallelism = parallelism.max(1); @@ -244,15 +232,6 @@ pub async fn run( } } - { - let mut mgr = workspace - .lock() - .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; - if let Err(e) = mgr.cleanup() { - tracing::warn!(%e, "failed to clean up COW workspace"); - } - } - bus.emit(BuildEvent::BuildEnd { exit_code: overall, duration_ms: dur, @@ -307,32 +286,11 @@ async fn execute_step( display_name: display_name.clone(), }); - let cow_outcome = cache::decide_cow(&step_wire)?; - let decision = cow_outcome.decision; - let cow_cache_to = cow_outcome.cache_to; - let cow_stale_dirs = cow_outcome.stale_dirs; - + // --- Docker image cache check --- + let cache_tag = cache::stable_cache_tag(&step_wire); + if let Some(ref dtag) = cache_tag + && run_ctx.docker.image_exists(dtag).await.unwrap_or(false) { - let mut mgr = run_ctx - .workspace - .lock() - .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; - if let CacheDecision::Hit { ref tag } = decision { - if tag.0.starts_with("cow:") { - let cached_path = PathBuf::from(&tag.0[4..]); - mgr.create_workspace_from_cache(&step_key, &cached_path) - .context("create workspace from cache")?; - } else { - mgr.create_workspace(&step_key, parent_key.as_deref()) - .context("create workspace for cache hit")?; - } - } else { - mgr.create_workspace(&step_key, parent_key.as_deref()) - .context("create workspace for step")?; - } - } - - if let CacheDecision::Hit { tag } = &decision { bus.emit(BuildEvent::StepCacheHit { step_id, key: step_wire @@ -340,14 +298,22 @@ async fn execute_step( .as_ref() .and_then(|c| c.key.clone()) .unwrap_or_default(), - tag: tag.0.clone(), + tag: dtag.clone(), }); return Ok(StepOutcome { exit_code: 0, - snapshot: Some(tag.clone()), + snapshot: Some(SnapshotRef::from(dtag.clone())), }); } + let cache_lookup = cache_tag + .as_ref() + .map_or(CacheDecision::MissNoCommit, |tag| { + CacheDecision::MissBuildAs { + tag: SnapshotRef::from(tag.clone()), + } + }); + let input = ExecutorInput { step: step_wire, workspace_archive_id: archive_id, @@ -355,7 +321,7 @@ async fn execute_step( workdir: "/workspace".to_string(), run_id, step_id, - cache_lookup: decision, + cache_lookup, parent_snapshot, }; @@ -409,23 +375,15 @@ async fn execute_step( ts: chrono::Utc::now(), }); cancel.cancel(); - } else { - if let Some(ref cache_to) = cow_cache_to { - let ws_path = { - let mgr = run_ctx - .workspace - .lock() - .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; - mgr.workspace_path(&step_key) - .map(std::path::Path::to_path_buf) - }; - if let Some(ws) = ws_path - && let Err(e) = cache::persist_cow_cache(&ws, cache_to) - { - tracing::warn!(%e, "failed to persist COW cache"); - } + } else if let Some(ref snapshot) = sr.committed_snapshot { + if let Some(ref dtag) = cache_tag + && snapshot.0 != *dtag + && let Err(e) = run_ctx.docker.tag_image(&snapshot.0, dtag).await + { + tracing::warn!(%e, "failed to re-tag Docker image for cache"); } - cache::evict_stale_cow_dirs(&cow_stale_dirs); + cache::evict_stale_docker_tags(&run_ctx.docker, &step_key, cache_tag.as_deref()) + .await; } Ok(StepOutcome { exit_code: sr.exit_code, diff --git a/crates/hm/src/output/human.rs b/crates/hm/src/output/human.rs index 2f286e8..6a3d9ce 100644 --- a/crates/hm/src/output/human.rs +++ b/crates/hm/src/output/human.rs @@ -149,10 +149,6 @@ where } } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { diff --git a/crates/hm/src/output/progress.rs b/crates/hm/src/output/progress.rs index 1c67b91..0c035e4 100644 --- a/crates/hm/src/output/progress.rs +++ b/crates/hm/src/output/progress.rs @@ -361,10 +361,6 @@ where } } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { diff --git a/crates/hm/src/runner/docker.rs b/crates/hm/src/runner/docker.rs index 68183a7..eeef27f 100644 --- a/crates/hm/src/runner/docker.rs +++ b/crates/hm/src/runner/docker.rs @@ -1,9 +1,10 @@ //! Docker-based step runner. //! -//! Each step runs inside a Docker container with the workspace -//! bind-mounted from the host via COW clones. System-level state -//! (installed packages) propagates via Docker image commits; workspace -//! files propagate via host-side COW directory clones. +//! Each step runs inside a Docker container. The source archive is +//! piped into `/workspace` via `tar -xzf -` before the step command +//! runs. System-level state (packages, caches, virtualenvs) AND +//! workspace files all propagate via Docker image commits — no bind +//! mounts, no host-side COW clones. use std::future::Future; use std::pin::Pin; @@ -18,10 +19,6 @@ use uuid::Uuid; use super::{RunContext, StepRunner}; use crate::orchestrator::events::EventBus; -// --------------------------------------------------------------------------- -// DockerRunner -// --------------------------------------------------------------------------- - /// Step runner that executes pipeline steps inside Docker containers /// via the local daemon (Bollard). #[derive(Debug)] @@ -42,10 +39,6 @@ impl StepRunner for DockerRunner { } } -// --------------------------------------------------------------------------- -// Step execution -// --------------------------------------------------------------------------- - fn resolve_image(step: &CommandStep, input: &ExecutorInput) -> String { if let Some(snap) = &input.parent_snapshot { return snap.to_string(); @@ -66,17 +59,6 @@ async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result }); } - let workspace_mgr = &ctx.workspace; - - let workspace_path = { - let mgr = workspace_mgr - .lock() - .map_err(|_| anyhow::anyhow!("workspace manager mutex poisoned"))?; - mgr.workspace_path(&input.step.key) - .map(std::path::Path::to_path_buf) - .ok_or_else(|| anyhow::anyhow!("workspace for step '{}' not created", input.step.key))? - }; - let image = resolve_image(&input.step, &input); let container_name = sanitize_container_name(&input.run_id.to_string(), &input.step.key); let env_vec: Vec = input.env.iter().map(|(k, v)| format!("{k}={v}")).collect(); @@ -93,13 +75,38 @@ async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result } } - // Start container with workspace bind mount. - let binds = vec![format!("{}:{}", workspace_path.display(), input.workdir)]; let cid = ctx .docker - .start_long_lived_with_mounts(&image, &env_vec, &input.workdir, &container_name, &binds) + .start_long_lived(&image, &env_vec, &input.workdir, &container_name) + .await + .context("docker start failed")?; + + // Pipe source archive into /workspace. Runs for every step — cached + // parent images contain stale workspace files; tar overwrites source + // while preserving build artifacts (tar is additive). + let archive_bytes = ctx + .archives + .get_bytes(input.workspace_archive_id) + .ok_or_else(|| anyhow::anyhow!("source archive not found"))?; + + let mkdir_cmd = vec!["mkdir".into(), "-p".into(), input.workdir.clone()]; + let mut sink = tokio::io::sink(); + ctx.docker + .exec_streaming(&cid, &mkdir_cmd, &env_vec, "/", &mut sink) .await - .context("docker start with mounts failed")?; + .context("mkdir /workspace")?; + + let tar_cmd = vec![ + "tar".into(), + "-xzf".into(), + "-".into(), + "-C".into(), + input.workdir.clone(), + ]; + ctx.docker + .exec_streaming_stdin(&cid, &tar_cmd, &env_vec, "/", &archive_bytes, &mut sink) + .await + .context("pipe source archive into container")?; let result = run_in_container(ctx, &cid, &input, &env_vec, &plan).await; ctx.docker.stop_remove(&cid).await; @@ -189,10 +196,6 @@ async fn run_in_container( }) } -// --------------------------------------------------------------------------- -// DecisionPlan -// --------------------------------------------------------------------------- - #[derive(Debug, Clone)] struct DecisionPlan { run_command: bool, @@ -220,10 +223,6 @@ fn decision_plan(decision: &CacheDecision) -> DecisionPlan { } } -// --------------------------------------------------------------------------- -// sanitize_container_name -// --------------------------------------------------------------------------- - fn sanitize_container_name(run_id: &str, step_key: &str) -> String { let run_short: String = run_id.chars().take(8).collect(); let key: String = step_key @@ -239,10 +238,6 @@ fn sanitize_container_name(run_id: &str, step_key: &str) -> String { format!("harmont-{run_short}-{key}") } -// --------------------------------------------------------------------------- -// StepLogWriter -// --------------------------------------------------------------------------- - /// Streams bytes from a Docker exec into per-line [`BuildEvent::StepLog`] /// events on the [`EventBus`]. Buffers partial lines until a `\n` arrives. struct StepLogWriter { @@ -311,10 +306,6 @@ impl tokio::io::AsyncWrite for StepLogWriter { } } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { diff --git a/crates/hm/src/runner/mod.rs b/crates/hm/src/runner/mod.rs index 4944800..37ea182 100644 --- a/crates/hm/src/runner/mod.rs +++ b/crates/hm/src/runner/mod.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::Result; use hm_plugin_protocol::{BuildEvent, ExecutorInput, StepResult}; @@ -18,14 +18,9 @@ use tokio_util::sync::CancellationToken; use crate::orchestrator::archive::ArchiveStore; use crate::orchestrator::docker_client::DockerClient; use crate::orchestrator::events::EventBus; -use crate::orchestrator::workspace::WorkspaceManager; pub mod docker; -// --------------------------------------------------------------------------- -// RunContext -// --------------------------------------------------------------------------- - /// Shared context threaded into every runner invocation. /// /// Replaces the monolithic `OrchestratorState` that the old plugin @@ -37,13 +32,8 @@ pub struct RunContext { pub event_bus: Arc, pub archives: Arc, pub cancel: CancellationToken, - pub workspace: Arc>, } -// --------------------------------------------------------------------------- -// StepRunner trait -// --------------------------------------------------------------------------- - /// Async trait implemented by step executors (e.g. the Docker runner). /// /// Each runner is identified by a string [`Self::name`] that pipeline @@ -70,10 +60,6 @@ pub trait StepRunner: Send + Sync + fmt::Debug { ) -> Pin> + Send + '_>>; } -// --------------------------------------------------------------------------- -// OutputRenderer trait -// --------------------------------------------------------------------------- - /// Synchronous observer of [`BuildEvent`]s. /// /// Implementations format events for human consumption (progress bars, @@ -83,10 +69,6 @@ pub trait OutputRenderer: Send + fmt::Debug { fn on_event(&mut self, event: &BuildEvent); } -// --------------------------------------------------------------------------- -// RunnerRegistry -// --------------------------------------------------------------------------- - /// Maps runner names to [`StepRunner`] implementations. /// /// Constructed once at startup and shared immutably for the duration @@ -150,10 +132,6 @@ impl fmt::Debug for RunnerRegistry { } } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { diff --git a/crates/hm/tests/cow_workspace.rs b/crates/hm/tests/cow_workspace.rs index bcbdcdf..e61da47 100644 --- a/crates/hm/tests/cow_workspace.rs +++ b/crates/hm/tests/cow_workspace.rs @@ -1,10 +1,10 @@ #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] -//! End-to-end test for COW workspace mode. +//! End-to-end test for workspace state propagation via Docker commits. //! -//! Verifies that `hm run --cow` correctly propagates workspace state -//! across a three-step chain: step `a` writes a file, step `b` reads -//! it and writes another, step `c` reads both — proving COW workspace -//! inheritance works through the entire chain. +//! Verifies that `hm run` correctly propagates workspace state across a +//! three-step chain: step `a` writes a file, step `b` reads it and +//! writes another, step `c` reads both — proving Docker-commit-based +//! workspace inheritance works through the entire chain. //! //! Skipped unless `HARMONT_LOCAL_E2E=1` is set AND a Docker daemon is //! reachable. @@ -55,7 +55,7 @@ fn cow_chain_inherits_workspace() { .expect("copy fixture into .harmont/"); let out = Command::new(bin) - .args(["run", "--cow", "--logs", "--dir"]) + .args(["run", "--logs", "--dir"]) .arg(tmp.path()) .arg("cow-chain") .env("HARMONT_CIDSL_PY", repo_root.join("cidsl/py")) @@ -66,10 +66,10 @@ fn cow_chain_inherits_workspace() { let stdout = String::from_utf8_lossy(&out.stdout); assert!( out.status.success(), - "hm run --cow failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" + "hm run failed.\nstdout:\n{stdout}\nstderr:\n{stderr}" ); assert!( stderr.contains("c-saw-both"), - "step c must see files from a and b via COW workspace.\nstdout:\n{stdout}\nstderr:\n{stderr}" + "step c must see files from a and b via Docker workspace.\nstdout:\n{stdout}\nstderr:\n{stderr}" ); }