diff --git a/Cargo.lock b/Cargo.lock index 77adb87..97a1607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -243,6 +249,12 @@ dependencies = [ "syn", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.14" @@ -261,7 +273,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "fetchfs" -version = "0.2.0" +version = "0.2.1" dependencies = [ "chrono", "clap", @@ -270,6 +282,7 @@ dependencies = [ "hex", "httpdate", "libc", + "lru", "reqwest", "serde", "serde_json", @@ -289,6 +302,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -422,6 +441,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "heck" version = "0.5.0" @@ -744,6 +774,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" +dependencies = [ + "hashbrown", +] + [[package]] name = "lru-slab" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 3ece82c..114941c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fetchfs" -version = "0.2.0" +version = "0.2.1" edition = "2024" [dependencies] @@ -10,6 +10,7 @@ fs2 = "0.4" hex = "0.4" httpdate = "1.0" libc = "0.2" +lru = "0.16.3" reqwest = { version = "0.12", default-features = false, features = ["blocking", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/cache.rs b/src/cache.rs index 9fecd6f..01c3d78 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,6 +1,7 @@ use std::fs::{self, File, OpenOptions}; use std::io; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use std::{env, io::ErrorKind}; @@ -17,6 +18,7 @@ use crate::http::UrlDownloader; #[derive(Debug, Clone)] pub struct Cache { root: PathBuf, + lru_tracker: Arc, } #[derive(Debug, Clone)] @@ -54,6 +56,79 @@ pub struct BlockEntry { pub meta_path: PathBuf, } +pub struct LruTracker { + inner: Mutex, + max_bytes: u64, +} + +struct LruInner { + order: lru::LruCache, + current_bytes: u64, +} + +impl std::fmt::Debug for LruTracker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LruTracker") + .field("max_bytes", &self.max_bytes) + .finish_non_exhaustive() + } +} + +impl LruTracker { + pub fn new(max_bytes: u64) -> Self { + LruTracker { + inner: Mutex::new(LruInner { + order: lru::LruCache::unbounded(), + current_bytes: 0, + }), + max_bytes, + } + } + + /// Register a cached file. Evicts least-recently-used entries from disk + /// until total tracked bytes is within the configured limit. + pub fn track(&self, path: &Path, size: u64) { + let to_evict = { + let mut inner = self.inner.lock().expect("lru lock"); + if let Some(old_size) = inner.order.get_mut(path) { + let prev = *old_size; + *old_size = size; + inner.current_bytes = inner.current_bytes - prev + size; + return; + } + inner.order.push(path.to_path_buf(), size); + inner.current_bytes += size; + let mut evicted = Vec::new(); + while inner.current_bytes > self.max_bytes { + if let Some((p, s)) = inner.order.pop_lru() { + inner.current_bytes -= s; + evicted.push((p, s)); + } else { + break; + } + } + evicted + }; + for (evicted_path, evicted_size) in to_evict { + if let Err(err) = fs::remove_file(&evicted_path) { + debug!("lru evict remove failed for {:?}: {err}", evicted_path); + } + info!("lru evicted {:?} ({evicted_size} bytes)", evicted_path); + } + } + + /// Promote a cached file to most-recently-used. + pub fn touch(&self, path: &Path) { + let mut inner = self.inner.lock().expect("lru lock"); + let _ = inner.order.get(path); + } + + #[cfg(test)] + fn current_bytes(&self) -> u64 { + self.inner.lock().expect("lru lock").current_bytes + } +} + impl BlockCache { pub fn new(root: impl Into, block_size: u64) -> Self { let block_size = block_size.max(4096); @@ -95,8 +170,19 @@ impl BlockCache { } impl Cache { - pub fn new(root: impl Into) -> Self { - Cache { root: root.into() } + pub fn new(root: impl Into, lru_tracker: Arc) -> Self { + Cache { + root: root.into(), + lru_tracker, + } + } + + pub fn track(&self, path: &Path, size: u64) { + self.lru_tracker.track(path, size); + } + + pub fn touch(&self, path: &Path) { + self.lru_tracker.touch(path); } pub fn entry_for(&self, url: &str) -> io::Result { @@ -193,6 +279,7 @@ impl Cache { let _lock = Self::lock(entry)?; if Self::is_fresh(entry, expected)? { debug!("cache hit for {:?}", entry.data_path); + self.touch(&entry.data_path); return Ok(entry.data_path.clone()); } @@ -216,6 +303,7 @@ impl Cache { meta.size = Some(metadata.len()); } Self::write_meta(entry, &meta)?; + self.track(&entry.data_path, meta.size.unwrap_or(0)); Ok(entry.data_path.clone()) } @@ -223,16 +311,16 @@ impl Cache { &self, entry: &CacheEntry, expected: CacheMeta, - downloader: std::sync::Arc, - download_gate: std::sync::Arc, - download_pool: std::sync::Arc, + downloader: Arc, + download_gate: Arc, + download_pool: Arc, ) -> io::Result { if Self::is_fresh(entry, &expected)? { debug!("cache hit for {:?}", entry.data_path); + self.touch(&entry.data_path); return Ok(entry.data_path.clone()); } - // Create a placeholder so readers don't hit ENOENT while the background download starts. let _ = OpenOptions::new() .create(true) .append(true) @@ -248,6 +336,7 @@ impl Cache { let data_path = entry.data_path.clone(); let meta_path = entry.meta_path.clone(); + let lru_tracker = Arc::clone(&self.lru_tracker); download_pool .enqueue(move || { let _permit = download_gate.acquire(); @@ -259,11 +348,15 @@ impl Cache { return; } let mut meta = expected; - if meta.size.is_none() - && let Ok(metadata) = fs::metadata(&data_path) - { - meta.size = Some(metadata.len()); - } + let size = match meta.size { + Some(s) => s, + None => { + let s = fs::metadata(&data_path).map(|m| m.len()).unwrap_or(0); + meta.size = Some(s); + s + } + }; + lru_tracker.track(&data_path, size); if let Ok(raw) = serde_json::to_string(&meta) { let _ = fs::write(&meta_path, raw); } @@ -291,6 +384,7 @@ impl Cache { meta.size = Some(data.len() as u64); } Self::write_meta(entry, &meta)?; + self.track(&entry.data_path, data.len() as u64); Ok(entry.data_path.clone()) } @@ -457,7 +551,7 @@ mod tests { #[test] fn cache_entry_paths_are_stable() { let tmp = tempfile::tempdir().expect("tempdir"); - let cache = Cache::new(tmp.path()); + let cache = Cache::new(tmp.path(), Arc::new(LruTracker::new(u64::MAX))); let entry = cache .entry_for("https://example.com/file.txt") .expect("entry"); @@ -480,7 +574,7 @@ mod tests { #[test] fn ensure_cached_downloads_and_reuses() { let tmp = tempfile::tempdir().expect("tempdir"); - let cache = Cache::new(tmp.path()); + let cache = Cache::new(tmp.path(), Arc::new(LruTracker::new(u64::MAX))); let entry = cache .entry_for("https://example.com/file.txt") .expect("entry"); @@ -507,4 +601,66 @@ mod tests { assert_eq!(*calls.lock().expect("lock"), 1); assert_eq!(fs::read(&entry.data_path).expect("read"), b"data"); } + + #[test] + fn lru_tracker_evicts_oldest_when_over_limit() { + let tmp = tempfile::tempdir().expect("tempdir"); + let tracker = LruTracker::new(250); + + let mut paths = Vec::new(); + for i in 0..3 { + let path = tmp.path().join(format!("file_{i}")); + fs::write(&path, vec![0u8; 100]).expect("write"); + paths.push(path); + } + + for path in &paths { + tracker.track(path, 100); + } + + assert!(!paths[0].exists(), "oldest file should be evicted"); + assert!(paths[1].exists()); + assert!(paths[2].exists()); + assert_eq!(tracker.current_bytes(), 200); + } + + #[test] + fn lru_tracker_touch_prevents_eviction() { + let tmp = tempfile::tempdir().expect("tempdir"); + let tracker = LruTracker::new(250); + + let path_a = tmp.path().join("a"); + let path_b = tmp.path().join("b"); + let path_c = tmp.path().join("c"); + for p in [&path_a, &path_b, &path_c] { + fs::write(p, vec![0u8; 100]).expect("write"); + } + + tracker.track(&path_a, 100); + tracker.track(&path_b, 100); + tracker.touch(&path_a); + tracker.track(&path_c, 100); + + assert!(path_a.exists(), "touched file should survive"); + assert!(!path_b.exists(), "untouched LRU file should be evicted"); + assert!(path_c.exists()); + assert_eq!(tracker.current_bytes(), 200); + } + + #[test] + fn lru_tracker_track_updates_existing_entry() { + let tmp = tempfile::tempdir().expect("tempdir"); + let tracker = LruTracker::new(500); + + let path = tmp.path().join("file"); + fs::write(&path, vec![0u8; 100]).expect("write"); + + tracker.track(&path, 100); + assert_eq!(tracker.current_bytes(), 100); + + tracker.track(&path, 200); + assert_eq!(tracker.current_bytes(), 200); + + assert!(path.exists()); + } } diff --git a/src/filesystem.rs b/src/filesystem.rs index bbca4a0..0670559 100644 --- a/src/filesystem.rs +++ b/src/filesystem.rs @@ -150,6 +150,7 @@ impl FetchFs { } let cache_entry = self.cache_entry_for(entry)?; if cache_entry.data_path.exists() { + self.cache.touch(&cache_entry.data_path); return read_from_path(&cache_entry.data_path, offset, size); } @@ -241,6 +242,7 @@ impl FetchFs { for block in start_block..=end_block { let block_path = block_cache.block_path(&block_entry, block); if block_path.exists() { + self.cache.touch(&block_path); continue; } { @@ -256,6 +258,7 @@ impl FetchFs { .open(&lock_path)?; lock_file.lock_exclusive()?; if block_path.exists() { + self.cache.touch(&block_path); drop(lock_file); continue; } @@ -270,9 +273,11 @@ impl FetchFs { if meta.mtime.is_none() { meta.mtime = result.meta.mtime; } + let block_len = result.data.len() as u64; let tmp_path = block_path.with_extension("tmp"); std::fs::write(&tmp_path, result.data)?; std::fs::rename(&tmp_path, &block_path)?; + self.cache.track(&block_path, block_len); drop(lock_file); } RangeStatus::Full => { @@ -361,7 +366,10 @@ mod tests { }; let tree = manifest.build_tree().expect("tree"); let cache_dir = tempfile::tempdir().expect("tempdir"); - let cache = Cache::new(cache_dir.path()); + let cache = Cache::new( + cache_dir.path(), + Arc::new(crate::cache::LruTracker::new(u64::MAX)), + ); let block_cache = Some(BlockCache::new(cache_dir.path().join("blocks"), 4096)); let downloader = HttpClient::new(0, 0, 100, 100).expect("client"); let fs = FetchFs::new( @@ -433,7 +441,10 @@ mod tests { }; let tree = manifest.build_tree().expect("tree"); let cache_dir = tempfile::tempdir().expect("tempdir"); - let cache = Cache::new(cache_dir.path()); + let cache = Cache::new( + cache_dir.path(), + Arc::new(crate::cache::LruTracker::new(u64::MAX)), + ); let block_cache = Some(BlockCache::new(cache_dir.path().join("blocks"), 4096)); let downloader = HttpClient::new(0, 0, 1000, 2000).expect("client"); let fs = FetchFs::new( @@ -479,4 +490,204 @@ mod tests { assert_eq!(calls.load(Ordering::SeqCst), 1); } + + fn start_multi_file_server( + file_sizes: &[usize], + ) -> ( + std::net::SocketAddr, + Arc, + mpsc::Sender<()>, + thread::JoinHandle<()>, + ) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = listener.local_addr().expect("addr"); + listener.set_nonblocking(true).expect("nonblocking"); + + let total_calls = Arc::new(AtomicUsize::new(0)); + let (stop_tx, stop_rx) = mpsc::channel(); + let total_calls_server = Arc::clone(&total_calls); + + let bodies: Vec> = file_sizes + .iter() + .enumerate() + .map(|(i, &sz)| vec![b'a' + i as u8; sz]) + .collect(); + let n_files = bodies.len(); + + let handle = thread::spawn(move || { + loop { + if stop_rx.try_recv().is_ok() { + break; + } + let (mut stream, _) = match listener.accept() { + Ok(conn) => conn, + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(5)); + continue; + } + Err(_) => break, + }; + let mut buffer = [0u8; 4096]; + let n = stream.read(&mut buffer).unwrap_or(0); + let request = String::from_utf8_lossy(&buffer[..n]); + total_calls_server.fetch_add(1, Ordering::SeqCst); + + let file_idx = request + .lines() + .next() + .and_then(|line| { + let path = line.split_whitespace().nth(1)?; + let name = path.trim_start_matches('/'); + name.strip_prefix("file_") + .and_then(|s| s.parse::().ok()) + }) + .unwrap_or(0) + .min(n_files.saturating_sub(1)); + + let body = &bodies[file_idx]; + let header = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()); + let _ = stream.write_all(header.as_bytes()); + let _ = stream.write_all(body); + } + }); + + (addr, total_calls, stop_tx, handle) + } + + #[test] + fn lru_evicts_least_recently_used_file() { + let (addr, total_calls, stop_tx, server_handle) = start_multi_file_server(&[100, 100, 100]); + + let mtime = Some(chrono::Utc::now()); + let manifest = Manifest { + version: 1, + entries: vec![ + ManifestEntry { + path: "file_0".to_string(), + url: format!("http://{}/file_0", addr), + size: Some(100), + mtime, + }, + ManifestEntry { + path: "file_1".to_string(), + url: format!("http://{}/file_1", addr), + size: Some(100), + mtime, + }, + ManifestEntry { + path: "file_2".to_string(), + url: format!("http://{}/file_2", addr), + size: Some(100), + mtime, + }, + ], + }; + let tree = manifest.build_tree().expect("tree"); + let cache_dir = tempfile::tempdir().expect("tempdir"); + let tracker = Arc::new(crate::cache::LruTracker::new(250)); + let cache = Cache::new(cache_dir.path(), Arc::clone(&tracker)); + let downloader = HttpClient::new(0, 0, 1000, 2000).expect("client"); + let fs = FetchFs::new( + manifest, + tree, + cache, + None, + Arc::new(downloader), + Arc::new(DownloadGate::new(4)), + Arc::new(DownloadPool::new(4)), + false, + ); + + for i in 0..3 { + fs.ensure_cached(&fs.manifest.entries[i]).expect("cache"); + } + + let entry0 = fs.cache_entry_for(&fs.manifest.entries[0]).expect("entry"); + let entry1 = fs.cache_entry_for(&fs.manifest.entries[1]).expect("entry"); + let entry2 = fs.cache_entry_for(&fs.manifest.entries[2]).expect("entry"); + assert!( + !entry0.data_path.exists(), + "file_0 should be evicted (oldest)" + ); + assert!(entry1.data_path.exists(), "file_1 should remain"); + assert!(entry2.data_path.exists(), "file_2 should remain"); + assert_eq!(total_calls.load(Ordering::SeqCst), 3); + + fs.ensure_cached(&fs.manifest.entries[0]).expect("re-cache"); + assert_eq!( + total_calls.load(Ordering::SeqCst), + 4, + "evicted file should be re-downloaded" + ); + + let _ = stop_tx.send(()); + let _ = server_handle.join(); + } + + #[test] + fn lru_touch_on_read_keeps_file_alive() { + let (addr, _total_calls, stop_tx, server_handle) = + start_multi_file_server(&[100, 100, 100]); + + let mtime = Some(chrono::Utc::now()); + let manifest = Manifest { + version: 1, + entries: vec![ + ManifestEntry { + path: "file_0".to_string(), + url: format!("http://{}/file_0", addr), + size: Some(100), + mtime, + }, + ManifestEntry { + path: "file_1".to_string(), + url: format!("http://{}/file_1", addr), + size: Some(100), + mtime, + }, + ManifestEntry { + path: "file_2".to_string(), + url: format!("http://{}/file_2", addr), + size: Some(100), + mtime, + }, + ], + }; + let tree = manifest.build_tree().expect("tree"); + let cache_dir = tempfile::tempdir().expect("tempdir"); + let tracker = Arc::new(crate::cache::LruTracker::new(250)); + let cache = Cache::new(cache_dir.path(), Arc::clone(&tracker)); + let downloader = HttpClient::new(0, 0, 1000, 2000).expect("client"); + let fs = FetchFs::new( + manifest, + tree, + cache, + None, + Arc::new(downloader), + Arc::new(DownloadGate::new(4)), + Arc::new(DownloadPool::new(4)), + false, + ); + + fs.ensure_cached(&fs.manifest.entries[0]).expect("cache A"); + fs.ensure_cached(&fs.manifest.entries[1]).expect("cache B"); + + fs.read_range(&fs.manifest.entries[0], 0, 10) + .expect("touch A via read"); + + fs.ensure_cached(&fs.manifest.entries[2]).expect("cache C"); + + let entry0 = fs.cache_entry_for(&fs.manifest.entries[0]).expect("entry"); + let entry1 = fs.cache_entry_for(&fs.manifest.entries[1]).expect("entry"); + let entry2 = fs.cache_entry_for(&fs.manifest.entries[2]).expect("entry"); + assert!( + entry0.data_path.exists(), + "file_0 should survive (was touched)" + ); + assert!(!entry1.data_path.exists(), "file_1 should be evicted (LRU)"); + assert!(entry2.data_path.exists(), "file_2 should remain"); + + let _ = stop_tx.send(()); + let _ = server_handle.join(); + } } diff --git a/src/fuse_fs.rs b/src/fuse_fs.rs index 9187e2b..bb82dff 100644 --- a/src/fuse_fs.rs +++ b/src/fuse_fs.rs @@ -273,7 +273,12 @@ impl Filesystem for FuseFS { reply: ReplyXattr, ) { if let Some(tracer) = &self.tracer { - tracer.getxattr(ino, name.to_string_lossy().as_ref(), size, self.path_for_inode(ino)); + tracer.getxattr( + ino, + name.to_string_lossy().as_ref(), + size, + self.path_for_inode(ino), + ); } if self.node_for_inode(ino).is_none() { reply.error(ENOENT); @@ -433,14 +438,7 @@ impl Filesystem for FuseFS { reply.ok(); } - fn flush( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - lock_owner: u64, - reply: ReplyEmpty, - ) { + fn flush(&mut self, _req: &Request<'_>, ino: u64, fh: u64, lock_owner: u64, reply: ReplyEmpty) { if let Some(tracer) = &self.tracer { tracer.flush(ino, fh, lock_owner, self.path_for_inode(ino)); } @@ -460,7 +458,14 @@ impl Filesystem for FuseFS { reply: ReplyWrite, ) { if let Some(tracer) = &self.tracer { - tracer.write(ino, fh, offset, data.len() as u32, flags, self.path_for_inode(ino)); + tracer.write( + ino, + fh, + offset, + data.len() as u32, + flags, + self.path_for_inode(ino), + ); } reply.error(EROFS); } @@ -496,7 +501,10 @@ mod tests { }; let tree = manifest.build_tree().expect("tree"); let cache_dir = tempfile::tempdir().expect("tempdir"); - let cache = Cache::new(cache_dir.path()); + let cache = Cache::new( + cache_dir.path(), + std::sync::Arc::new(crate::cache::LruTracker::new(u64::MAX)), + ); let http = HttpClient::new(0, 0, 100, 100).expect("client"); let fs = FetchFs::new( manifest, diff --git a/src/main.rs b/src/main.rs index 883f114..b73987d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod filesystem; mod fuse_fs; mod http; mod manifest; +mod mount_cache; mod syscall_trace; mod tree; @@ -19,7 +20,7 @@ use std::process::ExitCode; use tracing::{error, info, warn}; use tracing_subscriber::EnvFilter; -use crate::cache::{Cache, default_cache_dir}; +use crate::cache::{Cache, LruTracker, default_cache_dir}; use crate::download_gate::DownloadGate; use crate::download_pool::DownloadPool; use crate::filesystem::FetchFs; @@ -60,8 +61,9 @@ enum Commands { connect_timeout_ms: u64, #[arg(long, default_value_t = 30000)] request_timeout_ms: u64, - #[arg(long)] - cache_max_bytes: Option, + /// Maximum cache size in bytes. + #[arg(long, default_value_t = 4_294_967_296)] + cache_max_bytes: u64, #[arg(long)] cache_max_age_days: Option, #[arg(long)] @@ -136,15 +138,28 @@ fn main() -> ExitCode { } }; let file_count = manifest.entries.len(); - let cache_dir = cache_dir.unwrap_or_else(|| { - default_cache_dir().unwrap_or_else(|_| PathBuf::from(".cache/fetchfs")) - }); - let cache = Cache::new(cache_dir.clone()); - if let Err(err) = cache.evict(cache_max_bytes, cache_max_age_days) { + let cache_base = match resolve_cache_dir(cache_dir) { + Some(dir) => dir, + None => return ExitCode::FAILURE, + }; + + mount_cache::cleanup_orphaned_mounts(&cache_base); + + let instance_dir = match mount_cache::create_instance_dir(&cache_base) { + Ok(dir) => dir, + Err(err) => { + error!("failed to create instance cache dir: {err}"); + return ExitCode::FAILURE; + } + }; + + let lru_tracker = Arc::new(LruTracker::new(cache_max_bytes)); + let cache = Cache::new(instance_dir.clone(), lru_tracker); + if let Err(err) = cache.evict(None, cache_max_age_days) { error!("failed to evict cache entries: {err}"); } let block_cache = block_cache_size - .map(|size| crate::cache::BlockCache::new(cache_dir.join("blocks"), size)); + .map(|size| crate::cache::BlockCache::new(instance_dir.join("blocks"), size)); let downloader = match HttpClient::new( retries, retry_base_ms, @@ -174,19 +189,28 @@ fn main() -> ExitCode { v.join(",") }); let filter = trace_filter.map(|f| { - f.split(',').map(|s| s.trim().to_string()).collect::>() + f.split(',') + .map(|s| s.trim().to_string()) + .collect::>() }); match SyscallTracer::new(&socket_path, filter) { Ok(tracer) => { if let Some(f) = filter_log { - info!("syscall tracing enabled: {} (filter: {})", socket_path.display(), f); + info!( + "syscall tracing enabled: {} (filter: {})", + socket_path.display(), + f + ); } else { info!("syscall tracing enabled: {}", socket_path.display()); } Some(tracer) } Err(err) => { - error!("failed to create syscall tracer at {}: {err}", socket_path.display()); + error!( + "failed to create syscall tracer at {}: {err}", + socket_path.display() + ); return ExitCode::FAILURE; } } @@ -200,25 +224,28 @@ fn main() -> ExitCode { "ready to mount {} files at {} (cache: {})", file_count, mountpoint.display(), - cache_dir.display() + instance_dir.display() ); - if let Err(err) = install_signal_handler(&mountpoint) { + if let Err(err) = install_signal_handler(&mountpoint, &instance_dir) { error!("failed to install signal handler: {err}"); } if let Err(err) = fuser::mount2(fuse_fs, &mountpoint, &mount_options) { error!("failed to mount: {err}"); + mount_cache::remove_instance_dir(&instance_dir); return ExitCode::FAILURE; } + mount_cache::remove_instance_dir(&instance_dir); ExitCode::SUCCESS } Commands::Clean { cache_dir } => { init_logging(false); - let cache_dir = cache_dir.unwrap_or_else(|| { - default_cache_dir().unwrap_or_else(|_| PathBuf::from(".cache/fetchfs")) - }); - let cache = Cache::new(cache_dir); + let cache_dir = match resolve_cache_dir(cache_dir) { + Some(dir) => dir, + None => return ExitCode::FAILURE, + }; + let cache = Cache::new(cache_dir, Arc::new(LruTracker::new(u64::MAX))); if let Err(err) = cache.clean() { error!("failed to clean cache: {err}"); ExitCode::FAILURE @@ -229,10 +256,11 @@ fn main() -> ExitCode { } Commands::Stats { cache_dir } => { init_logging(false); - let cache_dir = cache_dir.unwrap_or_else(|| { - default_cache_dir().unwrap_or_else(|_| PathBuf::from(".cache/fetchfs")) - }); - let cache = Cache::new(cache_dir.clone()); + let cache_dir = match resolve_cache_dir(cache_dir) { + Some(dir) => dir, + None => return ExitCode::FAILURE, + }; + let cache = Cache::new(cache_dir.clone(), Arc::new(LruTracker::new(u64::MAX))); match cache.stats() { Ok(stats) => { info!( @@ -252,6 +280,19 @@ fn main() -> ExitCode { } } +fn resolve_cache_dir(explicit: Option) -> Option { + if let Some(dir) = explicit { + return Some(dir); + } + match default_cache_dir() { + Ok(dir) => Some(dir), + Err(err) => { + error!("cannot determine cache directory: {err}"); + None + } + } +} + fn init_logging(verbose: bool) { let filter = if verbose { EnvFilter::new("debug") @@ -264,16 +305,19 @@ fn init_logging(verbose: bool) { .try_init(); } -fn install_signal_handler(mountpoint: &Path) -> Result<(), std::io::Error> { +fn install_signal_handler(mountpoint: &Path, instance_dir: &Path) -> Result<(), std::io::Error> { use signal_hook::consts::signal::{SIGINT, SIGTERM}; let mut signals = signal_hook::iterator::Signals::new([SIGINT, SIGTERM])?; let mountpoint = mountpoint.to_path_buf(); + let instance_dir = instance_dir.to_path_buf(); std::thread::spawn(move || { - if let Some(_signal) = signals.forever().next() - && let Err(err) = unmount_fs(&mountpoint) - && err.kind() != std::io::ErrorKind::InvalidInput - { - warn!("unmount failed: {err}"); + if let Some(_signal) = signals.forever().next() { + if let Err(err) = unmount_fs(&mountpoint) + && err.kind() != std::io::ErrorKind::InvalidInput + { + warn!("unmount failed: {err}"); + } + mount_cache::remove_instance_dir(&instance_dir); } }); Ok(()) diff --git a/src/mount_cache.rs b/src/mount_cache.rs new file mode 100644 index 0000000..32bef44 --- /dev/null +++ b/src/mount_cache.rs @@ -0,0 +1,164 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use tracing::{info, warn}; + +const MOUNTS_DIR: &str = "mounts"; +const PID_LOCK: &str = "pid.lock"; + +/// Create a new per-mount instance directory under `base/mounts//`. +/// Writes the current PID to `pid.lock` inside the directory. +/// Returns the instance directory path. +pub fn create_instance_dir(base: &Path) -> io::Result { + let id = generate_id(); + let instance = base.join(MOUNTS_DIR).join(id); + fs::create_dir_all(&instance)?; + + let pid = std::process::id(); + fs::write(instance.join(PID_LOCK), pid.to_string())?; + + Ok(instance) +} + +/// Remove the instance directory and all its contents. +pub fn remove_instance_dir(instance_dir: &Path) { + if instance_dir.exists() { + if let Err(err) = fs::remove_dir_all(instance_dir) { + warn!( + "failed to remove instance cache dir {:?}: {err}", + instance_dir + ); + } else { + info!("removed instance cache dir {:?}", instance_dir); + } + } +} + +/// Scan `base/mounts/` for instance directories whose owning process is no +/// longer alive. Removes any orphaned directories. +pub fn cleanup_orphaned_mounts(base: &Path) { + let mounts = base.join(MOUNTS_DIR); + let entries = match fs::read_dir(&mounts) { + Ok(entries) => entries, + Err(err) if err.kind() == io::ErrorKind::NotFound => return, + Err(err) => { + warn!("failed to read mounts dir {:?}: {err}", mounts); + return; + } + }; + + for entry in entries { + let entry = match entry { + Ok(e) => e, + Err(err) => { + warn!("failed to read mount dir entry: {err}"); + continue; + } + }; + let path = entry.path(); + if !path.is_dir() { + continue; + } + let pid_file = path.join(PID_LOCK); + let pid = match fs::read_to_string(&pid_file) { + Ok(content) => content, + Err(err) if err.kind() == io::ErrorKind::NotFound => { + warn!("no pid.lock in {:?}, removing", path); + remove_instance_dir(&path); + continue; + } + Err(err) => { + warn!("failed to read {:?}: {err}, skipping", pid_file); + continue; + } + }; + let pid: u32 = match pid.trim().parse() { + Ok(pid) => pid, + Err(err) => { + warn!("corrupt pid.lock in {:?}: {err}, removing", path); + remove_instance_dir(&path); + continue; + } + }; + if !is_process_alive(pid) { + info!("cleaning up orphaned mount cache (pid {pid}): {:?}", path); + remove_instance_dir(&path); + } + } +} + +fn is_process_alive(pid: u32) -> bool { + // SAFETY: kill with signal 0 only checks if process exists, no signal is sent. + let ret = unsafe { libc::kill(pid as i32, 0) }; + if ret == 0 { + return true; + } + // EPERM means the process exists but belongs to another user. + // Only ESRCH means the process is truly gone. + std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH) +} + +fn generate_id() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let pid = std::process::id(); + format!("{pid}-{ts}") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cleanup_orphaned_mounts_removes_dead_pid_dirs() { + let tmp = tempfile::tempdir().expect("tempdir"); + let mounts = tmp.path().join(MOUNTS_DIR); + let orphan = mounts.join("dead-instance"); + fs::create_dir_all(&orphan).expect("mkdir"); + fs::write(orphan.join(PID_LOCK), "999999999").expect("write pid"); + + cleanup_orphaned_mounts(tmp.path()); + + assert!(!orphan.exists()); + } + + #[test] + fn cleanup_orphaned_mounts_preserves_live_pid_dirs() { + let tmp = tempfile::tempdir().expect("tempdir"); + let mounts = tmp.path().join(MOUNTS_DIR); + let live = mounts.join("live-instance"); + fs::create_dir_all(&live).expect("mkdir"); + let pid = std::process::id(); + fs::write(live.join(PID_LOCK), pid.to_string()).expect("write pid"); + + cleanup_orphaned_mounts(tmp.path()); + + assert!(live.exists()); + } + + #[test] + fn create_instance_dir_writes_pid_lock() { + let tmp = tempfile::tempdir().expect("tempdir"); + let instance = create_instance_dir(tmp.path()).expect("create"); + + assert!(instance.exists()); + let pid_content = fs::read_to_string(instance.join(PID_LOCK)).expect("read pid"); + assert_eq!(pid_content, std::process::id().to_string()); + } + + #[test] + fn remove_instance_dir_cleans_up() { + let tmp = tempfile::tempdir().expect("tempdir"); + let instance = create_instance_dir(tmp.path()).expect("create"); + fs::write(instance.join("some_cached_file"), b"data").expect("write"); + + remove_instance_dir(&instance); + + assert!(!instance.exists()); + } +} diff --git a/src/syscall_trace.rs b/src/syscall_trace.rs index b2bf7e5..3fbc632 100644 --- a/src/syscall_trace.rs +++ b/src/syscall_trace.rs @@ -100,7 +100,10 @@ pub struct SyscallTracer { } impl SyscallTracer { - pub fn new>(socket_path: P, filter: Option>) -> std::io::Result { + pub fn new>( + socket_path: P, + filter: Option>, + ) -> std::io::Result { let socket = UnixDatagram::unbound()?; socket.connect(socket_path)?; socket.set_nonblocking(true)?; @@ -115,7 +118,11 @@ impl SyscallTracer { } pub fn trace(&self, syscall: &'static str, args: SyscallArgs<'_>) { - if self.filter.as_ref().is_some_and(|filter| !filter.contains(syscall)) { + if self + .filter + .as_ref() + .is_some_and(|filter| !filter.contains(syscall)) + { return; } let event = SyscallEvent { @@ -129,7 +136,14 @@ impl SyscallTracer { } pub fn lookup(&self, parent: u64, name: &str, parent_path: Option<&str>) { - self.trace("lookup", SyscallArgs::Lookup { parent, name, parent_path }); + self.trace( + "lookup", + SyscallArgs::Lookup { + parent, + name, + parent_path, + }, + ); } pub fn getattr(&self, ino: u64, fh: Option, path: Option<&str>) { @@ -141,7 +155,15 @@ impl SyscallTracer { } pub fn getxattr(&self, ino: u64, name: &str, size: u32, path: Option<&str>) { - self.trace("getxattr", SyscallArgs::Getxattr { ino, name, size, path }); + self.trace( + "getxattr", + SyscallArgs::Getxattr { + ino, + name, + size, + path, + }, + ); } pub fn listxattr(&self, ino: u64, size: u32, path: Option<&str>) { @@ -149,7 +171,15 @@ impl SyscallTracer { } pub fn readdir(&self, ino: u64, fh: u64, offset: i64, path: Option<&str>) { - self.trace("readdir", SyscallArgs::Readdir { ino, fh, offset, path }); + self.trace( + "readdir", + SyscallArgs::Readdir { + ino, + fh, + offset, + path, + }, + ); } pub fn open(&self, ino: u64, flags: i32, path: Option<&str>) { @@ -157,18 +187,54 @@ impl SyscallTracer { } pub fn read(&self, ino: u64, fh: u64, offset: i64, size: u32, path: Option<&str>) { - self.trace("read", SyscallArgs::Read { ino, fh, offset, size, path }); + self.trace( + "read", + SyscallArgs::Read { + ino, + fh, + offset, + size, + path, + }, + ); } pub fn release(&self, ino: u64, fh: u64, flags: i32, flush: bool, path: Option<&str>) { - self.trace("release", SyscallArgs::Release { ino, fh, flags, flush, path }); + self.trace( + "release", + SyscallArgs::Release { + ino, + fh, + flags, + flush, + path, + }, + ); } pub fn flush(&self, ino: u64, fh: u64, lock_owner: u64, path: Option<&str>) { - self.trace("flush", SyscallArgs::Flush { ino, fh, lock_owner, path }); + self.trace( + "flush", + SyscallArgs::Flush { + ino, + fh, + lock_owner, + path, + }, + ); } pub fn write(&self, ino: u64, fh: u64, offset: i64, size: u32, flags: i32, path: Option<&str>) { - self.trace("write", SyscallArgs::Write { ino, fh, offset, size, flags, path }); + self.trace( + "write", + SyscallArgs::Write { + ino, + fh, + offset, + size, + flags, + path, + }, + ); } }