From df3802bb5cfa8717d02f495d3ca4169e005746e4 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 09:55:08 +0300 Subject: [PATCH 1/9] refactor: move sha256_hex function to shared module and update cache.rs --- libs/api/src/stakpak/knowledge/cache.rs | 29 +--------------------- libs/shared/src/hash.rs | 33 +++++++++++++++++++++++++ libs/shared/src/lib.rs | 1 + 3 files changed, 35 insertions(+), 28 deletions(-) create mode 100644 libs/shared/src/hash.rs diff --git a/libs/api/src/stakpak/knowledge/cache.rs b/libs/api/src/stakpak/knowledge/cache.rs index 9a8204dd0..e2ca13162 100644 --- a/libs/api/src/stakpak/knowledge/cache.rs +++ b/libs/api/src/stakpak/knowledge/cache.rs @@ -6,7 +6,7 @@ //! //! All operations are best-effort. -use sha2::{Digest, Sha256}; +use stakpak_shared::hash::sha256_hex; use std::path::{Path, PathBuf}; use tokio::fs; use tokio::io::AsyncWriteExt; @@ -132,37 +132,10 @@ pub async fn evict_cached(path: &Path) { } } -/// Hex-encoded SHA-256 of `bytes`. -pub fn sha256_hex(bytes: &[u8]) -> String { - let mut hasher = Sha256::new(); - hasher.update(bytes); - let digest = hasher.finalize(); - let mut out = String::with_capacity(digest.len() * 2); - for b in digest { - use std::fmt::Write; - let _ = write!(&mut out, "{:02x}", b); - } - out -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn sha256_matches_known_vector() { - // SHA-256 of empty string - assert_eq!( - sha256_hex(b""), - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" - ); - // SHA-256 of "abc" - assert_eq!( - sha256_hex(b"abc"), - "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" - ); - } - #[test] fn cached_path_rejects_traversal() { assert!(cached_path("acct", "../etc/passwd").is_none()); diff --git a/libs/shared/src/hash.rs b/libs/shared/src/hash.rs new file mode 100644 index 000000000..622909a22 --- /dev/null +++ b/libs/shared/src/hash.rs @@ -0,0 +1,33 @@ +use sha2::{Digest, Sha256}; + +/// Hex-encoded SHA-256 of `bytes`. 
+pub fn sha256_hex(bytes: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(bytes); + let digest = hasher.finalize(); + let mut out = String::with_capacity(digest.len() * 2); + for b in digest { + use std::fmt::Write; + let _ = write!(&mut out, "{:02x}", b); + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sha256_matches_known_vectors() { + // SHA-256 of empty string + assert_eq!( + sha256_hex(b""), + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ); + // SHA-256 of "abc" + assert_eq!( + sha256_hex(b"abc"), + "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" + ); + } +} diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 37e24cf06..813e9c2f2 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -3,6 +3,7 @@ pub mod cert_utils; pub mod container; pub mod file_backup_manager; pub mod file_watcher; +pub mod hash; pub mod helper; pub mod hooks; pub mod jwt; From c18deb2f031cbdd11580dd1d8b43e9de1a0dc25b Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 11:06:53 +0300 Subject: [PATCH 2/9] feat: add list_with_meta method to StorageBackend for file metadata retrieval --- Cargo.lock | 1 + libs/ak/Cargo.toml | 1 + libs/ak/src/lib.rs | 2 +- libs/ak/src/store.rs | 144 ++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 146 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9800ea43d..aecdbe613 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6201,6 +6201,7 @@ dependencies = [ "serde_json", "serde_yaml", "stakpak-api", + "stakpak-shared", "tempfile", "tokio", "walkdir", diff --git a/libs/ak/Cargo.toml b/libs/ak/Cargo.toml index 1ae6be936..d7b814530 100644 --- a/libs/ak/Cargo.toml +++ b/libs/ak/Cargo.toml @@ -18,6 +18,7 @@ globset = { workspace = true } grep-matcher = { workspace = true } grep-regex = { workspace = true } stakpak-api = { path = "../api" } +stakpak-shared = { workspace = true } tokio = { workspace = true } 
[dev-dependencies] diff --git a/libs/ak/src/lib.rs b/libs/ak/src/lib.rs index 3cba6b1c9..47797e75e 100644 --- a/libs/ak/src/lib.rs +++ b/libs/ak/src/lib.rs @@ -6,4 +6,4 @@ pub mod store; pub use error::Error; pub use search::{GrepResult, PeekResult, SearchEngine, TreeNavEngine}; -pub use store::{Entry, LocalFsBackend, RemoteBackend, StorageBackend, TreeNode}; +pub use store::{Entry, FileMeta, LocalFsBackend, RemoteBackend, StorageBackend, TreeNode}; diff --git a/libs/ak/src/store.rs b/libs/ak/src/store.rs index e842b9f4b..7f0365975 100644 --- a/libs/ak/src/store.rs +++ b/libs/ak/src/store.rs @@ -3,6 +3,7 @@ use serde::Serialize; use stakpak_api::stakpak::{ KnowledgeApiError, ListKnowledgeFilesQuery, StakpakApiClient, StakpakApiConfig, }; +use stakpak_shared::hash::sha256_hex; use std::cmp::Ordering; use std::fs; use std::io::ErrorKind; @@ -35,6 +36,9 @@ pub trait StorageBackend { /// Returns sorted store-relative file paths under `prefix`, excluding dotfiles. /// Missing prefixes return an empty result. fn walk(&self, prefix: &str) -> Result, Error>; + /// Like [`walk`](Self::walk), but each entry also carries the file's + /// content hash (hex SHA-256) and size in bytes. + fn list_with_meta(&self, prefix: &str) -> Result, Error>; fn exists(&self, path: &str) -> Result; } @@ -44,6 +48,17 @@ pub struct Entry { pub is_dir: bool, } +/// Per-file metadata used for sync reconciliation. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct FileMeta { + /// Store-relative path, using `/` as separator. + pub path: String, + /// Hex-encoded SHA-256 of the file body. + pub content_hash: String, + /// File size in bytes. 
+ pub size_bytes: u64, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct TreeNode { pub name: String, @@ -401,6 +416,25 @@ impl StorageBackend for LocalFsBackend { Ok(walked) } + fn list_with_meta(&self, prefix: &str) -> Result, Error> { + let paths = self.walk(prefix)?; + + let mut results = Vec::with_capacity(paths.len()); + for path in paths { + let absolute = self.resolve_path(&path)?; + let bytes = fs::read(&absolute)?; + let size_bytes = bytes.len() as u64; + let content_hash = sha256_hex(&bytes); + results.push(FileMeta { + path, + content_hash, + size_bytes, + }); + } + + Ok(results) + } + fn exists(&self, path: &str) -> Result { let target = self.resolve_path(path)?; self.ensure_no_symlinks_below_root(&target)?; @@ -657,6 +691,36 @@ impl StorageBackend for RemoteBackend { Ok(paths) } + fn list_with_meta(&self, prefix: &str) -> Result, Error> { + let query = ListKnowledgeFilesQuery { + path: if prefix.is_empty() { + None + } else { + Some(prefix.to_string()) + }, + glob: None, + }; + + let response = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(async { self.client.list_knowledge_files(&query).await }) + }) + .map_err(|e| map_knowledge_err(prefix, e))?; + + let mut results: Vec = response + .files + .into_iter() + .map(|file| FileMeta { + path: file.path, + content_hash: file.content_hash, + size_bytes: file.size_bytes.max(0) as u64, + }) + .collect(); + + results.sort_by(|a, b| a.path.cmp(&b.path)); + Ok(results) + } + fn exists(&self, path: &str) -> Result { // Use HEAD so we don't pay the cost of transferring the body. 
tokio::task::block_in_place(|| { @@ -731,7 +795,7 @@ impl RemoteBackend { #[cfg(test)] mod tests { - use super::{Entry, LocalFsBackend, StorageBackend, TreeNode}; + use super::{Entry, FileMeta, LocalFsBackend, StorageBackend, TreeNode}; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; @@ -1191,4 +1255,82 @@ mod tests { assert!(walked.is_empty()); } + + #[test] + fn list_with_meta_returns_hashes_and_sizes_for_local_files() { + use stakpak_shared::hash::sha256_hex; + + let (_temp_dir, backend) = backend(); + let alpha = b"alpha-content"; + let beta = b"beta longer content body"; + backend + .create("services/alpha.md", alpha) + .expect("create alpha"); + backend + .create("services/sub/beta.md", beta) + .expect("create beta"); + + let mut metas = backend.list_with_meta("services").expect("list_with_meta"); + + // The trait contract promises sorted-by-path output. + metas.sort_by(|a, b| a.path.cmp(&b.path)); + + assert_eq!( + metas, + vec![ + FileMeta { + path: "services/alpha.md".to_string(), + content_hash: sha256_hex(alpha), + size_bytes: alpha.len() as u64, + }, + FileMeta { + path: "services/sub/beta.md".to_string(), + content_hash: sha256_hex(beta), + size_bytes: beta.len() as u64, + }, + ] + ); + } + + #[test] + fn list_with_meta_empty_for_missing_prefix() { + let (_temp_dir, backend) = backend(); + backend + .create("notes/todo.md", b"todo") + .expect("create notes file"); + + let metas = backend + .list_with_meta("nonexistent") + .expect("list_with_meta on missing prefix"); + + assert!(metas.is_empty()); + } + + #[test] + fn list_with_meta_skips_dotfiles() { + let (_temp_dir, backend) = backend(); + backend + .create("notes/todo.md", b"todo") + .expect("create notes file"); + // Drop a dotfile directly on disk — `create` would refuse it via + // the same path rules, so we sidestep the API to set up the case. 
+ let dotfile_dir = backend.root().join(".secrets"); + std::fs::create_dir_all(&dotfile_dir).expect("create dotfile dir"); + std::fs::write(dotfile_dir.join("api-key"), b"shh").expect("write dotfile"); + std::fs::write(backend.root().join(".env"), b"k=v").expect("write env dotfile"); + + let metas = backend.list_with_meta("").expect("list root"); + + assert_eq!(metas.len(), 1, "dotfiles must be excluded: {metas:?}"); + assert_eq!(metas[0].path, "notes/todo.md"); + } + + #[test] + fn list_with_meta_returns_empty_for_empty_store() { + let (_temp_dir, backend) = backend(); + + let metas = backend.list_with_meta("").expect("list empty store"); + + assert!(metas.is_empty()); + } } From ea5119ec856a76621722ec883dd0000000b05132 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 11:20:53 +0300 Subject: [PATCH 3/9] feat: Add sync command structure for reconciling local and remote knowledge stores --- cli/src/commands/ak/mod.rs | 21 ++++- cli/src/commands/ak/sync.rs | 179 ++++++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+), 3 deletions(-) create mode 100644 cli/src/commands/ak/sync.rs diff --git a/cli/src/commands/ak/mod.rs b/cli/src/commands/ak/mod.rs index afc5dfbd8..00dca013e 100644 --- a/cli/src/commands/ak/mod.rs +++ b/cli/src/commands/ak/mod.rs @@ -11,6 +11,8 @@ use std::rc::Rc; use crate::config::AppConfig; +mod sync; + pub const AK_LONG_ABOUT: &str = "LLM-oriented commands for reading and writing persistent knowledge. 
@@ -23,6 +25,7 @@ Key commands: - ak read ...: read one or more files in full - ak write : create a new file; use --force to overwrite intentionally - ak remove : remove a file or directory recursively +- ak sync push|pull: reconcile the local store with the remote Stakpak knowledge store - ak skill : print a built-in ak skill prompt Recommended discovery flow: @@ -41,7 +44,9 @@ pub const AK_AFTER_HELP: &str = "Examples: stakpak ak read services/rate-limits.md services/auth-flow.md echo 'Rate limit is 1000/min' | stakpak ak write services/rate-limits.md stakpak ak write notes.md --file /tmp/notes.md - stakpak ak remove services/rate-limits.md"; + stakpak ak remove services/rate-limits.md + stakpak ak sync push --dry-run + stakpak ak sync pull --strategy remote"; #[derive(Subcommand, PartialEq, Debug)] #[command( @@ -160,6 +165,12 @@ Use `usage` to teach an agent how to navigate and write to the store. Use `maint /// Built-in skill name: usage, maintain, or retrospect name: String, }, + + /// Reconcile the local ak knowledge store with the remote Stakpak knowledge store. + Sync { + #[command(subcommand)] + cmd: sync::SyncCommand, + }, } impl AkCommands { @@ -168,6 +179,10 @@ impl AkCommands { return run_skill(name); } + if let Self::Sync { cmd } = self { + return sync::run(cmd, config); + } + let backend = create_backend(&config)?; match self { Self::Search { @@ -180,8 +195,8 @@ impl AkCommands { Self::Read { paths } => run_read(backend.clone(), &paths)?, Self::Write { path, file, force } => run_write(backend.clone(), path, file, force)?, Self::Remove { path } => run_remove(backend.clone(), &path)?, - Self::Skill { .. } => { - unreachable!("Will never reach here because of the early return above") + Self::Skill { .. } | Self::Sync { .. 
} => { + unreachable!("Will never reach here because of the early returns above") } } diff --git a/cli/src/commands/ak/sync.rs b/cli/src/commands/ak/sync.rs new file mode 100644 index 000000000..6740d0f94 --- /dev/null +++ b/cli/src/commands/ak/sync.rs @@ -0,0 +1,179 @@ +//! `stakpak ak sync` — reconcile the local ak knowledge store with the +//! remote Stakpak knowledge store. + +use clap::{Subcommand, ValueEnum}; + +use crate::config::AppConfig; + +/// Conflict-resolution strategy applied when local and remote disagree on +/// the same path. Without `--strategy`, sync is fail-fast: it lists the +/// conflicts and exits non-zero so the user can pick a strategy. +/// +/// Effect depends on direction (push vs pull): +/// +/// | Strategy | `push` (local → remote) | `pull` (remote → local) | +/// |----------|------------------------------------|------------------------------------| +/// | Local | overwrite remote with local | keep local (skip download) | +/// | Remote | skip (don't push the local change) | overwrite local with remote | +/// | Skip | leave conflicts; sync the rest | leave conflicts; sync the rest | +/// | Recent | overwrite older with newer | overwrite older with newer | +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +#[value(rename_all = "lower")] +pub enum Strategy { + /// Treat the local copy as authoritative for any conflict. + Local, + /// Treat the remote copy as authoritative for any conflict. + Remote, + /// Treat the newer copy as authoritative for any conflict. + Recent, + /// Skip conflicting files; sync everything else. + Skip, +} + +/// Direction of a sync operation. Set by the `push`/`pull` subcommand. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncDirection { + /// Local → remote. Uploads local-only files; never deletes on the remote. + Push, + /// Remote → local. Downloads remote-only files; never deletes locally. 
+ Pull, +} + +#[derive(Subcommand, PartialEq, Debug)] +#[command( + about = "Sync the local ak knowledge store with the remote Stakpak knowledge store", + long_about = "Reconcile the local ak knowledge store (~/.stakpak/knowledge or $AK_STORE) \ +with the remote Stakpak knowledge store. + +Sync is directional and additive: +- `push` copies local-only files up to the remote +- `pull` copies remote-only files down to the local store +- neither subcommand deletes files on either side + +When the same path exists on both sides with different content, sync is \ +fail-fast by default: it lists the conflicts and exits with code 2 so you \ +can rerun with an explicit `--strategy`.", + after_help = "Examples: + stakpak ak sync push + stakpak ak sync pull --dry-run + stakpak ak sync push --strategy local + stakpak ak sync pull --strategy remote" +)] +pub enum SyncCommand { + #[command( + about = "Push local-only files up to the remote knowledge store", + long_about = "Upload files that exist locally but are missing on the remote. + +Files identical on both sides are skipped (compared by SHA-256 content hash). \ +Files that exist remotely but not locally are left alone. \ +When the same path exists on both sides with different content, sync stops \ +and lists the conflicts. Rerun with `--strategy` to resolve." + )] + Push { + /// Conflict-resolution strategy. Without this flag, sync is fail-fast on conflicts. + #[arg(long, value_enum)] + strategy: Option, + + /// Print the plan (uploads, skips, conflicts) without modifying anything. + #[arg(long, default_value_t = false)] + dry_run: bool, + }, + + #[command( + about = "Pull remote-only files down to the local knowledge store", + long_about = "Download files that exist remotely but are missing locally. + +Files identical on both sides are skipped (compared by SHA-256 content hash). 
\ +Files that exist locally but not remotely are left alone. \ +When the same path exists on both sides with different content, sync stops \ +and lists the conflicts. Rerun with `--strategy` to resolve." + )] + Pull { + /// Conflict-resolution strategy. Without this flag, sync is fail-fast on conflicts. + #[arg(long, value_enum)] + strategy: Option, + + /// Print the plan (downloads, skips, conflicts) without modifying anything. + #[arg(long, default_value_t = false)] + dry_run: bool, + }, +} + +impl SyncCommand { + /// Direction of this subcommand (push vs pull). + pub fn direction(&self) -> SyncDirection { + match self { + Self::Push { .. } => SyncDirection::Push, + Self::Pull { .. } => SyncDirection::Pull, + } + } + + /// Whether `--dry-run` was passed. + pub fn dry_run(&self) -> bool { + match self { + Self::Push { dry_run, .. } | Self::Pull { dry_run, .. } => *dry_run, + } + } + + /// User-supplied conflict-resolution strategy, if any. `None` means fail-fast. + pub fn strategy(&self) -> Option { + match self { + Self::Push { strategy, .. } | Self::Pull { strategy, .. } => *strategy, + } + } +} + +pub fn run(cmd: SyncCommand, _config: AppConfig) -> Result<(), String> { + let _ = (cmd.direction(), cmd.dry_run(), cmd.strategy()); + todo!(); +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::CommandFactory; + use clap::Parser; + + #[derive(Parser, Debug)] + struct TestCli { + #[command(subcommand)] + cmd: SyncCommand, + } + + #[test] + fn clap_definition_is_valid() { + // Catches misuse of clap attributes at compile + load time. 
+ TestCli::command().debug_assert(); + } + + #[test] + fn push_defaults() { + let cli = TestCli::try_parse_from(["test", "push"]).expect("parse"); + assert_eq!(cli.cmd.direction(), SyncDirection::Push); + assert!(!cli.cmd.dry_run()); + assert_eq!(cli.cmd.strategy(), None); + } + + #[test] + fn pull_with_dry_run_and_strategy() { + let cli = TestCli::try_parse_from(["test", "pull", "--dry-run", "--strategy", "remote"]) + .expect("parse"); + assert_eq!(cli.cmd.direction(), SyncDirection::Pull); + assert!(cli.cmd.dry_run()); + assert_eq!(cli.cmd.strategy(), Some(Strategy::Remote)); + } + + #[test] + fn strategy_accepts_all_three_values() { + for value in ["local", "remote", "skip"] { + TestCli::try_parse_from(["test", "push", "--strategy", value]) + .unwrap_or_else(|e| panic!("strategy={value} should parse, got: {e}")); + } + } + + #[test] + fn unknown_strategy_rejected() { + let result = TestCli::try_parse_from(["test", "push", "--strategy", "force"]); + assert!(result.is_err(), "unknown strategy should fail to parse"); + } +} From 828689ed2b3b6040702818f1b07f54fffa8b1dd3 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 11:35:26 +0300 Subject: [PATCH 4/9] feat(ak sync): Implement reconciliation planning for sync --- cli/src/commands/ak/sync.rs | 303 ++++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/cli/src/commands/ak/sync.rs b/cli/src/commands/ak/sync.rs index 6740d0f94..afb948c1d 100644 --- a/cli/src/commands/ak/sync.rs +++ b/cli/src/commands/ak/sync.rs @@ -2,6 +2,8 @@ //! remote Stakpak knowledge store. 
use clap::{Subcommand, ValueEnum}; +use stakpak_ak::{FileMeta, StorageBackend}; +use std::collections::HashMap; use crate::config::AppConfig; @@ -128,11 +130,124 @@ pub fn run(cmd: SyncCommand, _config: AppConfig) -> Result<(), String> { todo!(); } +// ============================================================================ +// Plan +// ============================================================================ + +/// A single conflict: same path on both sides, different content. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Conflict { + pub path: String, + pub local_hash: String, + pub remote_hash: String, + pub local_size: u64, + pub remote_size: u64, +} + +/// The full reconciliation plan produced by [`plan`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncPlan { + pub direction: SyncDirection, + pub uploads: Vec, + pub downloads: Vec, + pub skipped: Vec, + pub conflicts: Vec, +} + +impl SyncPlan { + pub fn is_empty(&self) -> bool { + self.uploads.is_empty() && self.downloads.is_empty() && self.conflicts.is_empty() + } +} + +/// Build a [`SyncPlan`] by enumerating both sides and classifying each path. +/// +/// Both backends are walked from the store root (`""`) and indexed by +/// path. Each path is then classified according to the following table: +/// +/// | Local | Remote | Hashes | Push | Pull | +/// |-------|--------|--------|---------------|----------------| +/// | yes | no | — | upload | (silently skip)| +/// | no | yes | — | (silently skip)| download | +/// | yes | yes | match | skip | skip | +/// | yes | yes | differ | conflict | conflict | +/// +/// Output vectors are sorted by path for deterministic, diff-friendly output. 
+pub fn plan( + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + direction: SyncDirection, +) -> Result { + let local_metas = local + .list_with_meta("") + .map_err(|e| format!("failed to enumerate local store: {e}"))?; + let remote_metas = remote + .list_with_meta("") + .map_err(|e| format!("failed to enumerate remote store: {e}"))?; + + let mut local_index: HashMap = local_metas + .into_iter() + .map(|meta| (meta.path.clone(), meta)) + .collect(); + let remote_index: HashMap = remote_metas + .into_iter() + .map(|meta| (meta.path.clone(), meta)) + .collect(); + + let mut uploads: Vec = Vec::new(); + let mut downloads: Vec = Vec::new(); + let mut skipped: Vec = Vec::new(); + let mut conflicts: Vec = Vec::new(); + + for (path, remote_meta) in &remote_index { + match local_index.remove(path) { + Some(local_meta) => { + if local_meta.content_hash == remote_meta.content_hash { + skipped.push(path.clone()); + } else { + conflicts.push(Conflict { + path: path.clone(), + local_hash: local_meta.content_hash, + remote_hash: remote_meta.content_hash.clone(), + local_size: local_meta.size_bytes, + remote_size: remote_meta.size_bytes, + }); + } + } + None => { + if matches!(direction, SyncDirection::Pull) { + downloads.push(remote_meta.clone()); + } + } + } + } + + for (_path, local_meta) in local_index.drain() { + if matches!(direction, SyncDirection::Push) { + uploads.push(local_meta); + } + } + + uploads.sort_by(|a, b| a.path.cmp(&b.path)); + downloads.sort_by(|a, b| a.path.cmp(&b.path)); + skipped.sort(); + conflicts.sort_by(|a, b| a.path.cmp(&b.path)); + + Ok(SyncPlan { + direction, + uploads, + downloads, + skipped, + conflicts, + }) +} + #[cfg(test)] mod tests { use super::*; use clap::CommandFactory; use clap::Parser; + use stakpak_ak::{LocalFsBackend, StorageBackend}; #[derive(Parser, Debug)] struct TestCli { @@ -176,4 +291,192 @@ mod tests { let result = TestCli::try_parse_from(["test", "push", "--strategy", "force"]); assert!(result.is_err(), "unknown 
strategy should fail to parse"); } + + /// Build a (local, remote) backend pair backed by tempdirs. + /// The tempdirs are returned so callers can keep them alive for the + /// duration of the test. + fn pair() -> (tempfile::TempDir, LocalFsBackend, LocalFsBackend) { + let temp = tempfile::TempDir::new().expect("temp dir"); + let local = LocalFsBackend::with_root(temp.path().join("local")); + let remote = LocalFsBackend::with_root(temp.path().join("remote")); + (temp, local, remote) + } + + #[test] + fn plan_empty_stores() { + let (_temp, local, remote) = pair(); + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert!(p.uploads.is_empty()); + assert!(p.downloads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(p.conflicts.is_empty()); + assert!(p.is_empty()); + } + + #[test] + fn plan_push_uploads_local_only_files() { + let (_temp, local, remote) = pair(); + local.create("notes/a.md", b"alpha").expect("create local"); + local + .create("services/b.md", b"beta") + .expect("create local"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert_eq!(p.uploads.len(), 2); + assert_eq!(p.uploads[0].path, "notes/a.md"); + assert_eq!(p.uploads[1].path, "services/b.md"); + assert!(p.downloads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(p.conflicts.is_empty()); + } + + #[test] + fn plan_pull_downloads_remote_only_files() { + let (_temp, local, remote) = pair(); + remote + .create("notes/a.md", b"alpha") + .expect("create remote"); + remote + .create("services/b.md", b"beta") + .expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Pull).expect("plan"); + + assert_eq!(p.downloads.len(), 2); + assert_eq!(p.downloads[0].path, "notes/a.md"); + assert_eq!(p.downloads[1].path, "services/b.md"); + assert!(p.uploads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(p.conflicts.is_empty()); + } + + #[test] + fn plan_push_ignores_remote_only_files() { + // Sync is additive: a remote-only 
file should NOT appear in any + // bucket on push (no delete-on-remote, no skipped entry either). + let (_temp, local, remote) = pair(); + remote + .create("only-remote.md", b"x") + .expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert!(p.uploads.is_empty()); + assert!(p.downloads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(p.conflicts.is_empty()); + } + + #[test] + fn plan_pull_ignores_local_only_files() { + let (_temp, local, remote) = pair(); + local.create("only-local.md", b"x").expect("create local"); + + let p = plan(&local, &remote, SyncDirection::Pull).expect("plan"); + + assert!(p.uploads.is_empty()); + assert!(p.downloads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(p.conflicts.is_empty()); + } + + #[test] + fn plan_skips_identical_files() { + let (_temp, local, remote) = pair(); + local.create("shared.md", b"same").expect("create local"); + remote.create("shared.md", b"same").expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert_eq!(p.skipped, vec!["shared.md".to_string()]); + assert!(p.uploads.is_empty()); + assert!(p.conflicts.is_empty()); + assert!(p.is_empty()); + } + + #[test] + fn plan_detects_conflicts() { + let (_temp, local, remote) = pair(); + local + .create("shared.md", b"local-version") + .expect("create local"); + remote + .create("shared.md", b"remote-version") + .expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert_eq!(p.conflicts.len(), 1); + let c = &p.conflicts[0]; + assert_eq!(c.path, "shared.md"); + assert_ne!(c.local_hash, c.remote_hash); + assert_eq!(c.local_size, b"local-version".len() as u64); + assert_eq!(c.remote_size, b"remote-version".len() as u64); + assert!(p.uploads.is_empty()); + assert!(p.skipped.is_empty()); + assert!(!p.is_empty(), "conflicts make the plan non-noop"); + } + + #[test] + fn plan_combines_all_categories() { + // Mixed 
scenario: + // only-local.md -> upload (push) + // only-remote.md -> dropped (additive) + // identical.md -> skipped + // conflict.md -> conflict + let (_temp, local, remote) = pair(); + local + .create("only-local.md", b"L") + .expect("create only-local"); + remote + .create("only-remote.md", b"R") + .expect("create only-remote"); + local + .create("identical.md", b"same") + .expect("create identical local"); + remote + .create("identical.md", b"same") + .expect("create identical remote"); + local + .create("conflict.md", b"local-side") + .expect("create conflict local"); + remote + .create("conflict.md", b"remote-side") + .expect("create conflict remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + assert_eq!( + p.uploads + .iter() + .map(|m| m.path.as_str()) + .collect::>(), + vec!["only-local.md"] + ); + assert_eq!(p.skipped, vec!["identical.md".to_string()]); + assert_eq!(p.conflicts.len(), 1); + assert_eq!(p.conflicts[0].path, "conflict.md"); + assert!(p.downloads.is_empty()); + } + + #[test] + fn plan_outputs_are_sorted() { + // Deterministic ordering matters for cache-friendly diff output + // and for tests downstream. + let (_temp, local, remote) = pair(); + // Create in a deliberately non-alphabetical order. 
+ for path in ["zebra.md", "alpha.md", "mango.md"] { + local.create(path, b"x").expect("create local"); + } + for path in ["yak.md", "apple.md"] { + remote.create(path, b"x").expect("create remote"); + } + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + + let upload_paths: Vec<&str> = p.uploads.iter().map(|m| m.path.as_str()).collect(); + assert_eq!(upload_paths, vec!["alpha.md", "mango.md", "zebra.md"]); + } } From efece18b1e0a7452c28663acf7e4032dbfe76295 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 11:59:33 +0300 Subject: [PATCH 5/9] feat(ak sync): Implement execute function for sync operations with conflict resolution --- cli/src/commands/ak/sync.rs | 459 ++++++++++++++++++++++++++++++++++++ 1 file changed, 459 insertions(+) diff --git a/cli/src/commands/ak/sync.rs b/cli/src/commands/ak/sync.rs index afb948c1d..4744a140e 100644 --- a/cli/src/commands/ak/sync.rs +++ b/cli/src/commands/ak/sync.rs @@ -242,6 +242,252 @@ pub fn plan( }) } +// ============================================================================ +// Execute +// ============================================================================ + +/// Per-file failure encountered during [`execute`]. +/// +/// Sync continues past failures (per the design doc's "network blip during +/// execute" edge case) and surfaces the full list in [`SyncReport::failures`] +/// so the caller can decide on the exit code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Failure { + pub path: String, + pub error: String, +} + +/// Outcome of [`execute`]. All vectors are sorted by path. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncReport { + pub direction: SyncDirection, + /// Paths successfully pushed (push only). + pub uploaded: Vec, + /// Paths successfully pulled (pull only). 
+ pub downloaded: Vec, + /// Paths whose hashes already matched, plus any conflict skipped via + /// `--strategy` ([`Strategy::Skip`] or the no-op side of the chosen + /// strategy for that direction). + pub skipped: Vec, + /// Conflict paths where the chosen `--strategy` performed an + /// overwrite (e.g. `--strategy local` on a push pushed our local copy + /// over the remote's). + pub conflict_resolved: Vec, + /// Per-file errors. Sync did not abort; the file was simply skipped. + pub failures: Vec, +} + +impl SyncReport { + fn new(direction: SyncDirection) -> Self { + Self { + direction, + uploaded: Vec::new(), + downloaded: Vec::new(), + skipped: Vec::new(), + conflict_resolved: Vec::new(), + failures: Vec::new(), + } + } + + /// Whether any file failed to sync. The caller maps this to exit code 1. + pub fn has_failures(&self) -> bool { + !self.failures.is_empty() + } +} + +/// Apply a [`SyncPlan`] against the two backends. +/// +/// Execution is sequential and per-file fault-tolerant: a failure on one +/// path is recorded in [`SyncReport::failures`] but does not stop sync of +/// the remaining files. +/// +/// `strategy` is required if the plan contains conflicts. The CLI layer +/// (`run()`) is responsible for fail-fast handling — calling `execute()` +/// with conflicts but no strategy is a programming error and returns +/// `Err`. 
+/// +/// +/// | Strategy | Push (local → remote) | Pull (remote → local) | +/// |----------|------------------------------------|------------------------------------| +/// | Local | overwrite remote with local | keep local (skip download) | +/// | Remote | skip (don't push the local change) | overwrite local with remote | +/// | Skip | skip the conflict; sync the rest | skip the conflict; sync the rest | +pub fn execute( + plan: SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, +) -> Result { + if !plan.conflicts.is_empty() && strategy.is_none() { + return Err(format!( + "execute() called with {} conflict(s) but no strategy", + plan.conflicts.len() + )); + } + + if matches!(strategy, Some(Strategy::Recent)) { + return Err( + "--strategy recent is not yet supported. use local, remote, or skip".to_string(), + ); + } + + let mut report = SyncReport::new(plan.direction); + // Skipped paths from the plan (hash already matched) carry through + // to the report so the user sees the full picture. + report.skipped.extend(plan.skipped.iter().cloned()); + + match plan.direction { + SyncDirection::Push => execute_push(&plan, local, remote, strategy, &mut report), + SyncDirection::Pull => execute_pull(&plan, local, remote, strategy, &mut report), + } + + // Final ordering for deterministic output. + report.uploaded.sort(); + report.downloaded.sort(); + report.skipped.sort(); + report.conflict_resolved.sort(); + report.failures.sort_by(|a, b| a.path.cmp(&b.path)); + + Ok(report) +} + +fn execute_push( + plan: &SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, + report: &mut SyncReport, +) { + // 1. Plain uploads (local-only files). 
+ for meta in &plan.uploads { + match local.read(&meta.path) { + Ok(body) => match remote.create(&meta.path, &body) { + Ok(()) => report.uploaded.push(meta.path.clone()), + Err(stakpak_ak::Error::AlreadyExists(_)) => { + // 409 race: file appeared on the remote between plan + // and execute. Don't silently overwrite + report.failures.push(Failure { + path: meta.path.clone(), + error: "remote file appeared between plan and execute (race). \ + rerun `ak sync push` to re-evaluate" + .to_string(), + }); + } + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("upload failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("local read failed: {e}"), + }), + } + } + + // 2. Conflicts (apply strategy). + for conflict in &plan.conflicts { + match strategy { + Some(Strategy::Local) => { + // Overwrite remote with local. + match local.read(&conflict.path) { + Ok(body) => match remote.overwrite(&conflict.path, &body) { + Ok(()) => report.conflict_resolved.push(conflict.path.clone()), + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("conflict overwrite (remote) failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("local read for conflict failed: {e}"), + }), + } + } + Some(Strategy::Remote) | Some(Strategy::Skip) => { + report.skipped.push(conflict.path.clone()); + } + Some(Strategy::Recent) | None => { + // Already validated in execute(); unreachable here. + report.failures.push(Failure { + path: conflict.path.clone(), + error: "internal: conflict reached execute_push without resolvable strategy" + .to_string(), + }); + } + } + } +} + +fn execute_pull( + plan: &SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, + report: &mut SyncReport, +) { + // 1. Plain downloads (remote-only files). 
+ for meta in &plan.downloads { + match remote.read(&meta.path) { + Ok(body) => match local.create(&meta.path, &body) { + Ok(()) => report.downloaded.push(meta.path.clone()), + Err(stakpak_ak::Error::AlreadyExists(_)) => { + // Local race: file appeared locally between plan and + // execute (e.g. concurrent `ak write`). Same handling + // as the push race. + report.failures.push(Failure { + path: meta.path.clone(), + error: "local file appeared between plan and execute (race). \ + rerun `ak sync pull` to re-evaluate" + .to_string(), + }); + } + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("local write failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("remote read failed: {e}"), + }), + } + } + + // 2. Conflicts (apply strategy). + for conflict in &plan.conflicts { + match strategy { + Some(Strategy::Remote) => { + // Overwrite local with remote. + match remote.read(&conflict.path) { + Ok(body) => match local.overwrite(&conflict.path, &body) { + Ok(()) => report.conflict_resolved.push(conflict.path.clone()), + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("conflict overwrite (local) failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("remote read for conflict failed: {e}"), + }), + } + } + Some(Strategy::Local) | Some(Strategy::Skip) => { + // Pull: don't touch the local copy. 
+ report.skipped.push(conflict.path.clone()); + } + Some(Strategy::Recent) | None => { + report.failures.push(Failure { + path: conflict.path.clone(), + error: "internal: conflict reached execute_pull without resolvable strategy" + .to_string(), + }); + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -479,4 +725,217 @@ mod tests { let upload_paths: Vec<&str> = p.uploads.iter().map(|m| m.path.as_str()).collect(); assert_eq!(upload_paths, vec!["alpha.md", "mango.md", "zebra.md"]); } + + // ------------------------------------------------------------------ + // execute() tests + // ------------------------------------------------------------------ + + /// Convenience: build a plan and immediately execute it. The two + /// halves are tested independently above; here we just want the + /// end-to-end behavior. + fn plan_and_execute( + local: &LocalFsBackend, + remote: &LocalFsBackend, + direction: SyncDirection, + strategy: Option, + ) -> SyncReport { + let p = plan(local, remote, direction).expect("plan"); + execute(p, local, remote, strategy).expect("execute") + } + + #[test] + fn execute_empty_plan_is_noop() { + let (_temp, local, remote) = pair(); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, None); + + assert!(report.uploaded.is_empty()); + assert!(report.downloaded.is_empty()); + assert!(report.skipped.is_empty()); + assert!(report.conflict_resolved.is_empty()); + assert!(!report.has_failures()); + } + + #[test] + fn execute_push_uploads_files() { + let (_temp, local, remote) = pair(); + local.create("alpha.md", b"A").expect("create alpha"); + local.create("nested/beta.md", b"B").expect("create beta"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, None); + + assert_eq!(report.uploaded, vec!["alpha.md", "nested/beta.md"]); + // Bodies actually landed on the "remote". 
+ assert_eq!(remote.read("alpha.md").expect("read alpha"), b"A"); + assert_eq!(remote.read("nested/beta.md").expect("read beta"), b"B"); + assert!(!report.has_failures()); + } + + #[test] + fn execute_pull_downloads_files() { + let (_temp, local, remote) = pair(); + remote.create("alpha.md", b"A").expect("create alpha"); + remote.create("nested/beta.md", b"B").expect("create beta"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Pull, None); + + assert_eq!(report.downloaded, vec!["alpha.md", "nested/beta.md"]); + assert_eq!(local.read("alpha.md").expect("read alpha"), b"A"); + assert_eq!(local.read("nested/beta.md").expect("read beta"), b"B"); + assert!(!report.has_failures()); + } + + #[test] + fn execute_carries_plan_skipped_into_report() { + // Files identical on both sides should appear in the report's + // `skipped` list so the user gets a complete picture. + let (_temp, local, remote) = pair(); + local.create("same.md", b"x").expect("create local"); + remote.create("same.md", b"x").expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, None); + + assert_eq!(report.skipped, vec!["same.md"]); + assert!(report.uploaded.is_empty()); + } + + #[test] + fn execute_push_strategy_local_overwrites_remote_on_conflict() { + let (_temp, local, remote) = pair(); + local.create("c.md", b"local-wins").expect("create local"); + remote.create("c.md", b"old-remote").expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, Some(Strategy::Local)); + + assert_eq!(report.conflict_resolved, vec!["c.md"]); + assert_eq!(remote.read("c.md").expect("read remote"), b"local-wins"); + assert!(report.skipped.is_empty()); + assert!(!report.has_failures()); + } + + #[test] + fn execute_push_strategy_remote_skips_conflict() { + let (_temp, local, remote) = pair(); + local + .create("c.md", b"local-version") + .expect("create local"); + remote + .create("c.md", b"remote-version") + 
.expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, Some(Strategy::Remote)); + + assert_eq!(report.skipped, vec!["c.md"]); + assert!(report.conflict_resolved.is_empty()); + // Remote untouched. + assert_eq!(remote.read("c.md").expect("read remote"), b"remote-version"); + } + + #[test] + fn execute_push_strategy_skip_skips_conflict() { + let (_temp, local, remote) = pair(); + local.create("c.md", b"L").expect("create local"); + remote.create("c.md", b"R").expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Push, Some(Strategy::Skip)); + + assert_eq!(report.skipped, vec!["c.md"]); + assert!(report.conflict_resolved.is_empty()); + } + + #[test] + fn execute_pull_strategy_remote_overwrites_local_on_conflict() { + let (_temp, local, remote) = pair(); + local.create("c.md", b"old-local").expect("create local"); + remote + .create("c.md", b"remote-wins") + .expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Pull, Some(Strategy::Remote)); + + assert_eq!(report.conflict_resolved, vec!["c.md"]); + assert_eq!(local.read("c.md").expect("read local"), b"remote-wins"); + } + + #[test] + fn execute_pull_strategy_local_keeps_local_on_conflict() { + let (_temp, local, remote) = pair(); + local.create("c.md", b"local-keeps").expect("create local"); + remote + .create("c.md", b"remote-version") + .expect("create remote"); + + let report = plan_and_execute(&local, &remote, SyncDirection::Pull, Some(Strategy::Local)); + + assert_eq!(report.skipped, vec!["c.md"]); + assert!(report.conflict_resolved.is_empty()); + assert_eq!(local.read("c.md").expect("read local"), b"local-keeps"); + } + + #[test] + fn execute_conflicts_without_strategy_errors() { + // Defensive check: the CLI is supposed to fail-fast on conflicts + // without a strategy. 
If a programmer reaches execute() with + // conflicts and no strategy, they get a clear error rather than + // silent corruption. + let (_temp, local, remote) = pair(); + local.create("c.md", b"L").expect("create local"); + remote.create("c.md", b"R").expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + assert_eq!(p.conflicts.len(), 1); + + let err = execute(p, &local, &remote, None).expect_err("must error"); + assert!(err.contains("conflict"), "got: {err}"); + } + + #[test] + fn execute_strategy_recent_not_yet_supported() { + // `Recent` is parseable but rejected at execute time until + // FileMeta exposes timestamps. This test pins that contract. + let (_temp, local, remote) = pair(); + // Need conflicts in the plan so strategy is even consulted. + local.create("c.md", b"L").expect("create local"); + remote.create("c.md", b"R").expect("create remote"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let err = + execute(p, &local, &remote, Some(Strategy::Recent)).expect_err("recent unsupported"); + assert!(err.contains("recent"), "got: {err}"); + } + + #[test] + fn execute_failures_dont_abort_subsequent_files() { + // Pre-create one of the upload targets on the "remote" side AFTER + // planning. plan() sees the file as local-only (race-free in the + // plan), then execute() hits a 409 race on `create`. Sync must + // continue with the other file rather than abort. + let (_temp, local, remote) = pair(); + local.create("good.md", b"g").expect("create good"); + local.create("racy.md", b"r-local").expect("create racy"); + + let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + assert_eq!(p.uploads.len(), 2); + + // Inject the race: a different process created the file remotely + // between plan and execute. 
+ remote + .create("racy.md", b"r-remote") + .expect("simulate race on remote"); + + let report = execute(p, &local, &remote, None).expect("execute"); + + // The good file made it through. + assert_eq!(report.uploaded, vec!["good.md"]); + // The racy file is recorded as a failure, not silently dropped. + assert_eq!(report.failures.len(), 1); + assert_eq!(report.failures[0].path, "racy.md"); + assert!( + report.failures[0].error.contains("race"), + "expected race-aware message, got: {}", + report.failures[0].error + ); + // Remote still has the unrelated body — we did NOT overwrite it. + assert_eq!(remote.read("racy.md").expect("read"), b"r-remote"); + } } From 406e75cbb070f551a9075f36b8869c0f139219fb Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 12:51:19 +0300 Subject: [PATCH 6/9] refactor(ak sync) restructure the ak sync into different modules --- cli/src/commands/ak/sync/display.rs | 113 +++++ cli/src/commands/ak/sync/execute.rs | 246 ++++++++++ cli/src/commands/ak/{sync.rs => sync/mod.rs} | 450 ++++--------------- cli/src/commands/ak/sync/plan.rs | 113 +++++ libs/shared/src/format.rs | 58 +++ libs/shared/src/lib.rs | 1 + 6 files changed, 614 insertions(+), 367 deletions(-) create mode 100644 cli/src/commands/ak/sync/display.rs create mode 100644 cli/src/commands/ak/sync/execute.rs rename cli/src/commands/ak/{sync.rs => sync/mod.rs} (57%) create mode 100644 cli/src/commands/ak/sync/plan.rs create mode 100644 libs/shared/src/format.rs diff --git a/cli/src/commands/ak/sync/display.rs b/cli/src/commands/ak/sync/display.rs new file mode 100644 index 000000000..bd74dc3d3 --- /dev/null +++ b/cli/src/commands/ak/sync/display.rs @@ -0,0 +1,113 @@ +use stakpak_shared::format::{format_size, short_hash}; + +use super::SyncDirection; +use super::execute::SyncReport; +use super::plan::{Conflict, SyncPlan}; + +pub fn print_plan(p: &SyncPlan) { + let direction_label = match p.direction { + SyncDirection::Push => "push", + SyncDirection::Pull => "pull", + }; + 
println!( + "ak sync {direction_label} (dry-run): {} upload(s), {} download(s), {} skipped, {} conflict(s)", + p.uploads.len(), + p.downloads.len(), + p.skipped.len(), + p.conflicts.len() + ); + + if !p.uploads.is_empty() { + println!(); + println!("uploads:"); + for meta in &p.uploads { + println!(" + {} ({})", meta.path, format_size(meta.size_bytes)); + } + } + if !p.downloads.is_empty() { + println!(); + println!("downloads:"); + for meta in &p.downloads { + println!(" + {} ({})", meta.path, format_size(meta.size_bytes)); + } + } + if !p.skipped.is_empty() { + println!(); + println!("unchanged ({}):", p.skipped.len()); + for path in &p.skipped { + println!(" = {path}"); + } + } + if !p.conflicts.is_empty() { + println!(); + println!("conflicts:"); + for c in &p.conflicts { + println!( + " ! {} local: {} {} remote: {} {}", + c.path, + short_hash(&c.local_hash), + format_size(c.local_size), + short_hash(&c.remote_hash), + format_size(c.remote_size), + ); + } + } +} + +pub fn print_conflicts_to_stderr(conflicts: &[Conflict]) { + for c in conflicts { + eprintln!( + " ! 
{} local: {} {} remote: {} {}", + c.path, + short_hash(&c.local_hash), + format_size(c.local_size), + short_hash(&c.remote_hash), + format_size(c.remote_size), + ); + } +} + +pub fn print_report(r: &SyncReport) { + let direction_label = match r.direction { + SyncDirection::Push => "push", + SyncDirection::Pull => "pull", + }; + + let total_changed = r.uploaded.len() + r.downloaded.len() + r.conflict_resolved.len(); + println!( + "ak sync {direction_label}: {total_changed} change(s), {} skipped, {} failure(s)", + r.skipped.len(), + r.failures.len(), + ); + + if !r.uploaded.is_empty() { + println!(); + println!("uploaded:"); + for path in &r.uploaded { + println!(" + {path}"); + } + } + if !r.downloaded.is_empty() { + println!(); + println!("downloaded:"); + for path in &r.downloaded { + println!(" + {path}"); + } + } + if !r.conflict_resolved.is_empty() { + println!(); + println!("conflicts resolved:"); + for path in &r.conflict_resolved { + println!(" ~ {path}"); + } + } + if !r.failures.is_empty() { + // Use stderr for failures so a successful pipe (e.g. `... | tee + // sync.log`) still surfaces them prominently. + eprintln!(); + eprintln!("failures:"); + for f in &r.failures { + eprintln!(" ✗ {}: {}", f.path, f.error); + } + } +} diff --git a/cli/src/commands/ak/sync/execute.rs b/cli/src/commands/ak/sync/execute.rs new file mode 100644 index 000000000..6242a0401 --- /dev/null +++ b/cli/src/commands/ak/sync/execute.rs @@ -0,0 +1,246 @@ +use stakpak_ak::StorageBackend; + +use super::plan::SyncPlan; +use super::{Strategy, SyncDirection}; + +/// Per-file failure encountered during [`execute`]. +/// +/// Sync continues past failures (per the design doc's "network blip during +/// execute" edge case) and surfaces the full list in [`SyncReport::failures`] +/// so the caller can decide on the exit code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Failure { + pub path: String, + pub error: String, +} + +/// Outcome of [`execute`]. All vectors are sorted by path. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncReport { + pub direction: SyncDirection, + /// Paths successfully pushed (push only). + pub uploaded: Vec, + /// Paths successfully pulled (pull only). + pub downloaded: Vec, + /// Paths whose hashes already matched, plus any conflict skipped via + /// `--strategy` ([`Strategy::Skip`] or the no-op side of the chosen + /// strategy for that direction). + pub skipped: Vec, + /// Conflict paths where the chosen `--strategy` performed an + /// overwrite (e.g. `--strategy local` on a push pushed our local copy + /// over the remote's). + pub conflict_resolved: Vec, + /// Per-file errors. Sync did not abort; the file was simply skipped. + pub failures: Vec, +} + +impl SyncReport { + fn new(direction: SyncDirection) -> Self { + Self { + direction, + uploaded: Vec::new(), + downloaded: Vec::new(), + skipped: Vec::new(), + conflict_resolved: Vec::new(), + failures: Vec::new(), + } + } + + /// Whether any file failed to sync. The caller maps this to exit code 1. + pub fn has_failures(&self) -> bool { + !self.failures.is_empty() + } +} + +/// Apply a [`SyncPlan`] against the two backends. +/// +/// Execution is sequential and per-file fault-tolerant: a failure on one +/// path is recorded in [`SyncReport::failures`] but does not stop sync of +/// the remaining files +/// +/// `strategy` is required if the plan contains conflicts. The CLI layer +/// (`run()`) is responsible for fail-fast handling — calling `execute()` +/// with conflicts but no strategy is a programming error and returns +/// `Err`. 
+/// +/// +/// | Strategy | Push (local → remote) | Pull (remote → local) | +/// |----------|------------------------------------|------------------------------------| +/// | Local | overwrite remote with local | keep local (skip download) | +/// | Remote | skip (don't push the local change) | overwrite local with remote | +/// | Skip | skip the conflict; sync the rest | skip the conflict; sync the rest | +pub fn execute( + plan: SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, +) -> Result { + if !plan.conflicts.is_empty() && strategy.is_none() { + return Err(format!( + "execute() called with {} conflict(s) but no strategy", + plan.conflicts.len() + )); + } + + if matches!(strategy, Some(Strategy::Recent)) { + return Err( + "--strategy recent is not yet supported. use local, remote, or skip".to_string(), + ); + } + + let mut report = SyncReport::new(plan.direction); + // Skipped paths from the plan (hash already matched) carry through + // to the report so the user sees the full picture. + report.skipped.extend(plan.skipped.iter().cloned()); + + match plan.direction { + SyncDirection::Push => execute_push(&plan, local, remote, strategy, &mut report), + SyncDirection::Pull => execute_pull(&plan, local, remote, strategy, &mut report), + } + + // Final ordering for deterministic output. + report.uploaded.sort(); + report.downloaded.sort(); + report.skipped.sort(); + report.conflict_resolved.sort(); + report.failures.sort_by(|a, b| a.path.cmp(&b.path)); + + Ok(report) +} + +fn execute_push( + plan: &SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, + report: &mut SyncReport, +) { + // 1. Plain uploads (local-only files). 
+ for meta in &plan.uploads { + match local.read(&meta.path) { + Ok(body) => match remote.create(&meta.path, &body) { + Ok(()) => report.uploaded.push(meta.path.clone()), + Err(stakpak_ak::Error::AlreadyExists(_)) => { + // 409 race: file appeared on the remote between plan + // and execute. Don't silently overwrite + report.failures.push(Failure { + path: meta.path.clone(), + error: "remote file appeared between plan and execute (race). \ + rerun `ak sync push` to re-evaluate" + .to_string(), + }); + } + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("upload failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("local read failed: {e}"), + }), + } + } + + // 2. Conflicts (apply strategy). + for conflict in &plan.conflicts { + match strategy { + Some(Strategy::Local) => { + // Overwrite remote with local. + match local.read(&conflict.path) { + Ok(body) => match remote.overwrite(&conflict.path, &body) { + Ok(()) => report.conflict_resolved.push(conflict.path.clone()), + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("conflict overwrite (remote) failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("local read for conflict failed: {e}"), + }), + } + } + Some(Strategy::Remote) | Some(Strategy::Skip) => { + report.skipped.push(conflict.path.clone()); + } + Some(Strategy::Recent) | None => { + // Already validated in execute(); unreachable here. + report.failures.push(Failure { + path: conflict.path.clone(), + error: "internal: conflict reached execute_push without resolvable strategy" + .to_string(), + }); + } + } + } +} + +fn execute_pull( + plan: &SyncPlan, + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + strategy: Option, + report: &mut SyncReport, +) { + // 1. Plain downloads (remote-only files). 
+ for meta in &plan.downloads { + match remote.read(&meta.path) { + Ok(body) => match local.create(&meta.path, &body) { + Ok(()) => report.downloaded.push(meta.path.clone()), + Err(stakpak_ak::Error::AlreadyExists(_)) => { + // Local race: file appeared locally between plan and + // execute (e.g. concurrent `ak write`). Same handling + // as the push race. + report.failures.push(Failure { + path: meta.path.clone(), + error: "local file appeared between plan and execute (race). \ + rerun `ak sync pull` to re-evaluate" + .to_string(), + }); + } + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("local write failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: meta.path.clone(), + error: format!("remote read failed: {e}"), + }), + } + } + + // 2. Conflicts (apply strategy). + for conflict in &plan.conflicts { + match strategy { + Some(Strategy::Remote) => { + // Overwrite local with remote. + match remote.read(&conflict.path) { + Ok(body) => match local.overwrite(&conflict.path, &body) { + Ok(()) => report.conflict_resolved.push(conflict.path.clone()), + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("conflict overwrite (local) failed: {e}"), + }), + }, + Err(e) => report.failures.push(Failure { + path: conflict.path.clone(), + error: format!("remote read for conflict failed: {e}"), + }), + } + } + Some(Strategy::Local) | Some(Strategy::Skip) => { + // Pull: don't touch the local copy. 
+ report.skipped.push(conflict.path.clone()); + } + Some(Strategy::Recent) | None => { + report.failures.push(Failure { + path: conflict.path.clone(), + error: "internal: conflict reached execute_pull without resolvable strategy" + .to_string(), + }); + } + } + } +} diff --git a/cli/src/commands/ak/sync.rs b/cli/src/commands/ak/sync/mod.rs similarity index 57% rename from cli/src/commands/ak/sync.rs rename to cli/src/commands/ak/sync/mod.rs index 4744a140e..cedf20db7 100644 --- a/cli/src/commands/ak/sync.rs +++ b/cli/src/commands/ak/sync/mod.rs @@ -1,9 +1,13 @@ //! `stakpak ak sync` — reconcile the local ak knowledge store with the //! remote Stakpak knowledge store. +mod display; +pub mod execute; +pub mod plan; + use clap::{Subcommand, ValueEnum}; -use stakpak_ak::{FileMeta, StorageBackend}; -use std::collections::HashMap; +use stakpak_ak::{LocalFsBackend, RemoteBackend}; +use stakpak_api::stakpak::StakpakApiConfig; use crate::config::AppConfig; @@ -125,367 +129,78 @@ impl SyncCommand { } } -pub fn run(cmd: SyncCommand, _config: AppConfig) -> Result<(), String> { - let _ = (cmd.direction(), cmd.dry_run(), cmd.strategy()); - todo!(); -} - -// ============================================================================ -// Plan -// ============================================================================ - -/// A single conflict: same path on both sides, different content. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Conflict { - pub path: String, - pub local_hash: String, - pub remote_hash: String, - pub local_size: u64, - pub remote_size: u64, -} - -/// The full reconciliation plan produced by [`plan`]. 
-#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SyncPlan { - pub direction: SyncDirection, - pub uploads: Vec, - pub downloads: Vec, - pub skipped: Vec, - pub conflicts: Vec, -} - -impl SyncPlan { - pub fn is_empty(&self) -> bool { - self.uploads.is_empty() && self.downloads.is_empty() && self.conflicts.is_empty() - } -} - -/// Build a [`SyncPlan`] by enumerating both sides and classifying each path. -/// -/// Both backends are walked from the store root (`""`) and indexed by -/// path. The classification rules follow the following rules -/// -/// | Local | Remote | Hashes | Push | Pull | -/// |-------|--------|--------|---------------|----------------| -/// | yes | no | — | upload | (silently skip)| -/// | no | yes | — | (silently skip)| download | -/// | yes | yes | match | skip | skip | -/// | yes | yes | differ | conflict | conflict | -/// -/// Output vectors are sorted by path for deterministic, diff-friendly output. -pub fn plan( - local: &dyn StorageBackend, - remote: &dyn StorageBackend, - direction: SyncDirection, -) -> Result { - let local_metas = local - .list_with_meta("") - .map_err(|e| format!("failed to enumerate local store: {e}"))?; - let remote_metas = remote - .list_with_meta("") - .map_err(|e| format!("failed to enumerate remote store: {e}"))?; - - let mut local_index: HashMap = local_metas - .into_iter() - .map(|meta| (meta.path.clone(), meta)) - .collect(); - let remote_index: HashMap = remote_metas - .into_iter() - .map(|meta| (meta.path.clone(), meta)) - .collect(); - - let mut uploads: Vec = Vec::new(); - let mut downloads: Vec = Vec::new(); - let mut skipped: Vec = Vec::new(); - let mut conflicts: Vec = Vec::new(); - - for (path, remote_meta) in &remote_index { - match local_index.remove(path) { - Some(local_meta) => { - if local_meta.content_hash == remote_meta.content_hash { - skipped.push(path.clone()); - } else { - conflicts.push(Conflict { - path: path.clone(), - local_hash: local_meta.content_hash, - remote_hash: 
remote_meta.content_hash.clone(), - local_size: local_meta.size_bytes, - remote_size: remote_meta.size_bytes, - }); - } - } - None => { - if matches!(direction, SyncDirection::Pull) { - downloads.push(remote_meta.clone()); - } - } - } - } - - for (_path, local_meta) in local_index.drain() { - if matches!(direction, SyncDirection::Push) { - uploads.push(local_meta); - } - } - - uploads.sort_by(|a, b| a.path.cmp(&b.path)); - downloads.sort_by(|a, b| a.path.cmp(&b.path)); - skipped.sort(); - conflicts.sort_by(|a, b| a.path.cmp(&b.path)); - - Ok(SyncPlan { - direction, - uploads, - downloads, - skipped, - conflicts, - }) -} - -// ============================================================================ -// Execute -// ============================================================================ - -/// Per-file failure encountered during [`execute`]. -/// -/// Sync continues past failures (per the design doc's "network blip during -/// execute" edge case) and surfaces the full list in [`SyncReport::failures`] -/// so the caller can decide on the exit code. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Failure { - pub path: String, - pub error: String, -} - -/// Outcome of [`execute`]. All vectors are sorted by path. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SyncReport { - pub direction: SyncDirection, - /// Paths successfully pushed (push only). - pub uploaded: Vec, - /// Paths successfully pulled (pull only). - pub downloaded: Vec, - /// Paths whose hashes already matched, plus any conflict skipped via - /// `--strategy` ([`Strategy::Skip`] or the no-op side of the chosen - /// strategy for that direction). - pub skipped: Vec, - /// Conflict paths where the chosen `--strategy` performed an - /// overwrite (e.g. `--strategy local` on a push pushed our local copy - /// over the remote's). - pub conflict_resolved: Vec, - /// Per-file errors. Sync did not abort; the file was simply skipped. 
- pub failures: Vec, -} - -impl SyncReport { - fn new(direction: SyncDirection) -> Self { - Self { - direction, - uploaded: Vec::new(), - downloaded: Vec::new(), - skipped: Vec::new(), - conflict_resolved: Vec::new(), - failures: Vec::new(), +pub fn run(cmd: SyncCommand, config: AppConfig) -> Result<(), String> { + let direction = cmd.direction(); + let dry_run = cmd.dry_run(); + let strategy = cmd.strategy(); + + let local = + LocalFsBackend::new().map_err(|e| format!("failed to open local knowledge store: {e}"))?; + + let api_key = config + .get_stakpak_api_key() + .filter(|k| !k.is_empty()) + .ok_or_else(|| { + "remote not configured. run `stakpak auth login` first to use ak sync".to_string() + })?; + let api_config = StakpakApiConfig::new(api_key).with_endpoint(config.api_endpoint.clone()); + let remote = RemoteBackend::new(&api_config) + .map_err(|e| format!("failed to connect to remote knowledge store: {e}"))?; + + let p = plan::plan(&local, &remote, direction)?; + + if dry_run { + display::print_plan(&p); + if !p.conflicts.is_empty() && strategy.is_none() { + println!(); + println!( + "{} conflict(s) detected. live run would fail-fast — \ + rerun without --dry-run with `--strategy local|remote|skip` to resolve", + p.conflicts.len() + ); } + return Ok(()); } - /// Whether any file failed to sync. The caller maps this to exit code 1. - pub fn has_failures(&self) -> bool { - !self.failures.is_empty() - } -} + if !p.conflicts.is_empty() && strategy.is_none() { + eprintln!("{} conflict(s) detected:", p.conflicts.len()); + eprintln!(); + display::print_conflicts_to_stderr(&p.conflicts); + eprintln!(); + eprintln!( + "rerun with `--strategy local|remote|skip` to resolve, \ + or `--dry-run` to inspect the plan without changing anything" + ); -/// Apply a [`SyncPlan`] against the two backends. 
-/// -/// Execution is sequential and per-file fault-tolerant: a failure on one -/// path is recorded in [`SyncReport::failures`] but does not stop sync of -/// the remaining files -/// -/// `strategy` is required if the plan contains conflicts. The CLI layer -/// (`run()`) is responsible for fail-fast handling — calling `execute()` -/// with conflicts but no strategy is a programming error and returns -/// `Err`. -/// -/// -/// | Strategy | Push (local → remote) | Pull (remote → local) | -/// |----------|------------------------------------|------------------------------------| -/// | Local | overwrite remote with local | keep local (skip download) | -/// | Remote | skip (don't push the local change) | overwrite local with remote | -/// | Skip | skip the conflict; sync the rest | skip the conflict; sync the rest | -pub fn execute( - plan: SyncPlan, - local: &dyn StorageBackend, - remote: &dyn StorageBackend, - strategy: Option, -) -> Result { - if !plan.conflicts.is_empty() && strategy.is_none() { return Err(format!( - "execute() called with {} conflict(s) but no strategy", - plan.conflicts.len() + "{} conflict(s) detected. rerun with `--strategy local|remote|skip` to resolve", + p.conflicts.len() )); } - if matches!(strategy, Some(Strategy::Recent)) { - return Err( - "--strategy recent is not yet supported. use local, remote, or skip".to_string(), - ); - } - - let mut report = SyncReport::new(plan.direction); - // Skipped paths from the plan (hash already matched) carry through - // to the report so the user sees the full picture. - report.skipped.extend(plan.skipped.iter().cloned()); - - match plan.direction { - SyncDirection::Push => execute_push(&plan, local, remote, strategy, &mut report), - SyncDirection::Pull => execute_pull(&plan, local, remote, strategy, &mut report), - } - - // Final ordering for deterministic output. 
- report.uploaded.sort(); - report.downloaded.sort(); - report.skipped.sort(); - report.conflict_resolved.sort(); - report.failures.sort_by(|a, b| a.path.cmp(&b.path)); - - Ok(report) -} - -fn execute_push( - plan: &SyncPlan, - local: &dyn StorageBackend, - remote: &dyn StorageBackend, - strategy: Option, - report: &mut SyncReport, -) { - // 1. Plain uploads (local-only files). - for meta in &plan.uploads { - match local.read(&meta.path) { - Ok(body) => match remote.create(&meta.path, &body) { - Ok(()) => report.uploaded.push(meta.path.clone()), - Err(stakpak_ak::Error::AlreadyExists(_)) => { - // 409 race: file appeared on the remote between plan - // and execute. Don't silently overwrite - report.failures.push(Failure { - path: meta.path.clone(), - error: "remote file appeared between plan and execute (race). \ - rerun `ak sync push` to re-evaluate" - .to_string(), - }); - } - Err(e) => report.failures.push(Failure { - path: meta.path.clone(), - error: format!("upload failed: {e}"), - }), - }, - Err(e) => report.failures.push(Failure { - path: meta.path.clone(), - error: format!("local read failed: {e}"), - }), + if p.is_empty() { + if p.skipped.is_empty() { + println!("nothing to sync"); + } else { + println!( + "nothing to sync — {} file(s) already in sync", + p.skipped.len() + ); } + return Ok(()); } - // 2. Conflicts (apply strategy). - for conflict in &plan.conflicts { - match strategy { - Some(Strategy::Local) => { - // Overwrite remote with local. 
- match local.read(&conflict.path) { - Ok(body) => match remote.overwrite(&conflict.path, &body) { - Ok(()) => report.conflict_resolved.push(conflict.path.clone()), - Err(e) => report.failures.push(Failure { - path: conflict.path.clone(), - error: format!("conflict overwrite (remote) failed: {e}"), - }), - }, - Err(e) => report.failures.push(Failure { - path: conflict.path.clone(), - error: format!("local read for conflict failed: {e}"), - }), - } - } - Some(Strategy::Remote) | Some(Strategy::Skip) => { - report.skipped.push(conflict.path.clone()); - } - Some(Strategy::Recent) | None => { - // Already validated in execute(); unreachable here. - report.failures.push(Failure { - path: conflict.path.clone(), - error: "internal: conflict reached execute_push without resolvable strategy" - .to_string(), - }); - } - } - } -} + let report = execute::execute(p, &local, &remote, strategy)?; + display::print_report(&report); -fn execute_pull( - plan: &SyncPlan, - local: &dyn StorageBackend, - remote: &dyn StorageBackend, - strategy: Option, - report: &mut SyncReport, -) { - // 1. Plain downloads (remote-only files). - for meta in &plan.downloads { - match remote.read(&meta.path) { - Ok(body) => match local.create(&meta.path, &body) { - Ok(()) => report.downloaded.push(meta.path.clone()), - Err(stakpak_ak::Error::AlreadyExists(_)) => { - // Local race: file appeared locally between plan and - // execute (e.g. concurrent `ak write`). Same handling - // as the push race. - report.failures.push(Failure { - path: meta.path.clone(), - error: "local file appeared between plan and execute (race). 
\ - rerun `ak sync pull` to re-evaluate" - .to_string(), - }); - } - Err(e) => report.failures.push(Failure { - path: meta.path.clone(), - error: format!("local write failed: {e}"), - }), - }, - Err(e) => report.failures.push(Failure { - path: meta.path.clone(), - error: format!("remote read failed: {e}"), - }), - } + if report.has_failures() { + return Err(format!( + "{} file(s) failed to sync — see report above", + report.failures.len() + )); } - // 2. Conflicts (apply strategy). - for conflict in &plan.conflicts { - match strategy { - Some(Strategy::Remote) => { - // Overwrite local with remote. - match remote.read(&conflict.path) { - Ok(body) => match local.overwrite(&conflict.path, &body) { - Ok(()) => report.conflict_resolved.push(conflict.path.clone()), - Err(e) => report.failures.push(Failure { - path: conflict.path.clone(), - error: format!("conflict overwrite (local) failed: {e}"), - }), - }, - Err(e) => report.failures.push(Failure { - path: conflict.path.clone(), - error: format!("remote read for conflict failed: {e}"), - }), - } - } - Some(Strategy::Local) | Some(Strategy::Skip) => { - // Pull: don't touch the local copy. 
- report.skipped.push(conflict.path.clone()); - } - Some(Strategy::Recent) | None => { - report.failures.push(Failure { - path: conflict.path.clone(), - error: "internal: conflict reached execute_pull without resolvable strategy" - .to_string(), - }); - } - } - } + Ok(()) } #[cfg(test)] @@ -493,6 +208,7 @@ mod tests { use super::*; use clap::CommandFactory; use clap::Parser; + use execute::SyncReport; use stakpak_ak::{LocalFsBackend, StorageBackend}; #[derive(Parser, Debug)] @@ -551,7 +267,7 @@ mod tests { #[test] fn plan_empty_stores() { let (_temp, local, remote) = pair(); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert!(p.uploads.is_empty()); assert!(p.downloads.is_empty()); @@ -568,7 +284,7 @@ mod tests { .create("services/b.md", b"beta") .expect("create local"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert_eq!(p.uploads.len(), 2); assert_eq!(p.uploads[0].path, "notes/a.md"); @@ -588,7 +304,7 @@ mod tests { .create("services/b.md", b"beta") .expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Pull).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Pull).expect("plan"); assert_eq!(p.downloads.len(), 2); assert_eq!(p.downloads[0].path, "notes/a.md"); @@ -607,7 +323,7 @@ mod tests { .create("only-remote.md", b"x") .expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert!(p.uploads.is_empty()); assert!(p.downloads.is_empty()); @@ -620,7 +336,7 @@ mod tests { let (_temp, local, remote) = pair(); local.create("only-local.md", b"x").expect("create local"); - let p = plan(&local, &remote, SyncDirection::Pull).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Pull).expect("plan"); 
assert!(p.uploads.is_empty()); assert!(p.downloads.is_empty()); @@ -634,7 +350,7 @@ mod tests { local.create("shared.md", b"same").expect("create local"); remote.create("shared.md", b"same").expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert_eq!(p.skipped, vec!["shared.md".to_string()]); assert!(p.uploads.is_empty()); @@ -652,7 +368,7 @@ mod tests { .create("shared.md", b"remote-version") .expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert_eq!(p.conflicts.len(), 1); let c = &p.conflicts[0]; @@ -692,7 +408,7 @@ mod tests { .create("conflict.md", b"remote-side") .expect("create conflict remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert_eq!( p.uploads @@ -720,7 +436,7 @@ mod tests { remote.create(path, b"x").expect("create remote"); } - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); let upload_paths: Vec<&str> = p.uploads.iter().map(|m| m.path.as_str()).collect(); assert_eq!(upload_paths, vec!["alpha.md", "mango.md", "zebra.md"]); @@ -739,8 +455,8 @@ mod tests { direction: SyncDirection, strategy: Option, ) -> SyncReport { - let p = plan(local, remote, direction).expect("plan"); - execute(p, local, remote, strategy).expect("execute") + let p = plan::plan(local, remote, direction).expect("plan"); + execute::execute(p, local, remote, strategy).expect("execute") } #[test] @@ -882,10 +598,10 @@ mod tests { local.create("c.md", b"L").expect("create local"); remote.create("c.md", b"R").expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, 
SyncDirection::Push).expect("plan"); assert_eq!(p.conflicts.len(), 1); - let err = execute(p, &local, &remote, None).expect_err("must error"); + let err = execute::execute(p, &local, &remote, None).expect_err("must error"); assert!(err.contains("conflict"), "got: {err}"); } @@ -898,9 +614,9 @@ mod tests { local.create("c.md", b"L").expect("create local"); remote.create("c.md", b"R").expect("create remote"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); - let err = - execute(p, &local, &remote, Some(Strategy::Recent)).expect_err("recent unsupported"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); + let err = execute::execute(p, &local, &remote, Some(Strategy::Recent)) + .expect_err("recent unsupported"); assert!(err.contains("recent"), "got: {err}"); } @@ -914,7 +630,7 @@ mod tests { local.create("good.md", b"g").expect("create good"); local.create("racy.md", b"r-local").expect("create racy"); - let p = plan(&local, &remote, SyncDirection::Push).expect("plan"); + let p = plan::plan(&local, &remote, SyncDirection::Push).expect("plan"); assert_eq!(p.uploads.len(), 2); // Inject the race: a different process created the file remotely @@ -923,7 +639,7 @@ mod tests { .create("racy.md", b"r-remote") .expect("simulate race on remote"); - let report = execute(p, &local, &remote, None).expect("execute"); + let report = execute::execute(p, &local, &remote, None).expect("execute"); // The good file made it through. assert_eq!(report.uploaded, vec!["good.md"]); diff --git a/cli/src/commands/ak/sync/plan.rs b/cli/src/commands/ak/sync/plan.rs new file mode 100644 index 000000000..2e2e70bd9 --- /dev/null +++ b/cli/src/commands/ak/sync/plan.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; + +use stakpak_ak::{FileMeta, StorageBackend}; + +use super::SyncDirection; + +/// A single conflict: same path on both sides, different content. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Conflict { + pub path: String, + pub local_hash: String, + pub remote_hash: String, + pub local_size: u64, + pub remote_size: u64, +} + +/// The full reconciliation plan produced by [`plan`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncPlan { + pub direction: SyncDirection, + pub uploads: Vec, + pub downloads: Vec, + pub skipped: Vec, + pub conflicts: Vec, +} + +impl SyncPlan { + pub fn is_empty(&self) -> bool { + self.uploads.is_empty() && self.downloads.is_empty() && self.conflicts.is_empty() + } +} + +/// Build a [`SyncPlan`] by enumerating both sides and classifying each path. +/// +/// Both backends are walked from the store root (`""`) and indexed by +/// path. The classification rules follow the following rules +/// +/// | Local | Remote | Hashes | Push | Pull | +/// |-------|--------|--------|---------------|----------------| +/// | yes | no | — | upload | (silently skip)| +/// | no | yes | — | (silently skip)| download | +/// | yes | yes | match | skip | skip | +/// | yes | yes | differ | conflict | conflict | +/// +/// Output vectors are sorted by path for deterministic, diff-friendly output. 
+pub fn plan( + local: &dyn StorageBackend, + remote: &dyn StorageBackend, + direction: SyncDirection, +) -> Result { + let local_metas = local + .list_with_meta("") + .map_err(|e| format!("failed to enumerate local store: {e}"))?; + let remote_metas = remote + .list_with_meta("") + .map_err(|e| format!("failed to enumerate remote store: {e}"))?; + + let mut local_index: HashMap = local_metas + .into_iter() + .map(|meta| (meta.path.clone(), meta)) + .collect(); + let remote_index: HashMap = remote_metas + .into_iter() + .map(|meta| (meta.path.clone(), meta)) + .collect(); + + let mut uploads: Vec = Vec::new(); + let mut downloads: Vec = Vec::new(); + let mut skipped: Vec = Vec::new(); + let mut conflicts: Vec = Vec::new(); + + for (path, remote_meta) in &remote_index { + match local_index.remove(path) { + Some(local_meta) => { + if local_meta.content_hash == remote_meta.content_hash { + skipped.push(path.clone()); + } else { + conflicts.push(Conflict { + path: path.clone(), + local_hash: local_meta.content_hash, + remote_hash: remote_meta.content_hash.clone(), + local_size: local_meta.size_bytes, + remote_size: remote_meta.size_bytes, + }); + } + } + None => { + if matches!(direction, SyncDirection::Pull) { + downloads.push(remote_meta.clone()); + } + } + } + } + + for (_path, local_meta) in local_index.drain() { + if matches!(direction, SyncDirection::Push) { + uploads.push(local_meta); + } + } + + uploads.sort_by(|a, b| a.path.cmp(&b.path)); + downloads.sort_by(|a, b| a.path.cmp(&b.path)); + skipped.sort(); + conflicts.sort_by(|a, b| a.path.cmp(&b.path)); + + Ok(SyncPlan { + direction, + uploads, + downloads, + skipped, + conflicts, + }) +} diff --git a/libs/shared/src/format.rs b/libs/shared/src/format.rs new file mode 100644 index 000000000..5ac206b16 --- /dev/null +++ b/libs/shared/src/format.rs @@ -0,0 +1,58 @@ +//! Small formatting helpers shared across crates. +//! +//! Display-oriented utilities for human-readable terminal output. 
+ +/// Render a byte count as a short, fixed-precision human string. +/// +/// Uses binary units (KB = 1024 B) to match `du -h` and similar tools. +/// One decimal place above the byte threshold for compact output. +pub fn format_size(bytes: u64) -> String { + const KB: u64 = 1024; + const MB: u64 = KB * 1024; + const GB: u64 = MB * 1024; + if bytes < KB { + format!("{bytes} B") + } else if bytes < MB { + format!("{:.1} KB", bytes as f64 / KB as f64) + } else if bytes < GB { + format!("{:.1} MB", bytes as f64 / MB as f64) + } else { + format!("{:.1} GB", bytes as f64 / GB as f64) + } +} + +/// Git-style short hash — first 8 chars of `hash`, or the full string if +/// it's shorter than 8 chars (defensive for non-hex inputs). +/// +/// Safe on UTF-8 because SHA-256 hex is ASCII. The `min` guard handles +/// callers that pass shorter strings +pub fn short_hash(hash: &str) -> &str { + let end = hash.len().min(8); + &hash[..end] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn format_size_renders_units() { + assert_eq!(format_size(0), "0 B"); + assert_eq!(format_size(512), "512 B"); + assert_eq!(format_size(1023), "1023 B"); + assert_eq!(format_size(1024), "1.0 KB"); + assert_eq!(format_size(1024 * 1024), "1.0 MB"); + assert_eq!(format_size(1024 * 1024 * 1024), "1.0 GB"); + // 2.5 MB exact + assert_eq!(format_size(1024 * 1024 * 5 / 2), "2.5 MB"); + } + + #[test] + fn short_hash_truncates_and_handles_short_input() { + let full = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"; + assert_eq!(short_hash(full), "ba7816bf"); + // Shorter-than-8 input falls back to the full string. 
+ assert_eq!(short_hash("abc"), "abc"); + assert_eq!(short_hash(""), ""); + } +} diff --git a/libs/shared/src/lib.rs b/libs/shared/src/lib.rs index 813e9c2f2..30d3852cb 100644 --- a/libs/shared/src/lib.rs +++ b/libs/shared/src/lib.rs @@ -3,6 +3,7 @@ pub mod cert_utils; pub mod container; pub mod file_backup_manager; pub mod file_watcher; +pub mod format; pub mod hash; pub mod helper; pub mod hooks; From 266c4f078b699c38cb24e05678d847585a8d8f83 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 12:57:23 +0300 Subject: [PATCH 7/9] fix(format): Safely handle short hash input to prevent panics --- libs/shared/src/format.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/shared/src/format.rs b/libs/shared/src/format.rs index 5ac206b16..8124c31e8 100644 --- a/libs/shared/src/format.rs +++ b/libs/shared/src/format.rs @@ -28,7 +28,7 @@ pub fn format_size(bytes: u64) -> String { /// callers that pass shorter strings pub fn short_hash(hash: &str) -> &str { let end = hash.len().min(8); - &hash[..end] + hash.get(..end).unwrap_or(hash) } #[cfg(test)] From 6d3c11197d1d3168ab04cab28fa3e2174c7180ba Mon Sep 17 00:00:00 2001 From: Shehab Atia <89648315+shehab299@users.noreply.github.com> Date: Mon, 8 Jun 2026 13:19:30 +0300 Subject: [PATCH 8/9] fix(ak sync) guard against windows paths in sync Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- libs/ak/src/store.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libs/ak/src/store.rs b/libs/ak/src/store.rs index 7f0365975..ae9c14618 100644 --- a/libs/ak/src/store.rs +++ b/libs/ak/src/store.rs @@ -421,12 +421,15 @@ impl StorageBackend for LocalFsBackend { let mut results = Vec::with_capacity(paths.len()); for path in paths { - let absolute = self.resolve_path(&path)?; + // Normalize to forward slashes so local paths compare correctly with + // remote API paths (which are always `/`-separated), including on Windows. 
+ let normalized_path = path.replace('\\', "/"); + let absolute = self.resolve_path(&normalized_path)?; let bytes = fs::read(&absolute)?; let size_bytes = bytes.len() as u64; let content_hash = sha256_hex(&bytes); results.push(FileMeta { - path, + path: normalized_path, content_hash, size_bytes, }); @@ -435,6 +438,9 @@ impl StorageBackend for LocalFsBackend { Ok(results) } + Ok(results) + } + fn exists(&self, path: &str) -> Result { let target = self.resolve_path(path)?; self.ensure_no_symlinks_below_root(&target)?; From 90bd4dd87dbee76adbf6b5ea891fc9c5f386cee4 Mon Sep 17 00:00:00 2001 From: shehab299 Date: Mon, 8 Jun 2026 13:18:10 +0300 Subject: [PATCH 9/9] doc(ak sync) enhance documentation for sync plan --- cli/src/commands/ak/sync/plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/src/commands/ak/sync/plan.rs b/cli/src/commands/ak/sync/plan.rs index 2e2e70bd9..509ce46a2 100644 --- a/cli/src/commands/ak/sync/plan.rs +++ b/cli/src/commands/ak/sync/plan.rs @@ -33,7 +33,7 @@ impl SyncPlan { /// Build a [`SyncPlan`] by enumerating both sides and classifying each path. /// /// Both backends are walked from the store root (`""`) and indexed by -/// path. The classification rules follow the following rules +/// path. The classification rules follow these rules /// /// | Local | Remote | Hashes | Push | Pull | /// |-------|--------|--------|---------------|----------------|