From d3a791abe7d1a15f17729be9bd5f78494d700af2 Mon Sep 17 00:00:00 2001 From: Martin Pool Date: Sat, 23 May 2026 17:49:02 -0700 Subject: [PATCH] Improved Monitor interface * Send stdout println through Monitor so it can manage progress bar synchronization internally * Pass around just `Monitor` and hold an Arc internally, like `Transport` already does * impl Clone for Counters so tests can get a copy of them * pass Monitor into a couple more places --- doc/design.md | 3 +- src/archive.rs | 25 +++---- src/backup.rs | 154 +++++++++++++++++--------------------- src/band.rs | 10 +-- src/bin/conserve.rs | 50 +++++++------ src/blockdir.rs | 75 +++++++++---------- src/counters.rs | 11 +++ src/diff.rs | 20 +++-- src/gc_lock.rs | 8 +- src/index/mod.rs | 10 +-- src/index/stitch.rs | 42 +++++------ src/index/write.rs | 6 +- src/merge.rs | 2 +- src/monitor.rs | 98 ++++++++++++++++++++++++ src/monitor/mod.rs | 25 ------- src/monitor/task.rs | 2 +- src/monitor/test.rs | 30 +++++--- src/monitor/void.rs | 51 ++++++++++--- src/restore.rs | 71 ++++++++---------- src/show.rs | 8 +- src/source.rs | 16 ++-- src/stored_tree.rs | 17 ++--- src/termui/monitor.rs | 33 ++++++-- src/termui/trace.rs | 6 +- src/test_fixtures.rs | 6 +- src/validate.rs | 5 +- tests/damage.rs | 26 +++---- tests/mount.rs | 10 +-- tests/old_archives.rs | 23 +++--- tests/proptest_changes.rs | 6 +- 30 files changed, 479 insertions(+), 370 deletions(-) create mode 100644 src/monitor.rs delete mode 100644 src/monitor/mod.rs diff --git a/doc/design.md b/doc/design.md index a1f8ad1f..dc06fea4 100644 --- a/doc/design.md +++ b/doc/design.md @@ -180,8 +180,7 @@ The library should support several modes of UI: in a limited way. (Tests that run the `conserve` binary as a subprocess have more freedom, including running it on a pseudoterminal.) -Conserve writes messages to Rust's widely-used `tracing` crate. Logs can be written to a file with `--log-json`. -well as to the terminal, and at a different level of detail. +Conserve writes messages to Rust's widely-used `tracing` crate. Logs can be written to a file with `--log-json` or `--trace-tmp`, as well as to the terminal, and at a different level of detail. - The library will emit logs but will not by default configure any log targets, so that applications can choose the target they want. diff --git a/src/archive.rs b/src/archive.rs index 25f24497..6a4791ac 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -138,7 +138,7 @@ impl Archive { band_selection: BandSelectionPolicy, subtree: Apath, exclude: Exclude, - monitor: Arc, + monitor: Monitor, ) -> Result { Ok(self .open_stored_tree(band_selection) @@ -205,7 +205,7 @@ impl Archive { pub async fn referenced_blocks( &self, band_ids: &[BandId], - monitor: Arc, + monitor: Monitor, ) -> Result> { let archive = self.clone(); let task = monitor.start_task("Find referenced blocks".to_string()); @@ -224,7 +224,7 @@ impl Archive { } /// Returns an iterator of blocks that are present and referenced by no index. - pub async fn unreferenced_blocks(&self, monitor: Arc) -> Result> { + pub async fn unreferenced_blocks(&self, monitor: Monitor) -> Result> { let referenced = self .referenced_blocks(&self.list_band_ids().await?, monitor.clone()) .await?; @@ -245,7 +245,7 @@ impl Archive { &self, delete_band_ids: &[BandId], options: &DeleteOptions, - monitor: Arc, + monitor: Monitor, ) -> Result { let mut stats = DeleteStats::default(); let start = Instant::now(); @@ -323,11 +323,7 @@ impl Archive { /// If problems are found, they are emitted as `warn` or `error` level /// tracing messages. This function only returns an error if validation /// stops due to a fatal error. - pub async fn validate( - &self, - options: &ValidateOptions, - monitor: Arc, - ) -> Result<()> { + pub async fn validate(&self, options: &ValidateOptions, monitor: Monitor) -> Result<()> { self.validate_archive_dir(monitor.clone()).await?; debug!("List bands..."); @@ -373,7 +369,7 @@ impl Archive { Ok(()) } - async fn validate_archive_dir(&self, monitor: Arc) -> Result<()> { + async fn validate_archive_dir(&self, monitor: Monitor) -> Result<()> { // TODO: More tests for the problems detected here. debug!("Check archive directory..."); let mut seen_bands = HashSet::::new(); @@ -417,7 +413,6 @@ mod test { use assert_fs::TempDir; use assert_fs::prelude::*; - use crate::monitor::test::TestMonitor; use crate::test_fixtures::TreeFixture; use crate::test_fixtures::store_two_versions; @@ -481,7 +476,7 @@ mod test { "Archive should have no bands yet" ); assert_eq!( - af.referenced_blocks(&af.list_band_ids().await.unwrap(), TestMonitor::arc()) + af.referenced_blocks(&af.list_band_ids().await.unwrap(), Monitor::void()) .await .unwrap() .len(), @@ -500,7 +495,7 @@ mod test { .delete_bands( &[BandId::new(&[0]), BandId::new(&[1])], &Default::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .expect("delete_bands"); @@ -524,14 +519,14 @@ mod test { &archive, tf.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .expect("backup"); // Delete the band and index std::fs::remove_dir_all(archive.transport().local_path().unwrap().join("b0000")).unwrap(); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let unreferenced: Vec = archive.unreferenced_blocks(monitor.clone()).await.unwrap(); diff --git a/src/backup.rs b/src/backup.rs index afbde587..42a326f0 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -88,7 +88,7 @@ pub async fn backup( archive: &Archive, source_path: &Path, options: &BackupOptions, - monitor: Arc, + monitor: Monitor, ) -> Result { let start = Instant::now(); if gc_lock::GarbageCollectionLock::is_locked(archive).await? { @@ -201,7 +201,7 @@ struct BackupWriter { } impl BackupWriter { - async fn finish(mut self, monitor: Arc) -> Result { + async fn finish(mut self, monitor: Monitor) -> Result { self.flush_group(monitor.clone()).await?; let hunks = self.index_writer.finish().await?; trace!(?hunks, "Closing band"); @@ -210,7 +210,7 @@ impl BackupWriter { } /// Write out any pending data blocks, and then the pending index entries. - async fn flush_group(&mut self, monitor: Arc) -> Result<()> { + async fn flush_group(&mut self, monitor: Monitor) -> Result<()> { let (stats, mut entries) = self.file_combiner.drain(monitor.clone()).await?; trace!("Got {} entries to write from file combiner", entries.len()); self.stats += stats; @@ -230,7 +230,7 @@ impl BackupWriter { mut source_entry: source::Entry, source_tree: &SourceTree, options: &BackupOptions, - monitor: Arc, + monitor: Monitor, ) -> Result> { if !options.owner { source_entry.owner.clear(); @@ -238,7 +238,7 @@ impl BackupWriter { // TODO: Emit deletions for entries in the basis not present in the source, // probably by using Merge to read both trees in parallel. match source_entry.kind() { - Kind::Dir => self.copy_dir(&source_entry, monitor.as_ref()), + Kind::Dir => self.copy_dir(&source_entry, &monitor), Kind::File => { self.copy_file( &source_entry, @@ -249,7 +249,7 @@ impl BackupWriter { ) .await } - Kind::Symlink => self.copy_symlink(&source_entry, monitor.as_ref()), + Kind::Symlink => self.copy_symlink(&source_entry, &monitor), Kind::Unknown => { self.stats.unknown_kind += 1; // TODO: Perhaps eventually we could backup and restore pipes, @@ -263,7 +263,7 @@ impl BackupWriter { fn copy_dir( &mut self, source_entry: &source::Entry, - monitor: &dyn Monitor, + monitor: &Monitor, ) -> Result> { monitor.count(Counter::Dirs, 1); self.stats.directories += 1; @@ -279,7 +279,7 @@ impl BackupWriter { source_tree: &SourceTree, basis_entry: &Option, options: &BackupOptions, - monitor: Arc, + monitor: Monitor, ) -> Result> { self.stats.files += 1; monitor.count(Counter::Files, 1); @@ -356,7 +356,7 @@ impl BackupWriter { fn copy_symlink( &mut self, source_entry: &source::Entry, - monitor: &dyn Monitor, + monitor: &Monitor, ) -> Result> { monitor.count(Counter::Symlinks, 1); let target = source_entry.symlink_target(); @@ -375,7 +375,7 @@ async fn store_file_content( block_dir: &BlockDir, stats: &mut BackupStats, max_block_size: usize, - monitor: Arc, + monitor: Monitor, ) -> Result> { let mut addresses = Vec::
::with_capacity(1); loop { @@ -463,7 +463,7 @@ impl FileCombiner { /// Flush any pending files, and return accumulated file entries and stats. /// The FileCombiner is then empty and ready for reuse. - async fn drain(&mut self, monitor: Arc) -> Result<(BackupStats, Vec)> { + async fn drain(&mut self, monitor: Monitor) -> Result<(BackupStats, Vec)> { self.flush(monitor).await?; debug_assert!(self.queue.is_empty()); debug_assert!(self.buf.is_empty()); @@ -479,7 +479,7 @@ impl FileCombiner { /// /// After this call the FileCombiner is empty and can be reused for more files into a new /// block. - async fn flush(&mut self, monitor: Arc) -> Result<()> { + async fn flush(&mut self, monitor: Monitor) -> Result<()> { if self.queue.is_empty() { debug_assert!(self.buf.is_empty()); return Ok(()); @@ -508,7 +508,7 @@ impl FileCombiner { &mut self, entry: &source::Entry, from_file: &mut dyn Read, - monitor: Arc, + monitor: Monitor, ) -> Result<()> { let start = self.buf.len(); let expected_len: usize = entry @@ -662,7 +662,7 @@ mod test { use filetime::{FileTime, set_file_mtime}; use crate::counters::Counter; - use crate::monitor::test::TestMonitor; + use crate::monitor::Monitor; use crate::test_fixtures::TreeFixture; use crate::transport::Transport; use crate::transport::record::Verb; @@ -676,7 +676,7 @@ mod test { let archive = Archive::create_temp().await; let src = TempDir::new().unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); src.child("a").touch().unwrap(); @@ -706,7 +706,7 @@ mod test { .unwrap(); assert_eq!(stats2.files, 0); - assert_eq!(monitor.get_counter(Counter::EntriesDeleted), 1); + assert_eq!(collector.get_counter(Counter::EntriesDeleted), 1); assert_eq!( changes.lock().unwrap().len(), 1, @@ -732,7 +732,7 @@ mod test { let srcdir = TreeFixture::new(); srcdir.create_file("hello"); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let backup_stats = backup( &archive, srcdir.path(), @@ -741,7 +741,7 @@ mod test { ) .await .expect("backup"); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 1); + assert_eq!(collector.get_counter(Counter::IndexWrites), 1); assert_eq!(backup_stats.files, 1); assert_eq!(backup_stats.deduplicated_blocks, 0); assert_eq!(backup_stats.written_blocks, 1); @@ -764,7 +764,7 @@ mod test { .await .expect("restore"); - monitor.assert_counter(Counter::FileBytes, 8); + collector.assert_counter(Counter::FileBytes, 8); Ok(()) } @@ -782,16 +782,16 @@ mod test { exclude, ..BackupOptions::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let stats = backup(&af, srcdir.path(), &options, monitor.clone()) .await .expect("backup"); check_backup(&af).await?; - let counters = monitor.counters(); + let counters = collector.counters(); dbg!(counters); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 1); + assert_eq!(collector.get_counter(Counter::IndexWrites), 1); assert_eq!(stats.files, 1); // TODO: Check stats for the number of excluded entries. assert!(counters.get(Counter::IndexWriteCompressedBytes) > 100); @@ -808,7 +808,7 @@ mod test { assert!(band_info.is_closed); assert!(band_info.end_time.is_some()); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore( &archive, restore_dir.path(), @@ -817,12 +817,12 @@ mod test { ) .await .expect("restore"); - monitor.assert_counter(Counter::FileBytes, 8); + collector.assert_counter(Counter::FileBytes, 8); // TODO: Read back contents of that file. // TODO: Check index stats. // TODO: Check what was restored. - af.validate(&ValidateOptions::default(), Arc::new(TestMonitor::new())) + af.validate(&ValidateOptions::default(), Monitor::void()) .await .unwrap(); // TODO: Maybe check there were no errors or warnings. @@ -848,7 +848,7 @@ mod test { exclude, ..Default::default() }; - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .expect("backup"); @@ -892,7 +892,7 @@ mod test { assert_eq!( archive - .referenced_blocks(&archive.list_band_ids().await.unwrap(), TestMonitor::arc()) + .referenced_blocks(&archive.list_band_ids().await.unwrap(), Monitor::void()) .await .unwrap() .into_iter() @@ -912,7 +912,7 @@ mod test { ); assert_eq!( archive - .unreferenced_blocks(TestMonitor::arc()) + .unreferenced_blocks(Monitor::void()) .await .unwrap() .len(), @@ -930,7 +930,7 @@ mod test { let large_content = vec![b'a'; file_size]; tf.create_file_with_contents("large", &large_content); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let backup_stats = backup( &af, tf.path(), @@ -949,12 +949,12 @@ mod test { assert_eq!(backup_stats.deduplicated_blocks, 3); assert_eq!(backup_stats.deduplicated_bytes, 3 << 20); assert_eq!(backup_stats.errors, 0); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 1); + assert_eq!(collector.get_counter(Counter::IndexWrites), 1); // Try to restore it let rd = TempDir::new().unwrap(); let restore_archive = Archive::open(af.transport().clone()).await.unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore( &restore_archive, rd.path(), @@ -963,9 +963,9 @@ mod test { ) .await .expect("restore"); - monitor.assert_no_errors(); - monitor.assert_counter(Counter::Files, 1); - monitor.assert_counter(Counter::FileBytes, file_size); + collector.assert_no_errors(); + collector.assert_counter(Counter::Files, 1); + collector.assert_counter(Counter::FileBytes, file_size); let content = std::fs::read(rd.path().join("large")).unwrap(); assert_eq!(large_content, content); @@ -984,14 +984,9 @@ mod test { tf.make_file_unreadable("b_unreadable"); - let stats = backup( - &af, - tf.path(), - &BackupOptions::default(), - TestMonitor::arc(), - ) - .await - .expect("backup"); + let stats = backup(&af, tf.path(), &BackupOptions::default(), Monitor::void()) + .await + .expect("backup"); assert_eq!(stats.errors, 1); assert_eq!(stats.new_files, 3); assert_eq!(stats.files, 3); @@ -1012,7 +1007,7 @@ mod test { set_file_mtime(file_path, t1969).expect("Failed to set file times"); let lt = SourceTree::open(tf.path()).unwrap(); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let entries = lt .iter_entries(Apath::root(), Exclude::nothing(), monitor.clone()) .unwrap() @@ -1022,14 +1017,9 @@ mod test { assert_eq!(entries[1].apath(), "/old_file"); let af = Archive::create_temp().await; - backup( - &af, - tf.path(), - &BackupOptions::default(), - TestMonitor::arc(), - ) - .await - .expect("backup shouldn't crash on before-epoch mtimes"); + backup(&af, tf.path(), &BackupOptions::default(), Monitor::void()) + .await + .expect("backup shouldn't crash on before-epoch mtimes"); } #[tokio::test] @@ -1043,7 +1033,7 @@ mod test { &af, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .expect("backup"); @@ -1083,7 +1073,7 @@ mod test { &af, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .unwrap(); @@ -1097,7 +1087,7 @@ mod test { .await .unwrap(); let entries = st - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .collect_all() .await .unwrap(); @@ -1109,14 +1099,9 @@ mod test { // Restore it let dest = TempDir::new().unwrap(); - restore( - &af, - dest.path(), - RestoreOptions::default(), - TestMonitor::arc(), - ) - .await - .expect("restore"); + restore(&af, dest.path(), RestoreOptions::default(), Monitor::void()) + .await + .expect("restore"); // TODO: Check restore stats. dest.child("empty").assert(""); } @@ -1129,7 +1114,7 @@ mod test { srcdir.create_file("bbb"); let options = BackupOptions::default(); - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .unwrap(); @@ -1139,7 +1124,7 @@ mod test { // Make a second backup from the same tree, and we should see that // both files are unmodified. - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .unwrap(); @@ -1151,7 +1136,7 @@ mod test { // as unmodified. srcdir.create_file_with_contents("bbb", b"longer content for bbb"); - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .unwrap(); @@ -1169,7 +1154,7 @@ mod test { srcdir.create_file_with_contents("bbb", b"longer content for bbb"); let options = BackupOptions::default(); - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .unwrap(); @@ -1193,7 +1178,7 @@ mod test { } } - let stats = backup(&af, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&af, srcdir.path(), &options, Monitor::void()) .await .unwrap(); assert_eq!(stats.files, 2); @@ -1211,7 +1196,7 @@ mod test { &af, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .unwrap(); @@ -1230,7 +1215,7 @@ mod test { &af, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .unwrap(); @@ -1260,12 +1245,12 @@ mod test { max_entries_per_hunk: 1000, ..Default::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let stats = backup(&af, srcdir.path(), &backup_options, monitor.clone()) .await .expect("backup"); assert_eq!( - monitor.get_counter(Counter::IndexWrites), + collector.get_counter(Counter::IndexWrites), 2, "expect exactly 2 hunks" ); @@ -1285,7 +1270,7 @@ mod test { .await .unwrap(); let entries = tree - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .collect_all() .await .unwrap(); @@ -1317,12 +1302,12 @@ mod test { small_file_cap: 100_000, ..Default::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let stats = backup(&af, srcdir.path(), &backup_options, monitor.clone()) .await .expect("backup"); assert_eq!( - monitor.get_counter(Counter::IndexWrites), + collector.get_counter(Counter::IndexWrites), 2, "expect exactly 2 hunks" ); @@ -1342,7 +1327,7 @@ mod test { .await .unwrap(); let entries = tree - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .collect_all() .await .unwrap(); @@ -1360,7 +1345,7 @@ mod test { srcdir.create_file("a"); srcdir.create_file("b"); // Use small hunks for easier manipulation. - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let stats = backup( &af, srcdir.path(), @@ -1374,10 +1359,10 @@ mod test { .unwrap(); assert_eq!(stats.new_files, 2); assert_eq!(stats.small_combined_files, 2); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 3,); + assert_eq!(collector.get_counter(Counter::IndexWrites), 3,); // Make a second backup, with the first file changed. - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); srcdir.create_file_with_contents("a", b"new a contents"); let stats = backup( &af, @@ -1392,7 +1377,7 @@ mod test { .unwrap(); assert_eq!(stats.unmodified_files, 1); assert_eq!(stats.modified_files, 1); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 3,); + assert_eq!(collector.get_counter(Counter::IndexWrites), 3,); // Delete the last hunk and reopen the last band. af.transport().remove_file("b0001/BANDTAIL").await.unwrap(); @@ -1403,7 +1388,7 @@ mod test { // The third backup should see nothing changed, by looking at the stitched // index from both b0 and b1. - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let stats = backup( &af, srcdir.path(), @@ -1416,7 +1401,7 @@ mod test { .await .unwrap(); assert_eq!(stats.unmodified_files, 2, "both files are unmodified"); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 3); + assert_eq!(collector.get_counter(Counter::IndexWrites), 3); } #[tokio::test] @@ -1428,7 +1413,8 @@ mod test { srcdir.create_file("bbb"); let options = BackupOptions::default(); - let stats = backup(&archive, srcdir.path(), &options, TestMonitor::arc()) + let (monitor, _collector) = Monitor::for_test(); + let stats = backup(&archive, srcdir.path(), &options, monitor) .await .unwrap(); @@ -1441,7 +1427,7 @@ mod test { // Reopen the archive to avoid cache effects. let archive = Archive::open(transport.clone()).await.unwrap(); // Make a second backup from the same tree, and we should see that both files are unmodified. - let stats = backup(&archive, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&archive, srcdir.path(), &options, Monitor::void()) .await .unwrap(); @@ -1478,7 +1464,7 @@ mod test { // as unmodified. let archive = Archive::open(transport.clone()).await.unwrap(); srcdir.create_file_with_contents("bbb", b"longer content for bbb"); - let stats = backup(&archive, srcdir.path(), &options, TestMonitor::arc()) + let stats = backup(&archive, srcdir.path(), &options, Monitor::void()) .await .unwrap(); diff --git a/src/band.rs b/src/band.rs index 0e484d17..fe841fab 100644 --- a/src/band.rs +++ b/src/band.rs @@ -22,9 +22,7 @@ //! StoredTree rather than the Band itself. use std::borrow::Cow; -use std::sync::Arc; -use crate::transport::Transport; use itertools::Itertools; use jiff::Timestamp; use serde::{Deserialize, Serialize}; @@ -32,6 +30,7 @@ use tracing::{debug, trace, warn}; use crate::jsonio::{read_json, write_json}; use crate::monitor::Monitor; +use crate::transport::Transport; use crate::*; static INDEX_DIR: &str = "i"; @@ -254,7 +253,7 @@ impl Band { &self.head.format_flags } - pub fn index_writer(&self, monitor: Arc) -> IndexWriter { + pub fn index_writer(&self, monitor: Monitor) -> IndexWriter { IndexWriter::new(self.transport.chdir(INDEX_DIR), monitor) } @@ -287,7 +286,7 @@ impl Band { }) } - pub async fn validate(&self, monitor: Arc) -> Result<()> { + pub async fn validate(&self, monitor: Monitor) -> Result<()> { let entries = self.transport.list_dir("").await?; if !entries.iter().any(|entry| entry.name == BAND_HEAD_FILENAME) { monitor.error(Error::BandHeadMissing { @@ -311,7 +310,6 @@ mod tests { use serde_json::json; - use crate::monitor::test::TestMonitor; use crate::transport::WriteMode; use crate::*; @@ -424,7 +422,7 @@ mod tests { assert_eq!( archive - .referenced_blocks(&archive.list_band_ids().await.unwrap(), TestMonitor::arc()) + .referenced_blocks(&archive.list_band_ids().await.unwrap(), Monitor::void()) .await .unwrap() .len(), diff --git a/src/bin/conserve.rs b/src/bin/conserve.rs index 1ec267a5..ea04035f 100644 --- a/src/bin/conserve.rs +++ b/src/bin/conserve.rs @@ -23,6 +23,7 @@ use std::time::Instant; use clap::builder::{Styles, styling}; use clap::{Parser, Subcommand}; use conserve::change::Change; +use conserve::monitor::Monitor; #[allow(unused_imports)] use tracing::{Level, debug, error, info, trace, warn}; @@ -334,7 +335,7 @@ impl std::process::Termination for ExitCode { impl Command { #[tokio::main] - async fn run(&self, monitor: Arc) -> Result { + async fn run(&self, monitor: Monitor) -> Result { let mut stdout = io::stdout(); match self { Command::Backup { @@ -353,6 +354,7 @@ impl Command { *verbose, *long_listing, &changes_json.as_deref(), + monitor.clone(), )?, ..Default::default() }; @@ -421,8 +423,7 @@ impl Command { ) .await?; if !no_stats { - monitor.clear_progress_bars(); - println!("{stats}"); + monitor.println(&stats.to_string()); } } Command::Diff { @@ -494,9 +495,9 @@ impl Command { // be different, or these should be a specific method to produce // this json format...? if *json { - println!("{}", entry.listing_json()); + monitor.println(&entry.listing_json().to_string()); } else { - println!("{}", entry.format_ls(*long_listing)); + monitor.println(&entry.format_ls(*long_listing)); } } } else { @@ -507,14 +508,14 @@ impl Command { monitor.clone(), )?; for entry in entry_iter { - if *json { - println!("{}", entry.listing_json()); + let line = if *json { + entry.listing_json().to_string() } else { - println!("{}", entry.format_ls(*long_listing)); - } + entry.format_ls(*long_listing) + }; + monitor.println(&line); } }; - monitor.clear_progress_bars(); } #[cfg(windows)] Command::Mount { @@ -577,6 +578,7 @@ impl Command { *verbose, *long_listing, &changes_json.as_deref(), + monitor.clone(), )?, inject_failures: Default::default(), }; @@ -601,12 +603,12 @@ impl Command { .size(exclude, monitor.clone())? .file_bytes }; - monitor.clear_progress_bars(); - if *bytes { - println!("{size}"); + let line = if *bytes { + size.to_string() } else { - println!("{}", conserve::bytes_to_human_mb(size)); - } + conserve::bytes_to_human_mb(size) + }; + monitor.println(&line); } Command::Validate { archive, quick, .. } => { let options = ValidateOptions { @@ -665,6 +667,7 @@ fn make_change_callback( print_changes: bool, ls_long: bool, changes_json: &Option<&Path>, + monitor: Monitor, ) -> Result> { if !print_changes && !ls_long && changes_json.is_none() { return Ok(None); @@ -687,15 +690,19 @@ fn make_change_callback( } if ls_long { let change_meta = entry_change.change.primary_metadata(); - println!( + monitor.println(&format!( "{} {} {} {}", entry_change.change.sigil(), change_meta.unix_mode, change_meta.owner, entry_change.apath - ); + )); } else if print_changes { - println!("{} {}", entry_change.change.sigil(), entry_change.apath); + monitor.println(&format!( + "{} {}", + entry_change.change.sigil(), + entry_change.apath + )); } if let Some(w) = &changes_json_writer { let mut w = w.borrow_mut(); @@ -717,9 +724,10 @@ fn main() -> Result { } else { Level::INFO }; - let monitor = Arc::new(TermUiMonitor::new(!args.no_progress)); + let term_ui_monitor = Arc::new(TermUiMonitor::new(!args.no_progress)); + let monitor = Monitor::new(term_ui_monitor.clone()); let _flush_tracing = enable_tracing( - &monitor, + &term_ui_monitor, &args.trace_time, console_level, &args.log_json, @@ -734,7 +742,7 @@ fn main() -> Result { .truncate(true) .write(true) .open(metrics_path)?, - monitor.counters(), + &monitor.get_counters(), )?; } match result { diff --git a/src/blockdir.rs b/src/blockdir.rs index f16f5311..f68fb55b 100644 --- a/src/blockdir.rs +++ b/src/blockdir.rs @@ -129,7 +129,7 @@ impl BlockDir { &self, block_data: Bytes, stats: &mut BackupStats, - monitor: Arc, + monitor: Monitor, ) -> Result { let hash = BlockHash::hash_bytes(&block_data); let uncomp_len = block_data.len() as u64; @@ -193,11 +193,7 @@ impl BlockDir { } /// Read back some content addressed by an [Address] (a block hash, start and end). - pub(crate) async fn read_address( - &self, - address: &Address, - monitor: Arc, - ) -> Result { + pub(crate) async fn read_address(&self, address: &Address, monitor: Monitor) -> Result { let bytes = self.get_block_content(&address.hash, monitor).await?; let len = address.len as usize; let start = address.start as usize; @@ -220,7 +216,7 @@ impl BlockDir { pub(crate) async fn get_block_content( &self, hash: &BlockHash, - monitor: Arc, + monitor: Monitor, ) -> Result { // TODO: Tokio locks on caches if let Some(hit) = self.cache.write().expect("Lock cache").get(hash) { @@ -272,10 +268,7 @@ impl BlockDir { /// /// Return a dict describing which blocks are present, and the length of their uncompressed /// data. - pub(crate) async fn validate( - &self, - monitor: Arc, - ) -> Result> { + pub(crate) async fn validate(&self, monitor: Monitor) -> Result> { // TODO: In the top-level directory, no files or directories other than prefix // directories of the right length. // TODO: Test having a block with the right compression but the wrong contents. @@ -318,7 +311,7 @@ impl BlockDir { async fn get_async_uncached( transport: &Transport, hash: BlockHash, - monitor: Arc, + monitor: Monitor, ) -> Result { let block_relpath = block_relpath(&hash); let compressed_bytes = transport.read(&block_relpath).await?; @@ -428,7 +421,7 @@ mod test { use pretty_assertions::assert_eq; use tempfile::TempDir; - use crate::{monitor::test::TestMonitor, transport::record::Verb}; + use crate::transport::record::Verb; use super::*; @@ -440,14 +433,15 @@ mod test { let transport = Transport::temp().enable_record_calls(); let blockdir = BlockDir::open(transport.clone()).await.unwrap(); let mut stats = BackupStats::default(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let content = Bytes::from("stuff"); let hash = blockdir .store_or_deduplicate(content.clone(), &mut stats, monitor.clone()) .await .unwrap(); - assert_eq!(monitor.get_counter(Counter::BlockWrites), 1); - assert_eq!(monitor.get_counter(Counter::DeduplicatedBlocks), 0); + assert_eq!(collector.get_counter(Counter::BlockWrites), 1); + assert_eq!(collector.get_counter(Counter::DeduplicatedBlocks), 0); assert!(blockdir.contains(&hash)); let recording = transport.take_recording(); dbg!(&recording); @@ -466,7 +460,7 @@ mod test { let _ = transport.take_recording(); // Open again to get a fresh cache - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let blockdir = BlockDir::open(transport.clone()).await.unwrap(); assert!(!blockdir.contains(&hash)); let recording = transport.take_recording(); @@ -492,8 +486,8 @@ mod test { .store_or_deduplicate(content.clone(), &mut stats, monitor.clone()) .await .unwrap(); - assert_eq!(monitor.get_counter(Counter::BlockWrites), 1); - assert_eq!(monitor.get_counter(Counter::DeduplicatedBlocks), 0); + assert_eq!(collector.get_counter(Counter::BlockWrites), 1); + assert_eq!(collector.get_counter(Counter::DeduplicatedBlocks), 0); assert!(blockdir.contains(&hash)); let recording = transport.take_recording(); assert_eq!( @@ -503,14 +497,15 @@ mod test { ); let blockdir = BlockDir::open(transport.clone()).await.unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let retrieved = blockdir .get_block_content(&hash, monitor.clone()) .await .unwrap(); assert_eq!(content, retrieved); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 0); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 1); + assert_eq!(collector.get_counter(Counter::BlockContentCacheHit), 0); + assert_eq!(collector.get_counter(Counter::BlockContentCacheMiss), 1); let recording = transport.take_recording(); assert_eq!( recording.verb_paths(Verb::Read).len(), @@ -524,14 +519,15 @@ mod test { let transport = Transport::temp().enable_record_calls(); let blockdir = BlockDir::open(transport.clone()).await.unwrap(); let mut stats = BackupStats::default(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let content = Bytes::from("stuff"); let hash = blockdir .store_or_deduplicate(content.clone(), &mut stats, monitor.clone()) .await .unwrap(); - assert_eq!(monitor.get_counter(Counter::BlockWrites), 1); - assert_eq!(monitor.get_counter(Counter::DeduplicatedBlocks), 0); + assert_eq!(collector.get_counter(Counter::BlockWrites), 1); + assert_eq!(collector.get_counter(Counter::DeduplicatedBlocks), 0); assert!(blockdir.contains(&hash)); let recording = transport.take_recording(); assert_eq!( @@ -547,7 +543,7 @@ mod test { // Open again to get a fresh cache let blockdir = BlockDir::open(transport.clone()).await.unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let recording = transport.take_recording(); assert_eq!( recording.verb_paths(Verb::ListDir).len(), @@ -559,8 +555,8 @@ mod test { .store_or_deduplicate(content.clone(), &mut stats, monitor.clone()) .await .unwrap(); - assert_eq!(monitor.get_counter(Counter::BlockWrites), 0); - assert_eq!(monitor.get_counter(Counter::DeduplicatedBlocks), 1); + assert_eq!(collector.get_counter(Counter::BlockWrites), 0); + assert_eq!(collector.get_counter(Counter::DeduplicatedBlocks), 1); let recording = transport.take_recording(); assert_eq!( recording.calls, @@ -576,7 +572,8 @@ mod test { .await .unwrap(); let mut stats = BackupStats::default(); - let monitor = TestMonitor::arc(); + + let (monitor, _collector) = Monitor::for_test(); assert_eq!(blockdir.blocks().len(), 0); @@ -615,7 +612,7 @@ mod test { let mut stats = BackupStats::default(); let content = Bytes::from("stuff"); let hash = blockdir - .store_or_deduplicate(content.clone(), &mut stats, TestMonitor::arc()) + .store_or_deduplicate(content.clone(), &mut stats, Monitor::void()) .await .unwrap(); assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 0); @@ -623,14 +620,14 @@ mod test { assert!(blockdir.contains(&hash)); let _recording = transport.take_recording(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let retrieved = blockdir .get_block_content(&hash, monitor.clone()) .await .unwrap(); assert_eq!(content, retrieved); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 1); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 0); + assert_eq!(collector.get_counter(Counter::BlockContentCacheHit), 1); + assert_eq!(collector.get_counter(Counter::BlockContentCacheMiss), 0); let recording = transport.take_recording(); assert_eq!( recording.verb_paths(Verb::Read).len(), @@ -643,8 +640,8 @@ mod test { .get_block_content(&hash, monitor.clone()) .await .unwrap(); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 2); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 0); + assert_eq!(collector.get_counter(Counter::BlockContentCacheHit), 2); + assert_eq!(collector.get_counter(Counter::BlockContentCacheMiss), 0); assert_eq!(content, retrieved); assert_eq!(blockdir.stats.cache_hit.load(Relaxed), 2); // hit again let recording = transport.take_recording(); @@ -661,7 +658,7 @@ mod test { let blockdir = BlockDir::open(transport.clone()).await.unwrap(); let mut stats = BackupStats::default(); let content = Bytes::from("stuff"); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let hash = blockdir .store_or_deduplicate(content.clone(), &mut stats, monitor.clone()) .await @@ -669,7 +666,7 @@ mod test { // reopen let _recording = transport.take_recording(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let blockdir = BlockDir::open(transport.clone()).await.unwrap(); assert!(blockdir.contains(&hash)); @@ -694,8 +691,8 @@ mod test { .await .unwrap(); assert_eq!(content, retrieved); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheMiss), 1); - assert_eq!(monitor.get_counter(Counter::BlockContentCacheHit), 0); + assert_eq!(collector.get_counter(Counter::BlockContentCacheMiss), 1); + assert_eq!(collector.get_counter(Counter::BlockContentCacheHit), 0); assert_eq!( blockdir.stats.cache_hit.load(Relaxed), 0, diff --git a/src/counters.rs b/src/counters.rs index de845a7a..5d5cd330 100644 --- a/src/counters.rs +++ b/src/counters.rs @@ -126,6 +126,17 @@ impl Serialize for Counters { } } +impl Clone for Counters { + fn clone(&self) -> Self { + Self { + counters: self + .counters + .each_ref() + .map(|c| AtomicUsize::new(c.load(Relaxed))), + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/diff.rs b/src/diff.rs index a9ed47cc..95acbf5b 100644 --- a/src/diff.rs +++ b/src/diff.rs @@ -15,8 +15,6 @@ //! //! See also [conserve::show_diff] to format the diff as text. -use std::sync::Arc; - use crate::monitor::Monitor; use crate::*; @@ -52,7 +50,7 @@ pub async fn diff( st: &StoredTree, lt: &SourceTree, options: DiffOptions, - monitor: Arc, + monitor: Monitor, ) -> Result { let a = st.iter_entries(Apath::root(), options.exclude.clone(), monitor.clone()); let b = lt.iter_entries(Apath::root(), options.exclude.clone(), monitor.clone())?; @@ -91,7 +89,7 @@ impl Diff { mod tests { use filetime::{FileTime, set_file_mtime}; - use crate::monitor::test::TestMonitor; + use crate::monitor::Monitor; use crate::test_fixtures::TreeFixture; use crate::*; @@ -100,7 +98,7 @@ mod tests { let a = Archive::create_temp().await; let tf = TreeFixture::new(); tf.create_file_with_contents("thing", b"contents of thing"); - let stats = backup(&a, tf.path(), &BackupOptions::default(), TestMonitor::arc()) + let stats = backup(&a, tf.path(), &BackupOptions::default(), Monitor::void()) .await .unwrap(); assert_eq!(stats.new_files, 1); @@ -120,7 +118,7 @@ mod tests { include_unchanged: true, ..DiffOptions::default() }; - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let changes: Vec = diff(&st, &tf.live_tree(), options, monitor.clone()) .await .unwrap() @@ -140,7 +138,7 @@ mod tests { include_unchanged: false, ..DiffOptions::default() }; - let changes = diff(&st, &tf.live_tree(), options, TestMonitor::arc()) + let changes = diff(&st, &tf.live_tree(), options, Monitor::void()) .await .unwrap() .collect() @@ -167,7 +165,7 @@ mod tests { include_unchanged: false, ..DiffOptions::default() }; - let changes: Vec = diff(&st, &tf.live_tree(), options, TestMonitor::arc()) + let changes: Vec = diff(&st, &tf.live_tree(), options, Monitor::void()) .await .unwrap() .collect() @@ -204,7 +202,7 @@ mod tests { include_unchanged: false, ..DiffOptions::default() }; - let changes: Vec = diff(&st, &tf.live_tree(), options, TestMonitor::arc()) + let changes: Vec = diff(&st, &tf.live_tree(), options, Monitor::void()) .await .unwrap() .collect() @@ -225,7 +223,7 @@ mod tests { let a = Archive::create_temp().await; let tf = TreeFixture::new(); tf.create_symlink("link", "target"); - backup(&a, tf.path(), &BackupOptions::default(), TestMonitor::arc()) + backup(&a, tf.path(), &BackupOptions::default(), Monitor::void()) .await .unwrap(); @@ -245,7 +243,7 @@ mod tests { include_unchanged: false, ..DiffOptions::default() }; - let changes: Vec = diff(&st, &tf.live_tree(), options, TestMonitor::arc()) + let changes: Vec = diff(&st, &tf.live_tree(), options, Monitor::void()) .await .unwrap() .collect() diff --git a/src/gc_lock.rs b/src/gc_lock.rs index 77ccb35c..28439f3e 100644 --- a/src/gc_lock.rs +++ b/src/gc_lock.rs @@ -155,7 +155,7 @@ mod test { use std::time::Duration; use super::*; - use crate::monitor::test::TestMonitor; + use crate::monitor::Monitor; use crate::test_fixtures::TreeFixture; #[tokio::test] @@ -181,7 +181,7 @@ mod test { &archive, source.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .unwrap(); @@ -198,7 +198,7 @@ mod test { &archive, source.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await; assert_eq!( @@ -257,7 +257,7 @@ mod test { let lock1 = GarbageCollectionLock::new(&archive).await?; // Backup should fail while gc lock is held. - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let backup_result = backup( &archive, tf.path(), diff --git a/src/index/mod.rs b/src/index/mod.rs index b16c6ca8..f1244fde 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -235,13 +235,13 @@ impl IndexHunkIter { mod tests { use tempfile::TempDir; - use crate::{counters::Counter, monitor::test::TestMonitor}; + use crate::{counters::Counter, monitor::Monitor}; use super::*; fn setup() -> (TempDir, IndexWriter) { let testdir = TempDir::new().unwrap(); - let ib = IndexWriter::new(Transport::local(testdir.path()), TestMonitor::arc()); + let ib = IndexWriter::new(Transport::local(testdir.path()), Monitor::void()); (testdir, ib) } @@ -326,14 +326,14 @@ mod tests { #[tokio::test] async fn basic() -> Result<()> { let transport = Transport::temp(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let mut index_writer = IndexWriter::new(transport.clone(), monitor.clone()); index_writer.append_entries(&mut vec![sample_entry("/apple"), sample_entry("/banana")]); let hunks = index_writer.finish().await.unwrap(); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 1); + assert_eq!(collector.get_counter(Counter::IndexWrites), 1); assert_eq!(hunks, 1); - let counters = monitor.counters(); + let counters = collector.counters(); dbg!(&counters); assert!(counters.get(Counter::IndexWriteCompressedBytes) > 30); assert!(counters.get(Counter::IndexWriteCompressedBytes) < 125,); diff --git a/src/index/stitch.rs b/src/index/stitch.rs index 13e40b3c..cb38523b 100644 --- a/src/index/stitch.rs +++ b/src/index/stitch.rs @@ -29,7 +29,6 @@ //! * Bands might be deleted, so their numbers are not contiguous. use std::iter::Peekable; -use std::sync::Arc; use tracing::trace; @@ -55,7 +54,7 @@ pub struct Stitch { /// Only return entries within this directory. subtree: Apath, - monitor: Arc, + monitor: Monitor, } /// What state is a stitch iter in, and what should happen next? @@ -98,7 +97,7 @@ impl Stitch { band_id: BandId, subtree: Apath, exclude: Exclude, - monitor: Arc, + monitor: Monitor, ) -> Stitch { Stitch { archive: archive.clone(), @@ -111,7 +110,7 @@ impl Stitch { } /// Construct a stitcher that will just return nothing. - pub(crate) fn empty(archive: &Archive, monitor: Arc) -> Stitch { + pub(crate) fn empty(archive: &Archive, monitor: Monitor) -> Stitch { Stitch { archive: archive.clone(), last_apath: None, @@ -224,7 +223,6 @@ mod test { use super::*; use crate::counters::Counter; - use crate::monitor::test::TestMonitor; use crate::test_fixtures::TreeFixture; fn symlink(name: &str, target: &str) -> IndexEntry { @@ -247,7 +245,7 @@ mod test { band_id, Apath::root(), Exclude::nothing(), - TestMonitor::arc(), + Monitor::void(), ); while let Some(entry) = stitch.next().await { strs.push(format!("{}:{}", entry.apath, entry.target.unwrap())); @@ -274,7 +272,7 @@ mod test { // 1 was deleted in b2, 2 is carried over from b2, // and 3 is carried over from b1. - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let band = Band::create(&af).await?; assert_eq!(band.id(), BandId::zero()); let mut ib = band.index_writer(monitor.clone()); @@ -286,12 +284,12 @@ mod test { let hunks = ib.finish().await?; assert_eq!(hunks, 2); assert_eq!( - monitor.get_counter(Counter::IndexWrites), + collector.get_counter(Counter::IndexWrites), 2, "2 hunks were finished" ); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let band = Band::create(&af).await?; assert_eq!(band.id().to_string(), "b0001"); let mut ib = band.index_writer(monitor.clone()); @@ -302,11 +300,11 @@ mod test { ib.push_entry(symlink("/3", "b1")); let hunks = ib.finish().await?; assert_eq!(hunks, 2); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 2); + assert_eq!(collector.get_counter(Counter::IndexWrites), 2); band.close(2).await?; // b2 - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let band = Band::create(&af).await?; assert_eq!(band.id().to_string(), "b0002"); let mut ib = band.index_writer(monitor.clone()); @@ -316,7 +314,7 @@ mod test { // incomplete let hunks = ib.finish().await?; assert_eq!(hunks, 2); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 2); + assert_eq!(collector.get_counter(Counter::IndexWrites), 2); // b3 let band = Band::create(&af).await?; @@ -327,7 +325,7 @@ mod test { assert_eq!(band.id().to_string(), "b0004"); // b5 - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); let band = Band::create(&af).await?; assert_eq!(band.id().to_string(), "b0005"); let mut ib = band.index_writer(monitor.clone()); @@ -335,7 +333,7 @@ mod test { ib.push_entry(symlink("/00", "b5")); let hunks = ib.finish().await?; assert_eq!(hunks, 1); - assert_eq!(monitor.get_counter(Counter::IndexWrites), 1); + assert_eq!(collector.get_counter(Counter::IndexWrites), 1); // incomplete std::fs::remove_dir_all(af.transport().local_path().unwrap().join("b0003"))?; @@ -379,21 +377,17 @@ mod test { tf.create_file("file_a"); let af = Archive::create_temp().await; - backup( - &af, - tf.path(), - &BackupOptions::default(), - TestMonitor::arc(), - ) - .await - .expect("backup should work"); + backup(&af, tf.path(), &BackupOptions::default(), Monitor::void()) + .await + .expect("backup should work"); af.transport().remove_file("b0000/BANDTAIL").await.unwrap(); // band is now incomplete let band_ids = af.list_band_ids().await.expect("should list bands"); let band_id = band_ids.first().expect("expected at least one band"); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let mut entries = Stitch::new( &af, *band_id, @@ -419,7 +413,7 @@ mod test { // It's not an error (at the moment) because a band with no head effectively doesn't exist. // (Maybe later the presence of a band directory with no head file should raise a warning.) - let errors = monitor.take_errors(); + let errors = collector.take_errors(); dbg!(&errors); assert_eq!(errors.len(), 0); } diff --git a/src/index/write.rs b/src/index/write.rs index 37933e97..17141c0d 100644 --- a/src/index/write.rs +++ b/src/index/write.rs @@ -13,8 +13,6 @@ //! The index lists all the files in a backup, sorted in apath order. -use std::sync::Arc; - use tracing::trace; use crate::compress::snappy::Compressor; @@ -49,13 +47,13 @@ pub struct IndexWriter { compressor: Compressor, - monitor: Arc, + monitor: Monitor, } /// Accumulate and write out index entries into files in an index directory. impl IndexWriter { /// Make a new builder that will write files into the given directory. - pub fn new(transport: Transport, monitor: Arc) -> IndexWriter { + pub fn new(transport: Transport, monitor: Monitor) -> IndexWriter { IndexWriter { transport, entries: Vec::new(), diff --git a/src/merge.rs b/src/merge.rs index 257d24dd..96354c44 100644 --- a/src/merge.rs +++ b/src/merge.rs @@ -140,7 +140,7 @@ mod tests { // fn merge_entry_trees() { // let ta = TreeFixture::new(); // let tb = TreeFixture::new(); - // let monitor = TestMonitor::arc(); + // let monitor = Monitor::void(); // let di = MergeTrees::new( // ta.live_tree() // .iter_entries(Apath::root(), Exclude::nothing(), monitor.clone()) diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 00000000..34fcc42d --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,98 @@ +// Copyright 2023-2026 Martin Pool + +//! A monitor observes events from the library and can display them in a UI, log them, record them for testing, etc. + +pub mod task; +pub mod test; +pub mod void; + +use std::fmt::Debug; +use std::sync::Arc; + +use self::task::Task; +use crate::{ + counters::{Counter, Counters}, + monitor::test::Collector, +}; + +/// A monitor receives events from the library and may collect them, report them +/// to the terminal, log them, etc. +/// +/// The monitor can be cloned and passed between threads and all the clones will continue to send events +/// to the same implementation. +#[derive(Debug, Clone)] +pub struct Monitor(Arc); + +impl Monitor { + /// Make a new monitor with the given implementation. + pub fn new(impl_: Arc) -> Self { + Self(impl_) + } + + /// Make a new void monitor that ignores all events. + pub fn void() -> Self { + Self::new(Arc::new(void::VoidMonitorImpl::new())) + } + + /// Make a monitor for tests, and also return the [`Collector`] that records what happened during the tests. + pub fn for_test() -> (Self, Arc) { + let collector = Collector::arc(); + (Self::new(collector.clone()), collector) + } + + /// Notify that a counter increased by a given amount. + pub fn count(&self, counter: Counter, increment: usize) { + self.0.count(counter, increment); + } + + /// Set the absolute value of a counter. + pub fn set_counter(&self, counter: Counter, value: usize) { + self.0.set_counter(counter, value); + } + + pub fn get_counters(&self) -> Counters { + self.0.get_counters() + } + + /// A non-fatal error occurred. + pub fn error(&self, error: crate::Error) { + self.0.error(error); + } + + /// Start a [`Task`] with the given name, and return a handle to it. + pub fn start_task(&self, name: String) -> Task { + self.0.start_task(name) + } + + /// Emit some text output to stdout, followed by a newline. + pub fn println(&self, text: &str) { + self.0.println(text); + } + + pub fn error_count(&self) -> usize { + self.0.error_count() + } +} + +/// The specific implementation of a monitor, which receives events from the library. +/// +/// The impl need not (and typically will not) be `Clone` because it's held by a [`Monitor`] +/// and is shared across threads. +pub trait MonitorImpl: Send + Sync + Debug + 'static { + /// Notify that a counter increased by a given amount. + fn count(&self, counter: Counter, increment: usize); + + /// Set the absolute value of a counter. + fn set_counter(&self, counter: Counter, value: usize); + + fn get_counters(&self) -> Counters; + + /// A non-fatal error occurred. + fn error(&self, error: crate::Error); + + fn start_task(&self, name: String) -> Task; + + fn println(&self, text: &str); + + fn error_count(&self) -> usize; +} diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs deleted file mode 100644 index 622c1568..00000000 --- a/src/monitor/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2023-2024 Martin Pool - -//! Communication from the library to a monitor: a test, a UI, etc. - -pub mod task; -pub mod test; -pub mod void; - -use self::task::Task; -use crate::counters::Counter; - -/// A monitor receives events from the library and may collect them, report them -/// to the terminal, log them, etc. -pub trait Monitor: Send + Sync + 'static { - /// Notify that a counter increased by a given amount. - fn count(&self, counter: Counter, increment: usize); - - /// Set the absolute value of a counter. - fn set_counter(&self, counter: Counter, value: usize); - - /// A non-fatal error occurred. - fn error(&self, error: crate::Error); - - fn start_task(&self, name: String) -> Task; -} diff --git a/src/monitor/task.rs b/src/monitor/task.rs index d47f7433..0e8d7e40 100644 --- a/src/monitor/task.rs +++ b/src/monitor/task.rs @@ -8,7 +8,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock, Weak}; -#[derive(Default)] +#[derive(Default, Debug)] pub struct TaskList { tasks: Vec>, } diff --git a/src/monitor/test.rs b/src/monitor/test.rs index 1f6710f7..4247d12c 100644 --- a/src/monitor/test.rs +++ b/src/monitor/test.rs @@ -5,12 +5,12 @@ use std::mem::take; use std::sync::{Arc, Mutex}; -use super::Monitor; +use super::MonitorImpl; use super::task::{Task, TaskList}; use crate::counters::{Counter, Counters}; use crate::{Apath, Error}; -/// A monitor that collects information for later inspection, +/// A monitor implementation that collects information for later inspection, /// particularly from tests. /// /// Errors are collected in a vector. @@ -18,22 +18,22 @@ use crate::{Apath, Error}; /// Tasks are ignored. /// /// Totals of counters are kept. -#[derive(Default)] -pub struct TestMonitor { +#[derive(Default, Debug)] +pub struct Collector { errors: Mutex>, counters: Counters, started_files: Mutex>, task_list: Mutex, } -impl TestMonitor { +impl Collector { pub fn new() -> Self { - TestMonitor::default() + Collector::default() } /// Construct a new TestMonitor and wrap it in an Arc. - pub fn arc() -> Arc { - Arc::new(TestMonitor::new()) + pub fn arc() -> Arc { + Arc::new(Collector::new()) } pub fn get_counter(&self, counter: Counter) -> usize { @@ -71,7 +71,7 @@ impl TestMonitor { } } -impl Monitor for TestMonitor { +impl MonitorImpl for Collector { fn count(&self, counter: Counter, increment: usize) { self.counters.count(counter, increment) } @@ -80,6 +80,10 @@ impl Monitor for TestMonitor { self.counters.set(counter, value) } + fn get_counters(&self) -> Counters { + self.counters.clone() + } + fn error(&self, error: Error) { self.errors.lock().unwrap().push(error); } @@ -87,4 +91,12 @@ impl Monitor for TestMonitor { fn start_task(&self, name: String) -> Task { self.task_list.lock().unwrap().start_task(name) } + + fn println(&self, _text: &str) { + // Ignore output. (We could later collect it for tests to inspect.) + } + + fn error_count(&self) -> usize { + self.errors.lock().unwrap().len() + } } diff --git a/src/monitor/void.rs b/src/monitor/void.rs index 8d503bc7..c361d600 100644 --- a/src/monitor/void.rs +++ b/src/monitor/void.rs @@ -1,19 +1,50 @@ -use crate::counters::Counter; +use std::sync::atomic::AtomicUsize; + +use crate::counters::{Counter, Counters}; use super::{ - Monitor, + MonitorImpl, task::{Task, TaskList}, }; -/// A monitor that does not capture any information. -#[derive(Debug, Clone)] -pub struct VoidMonitor; -impl Monitor for VoidMonitor { - fn count(&self, _counter: Counter, _increment: usize) {} +/// A monitor that does not capture any information aside from the error count and counters. +/// +/// This is convenient for use in tests that don't care about checking for side effects: +/// but most tests actually should use [`conserve::monitor::Monitor::for_tests`] and make +/// assertions about the result. +#[derive(Debug, Default)] +pub struct VoidMonitorImpl { + error_count: AtomicUsize, + counters: Counters, +} + +impl VoidMonitorImpl { + pub fn new() -> Self { + Self::default() + } +} + +impl MonitorImpl for VoidMonitorImpl { + fn count(&self, counter: Counter, increment: usize) { + self.counters.count(counter, increment); + } + + fn set_counter(&self, counter: Counter, value: usize) { + self.counters.set(counter, value); + } - fn set_counter(&self, _counter: Counter, _value: usize) {} + fn get_counters(&self) -> Counters { + self.counters.clone() + } - fn error(&self, _error: crate::Error) {} + fn error(&self, _error: crate::Error) { + self.error_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + fn error_count(&self) -> usize { + self.error_count.load(std::sync::atomic::Ordering::Relaxed) + } fn start_task(&self, name: String) -> Task { /* @@ -23,4 +54,6 @@ impl Monitor for VoidMonitor { let mut list = TaskList::default(); list.start_task(name) } + + fn println(&self, _text: &str) {} } diff --git a/src/restore.rs b/src/restore.rs index 4ac74219..77b19005 100644 --- a/src/restore.rs +++ b/src/restore.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::fs::{File, create_dir_all}; use std::io::{self, Write}; use std::path::{Path, PathBuf}; -use std::sync::Arc; use filetime::set_file_handle_times; #[cfg(unix)] @@ -71,7 +70,7 @@ pub async fn restore( archive: &Archive, destination: &Path, options: RestoreOptions, - monitor: Arc, + monitor: Monitor, ) -> Result<()> { let st = archive .open_stored_tree(options.band_selection.clone()) @@ -180,7 +179,7 @@ struct DirDeferral { owner: Owner, } -fn apply_deferrals(deferrals: &[DirDeferral], monitor: Arc) -> Result<()> { +fn apply_deferrals(deferrals: &[DirDeferral], monitor: Monitor) -> Result<()> { for DirDeferral { path, unix_mode, @@ -216,7 +215,7 @@ async fn restore_file( path: PathBuf, source_entry: &IndexEntry, block_dir: &BlockDir, - monitor: Arc, + monitor: Monitor, ) -> Result<()> { let mut out = File::create(&path).map_err(|err| Error::RestoreFile { path: path.clone(), @@ -325,7 +324,7 @@ mod test { use tempfile::TempDir; use crate::counters::Counter; - use crate::monitor::test::TestMonitor; + use crate::monitor::Monitor; use crate::test_fixtures::{TreeFixture, store_two_versions}; use crate::transport::Transport; use crate::*; @@ -348,13 +347,13 @@ mod test { })), ..Default::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore(&restore_archive, destdir.path(), options, monitor.clone()) .await .expect("restore"); - monitor.assert_no_errors(); - monitor.assert_counter(Counter::Files, 3); + collector.assert_no_errors(); + collector.assert_counter(Counter::Files, 3); let mut expected_names = vec![ "/", "/hello", @@ -392,13 +391,13 @@ mod test { band_selection: BandSelectionPolicy::Specified(band_id), ..RestoreOptions::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore(&archive, destdir.path(), options, monitor.clone()) .await .expect("restore"); - monitor.assert_no_errors(); + collector.assert_no_errors(); // Does not have the 'hello2' file added in the second version. - monitor.assert_counter(Counter::Files, 2); + collector.assert_counter(Counter::Files, 2); } /// Restoring a subdirectory works, and restores the parent directories: @@ -408,25 +407,21 @@ mod test { async fn restore_only_subdir() { // We need the selected directory to be more than one level down, because the bug was that // its parent was not created. - let backup_monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let src = TempDir::new().unwrap(); create_dir(src.path().join("parent")).unwrap(); create_dir(src.path().join("parent/sub")).unwrap(); write(src.path().join("parent/sub/file"), b"hello").unwrap(); let af = Archive::create_temp().await; - backup( - &af, - src.path(), - &BackupOptions::default(), - backup_monitor.clone(), - ) - .await - .unwrap(); - backup_monitor.assert_counter(Counter::Files, 1); - backup_monitor.assert_no_errors(); + backup(&af, src.path(), &BackupOptions::default(), monitor.clone()) + .await + .unwrap(); + collector.assert_no_errors(); + collector.assert_counter(Counter::Files, 1); let destdir = TreeFixture::new(); - let restore_monitor = TestMonitor::arc(); + let (restore_monitor, restore_collector) = Monitor::for_test(); let archive = Archive::open(af.transport().clone()).await.unwrap(); let options = RestoreOptions { only_subtree: Some(Apath::from("/parent/sub")), @@ -435,11 +430,11 @@ mod test { restore(&archive, destdir.path(), options, restore_monitor.clone()) .await .expect("restore"); - restore_monitor.assert_no_errors(); + restore_collector.assert_no_errors(); assert!(destdir.path().join("parent").is_dir()); assert!(destdir.path().join("parent/sub/file").is_file()); - dbg!(restore_monitor.counters()); - restore_monitor.assert_counter(Counter::Files, 1); + dbg!(restore_collector.counters()); + restore_collector.assert_counter(Counter::Files, 1); } #[tokio::test] @@ -452,7 +447,7 @@ mod test { ..RestoreOptions::default() }; assert!(!options.overwrite, "overwrite is false by default"); - let restore_err_str = restore(&af, destdir.path(), options, TestMonitor::arc()) + let restore_err_str = restore(&af, destdir.path(), options, Monitor::void()) .await .expect_err("restore should fail if the destination exists") .to_string(); @@ -474,12 +469,12 @@ mod test { overwrite: true, ..RestoreOptions::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore(&restore_archive, destdir.path(), options, monitor.clone()) .await .expect("restore"); - monitor.assert_no_errors(); - monitor.assert_counter(Counter::Files, 3); + collector.assert_no_errors(); + collector.assert_counter(Counter::Files, 3); let dest = destdir.path(); assert!(dest.join("hello").is_file()); assert!(dest.join("existing").is_file()); @@ -496,7 +491,7 @@ mod test { exclude: Exclude::from_strings(["/**/subfile"]).unwrap(), ..RestoreOptions::default() }; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore(&restore_archive, destdir.path(), options, monitor.clone()) .await .expect("restore"); @@ -505,8 +500,8 @@ mod test { assert!(dest.join("hello").is_file()); assert!(dest.join("hello2").is_file()); assert!(dest.join("subdir").is_dir()); - monitor.assert_no_errors(); - monitor.assert_counter(Counter::Files, 2); + collector.assert_no_errors(); + collector.assert_counter(Counter::Files, 2); } #[tokio::test] @@ -524,14 +519,12 @@ mod test { let years_ago = FileTime::from_unix_time(189216000, 0); set_symlink_file_times(srcdir.path().join("symlink"), years_ago, years_ago).unwrap(); - let monitor = TestMonitor::arc(); - backup(&af, srcdir.path(), &Default::default(), monitor.clone()) + backup(&af, srcdir.path(), &Default::default(), Monitor::void()) .await .unwrap(); let restore_dir = TempDir::new().unwrap(); - let monitor = TestMonitor::arc(); - restore(&af, restore_dir.path(), Default::default(), monitor.clone()) + restore(&af, restore_dir.path(), Default::default(), Monitor::void()) .await .unwrap(); @@ -558,7 +551,7 @@ mod test { .inject_failures .insert(Apath::from("/subdir"), io::ErrorKind::PermissionDenied); let restore_tmp = TempDir::new().unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore( &archive, restore_tmp.path(), @@ -567,7 +560,7 @@ mod test { ) .await .expect("Restore"); - let errors = monitor.take_errors(); + let errors = collector.take_errors(); dbg!(&errors); assert_eq!(errors.len(), 2); if let Error::RestoreDirectory { path, .. } = &errors[0] { diff --git a/src/show.rs b/src/show.rs index 3023b1e6..52d6c599 100644 --- a/src/show.rs +++ b/src/show.rs @@ -18,13 +18,12 @@ use std::borrow::Cow; use std::io::{BufWriter, Write}; -use std::sync::Arc; use jiff::tz::TimeZone; use tracing::error; use crate::index::entry::IndexEntry; -use crate::termui::TermUiMonitor; +use crate::monitor::Monitor; use crate::*; /// Options controlling the behavior of `show_versions`. @@ -47,7 +46,7 @@ pub struct ShowVersionsOptions { pub async fn show_versions( archive: &Archive, options: &ShowVersionsOptions, - monitor: Arc, + monitor: Monitor, ) -> Result<()> { let mut band_ids = archive.list_band_ids().await?; let timezone = if options.utc { @@ -114,8 +113,7 @@ pub async fn show_versions( crate::misc::bytes_to_human_mb(sizes.file_bytes) )); } - monitor.clear_progress_bars(); // to avoid fighting with stdout - println!("{}", l.join(" ")); + monitor.println(&l.join(" ")); } Ok(()) } diff --git a/src/source.rs b/src/source.rs index ed014edd..84fc28d5 100644 --- a/src/source.rs +++ b/src/source.rs @@ -21,7 +21,6 @@ use std::fs; use std::fs::File; use std::io::ErrorKind; use std::path::{Path, PathBuf}; -use std::sync::Arc; use tracing::{error, warn}; @@ -62,7 +61,7 @@ impl SourceTree { File::open(&path).map_err(|source| Error::ReadSourceFile { path, source }) } - pub fn size(&self, exclude: Exclude, monitor: Arc) -> Result { + pub fn size(&self, exclude: Exclude, monitor: Monitor) -> Result { let mut file_bytes = 0u64; let task = monitor.start_task("Measure tree".to_string()); for e in self.iter_entries(Apath::from("/"), exclude, monitor.clone())? { @@ -81,7 +80,7 @@ impl SourceTree { &self, subtree: Apath, exclude: Exclude, - _monitor: Arc, + _monitor: Monitor, ) -> Result { Iter::new(&self.path, subtree, exclude) } @@ -335,7 +334,6 @@ impl Iterator for Iter { mod test { use itertools::Itertools; - use crate::monitor::test::TestMonitor; use crate::test_fixtures::TreeFixture; use super::*; @@ -358,7 +356,7 @@ mod test { tf.create_dir("jam/.etc"); let lt = SourceTree::open(tf.path()).unwrap(); let result: Vec = lt - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .unwrap() .collect(); let names: Vec = result.iter().map(|e| e.apath().to_string()).collect(); @@ -401,7 +399,7 @@ mod test { let lt = SourceTree::open(tf.path()).unwrap(); let names = lt - .iter_entries(Apath::root(), exclude, TestMonitor::arc()) + .iter_entries(Apath::root(), exclude, Monitor::void()) .unwrap() .map(|e| e.apath().to_string()) .collect_vec(); @@ -423,7 +421,7 @@ mod test { let lt = SourceTree::open(tf.path()).unwrap(); let names = lt - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .unwrap() .map(|e| e.apath().to_string()) .collect_vec(); @@ -443,7 +441,7 @@ mod test { let lt = SourceTree::open(tf.path()).unwrap(); let names = lt - .iter_entries("/subdir".into(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries("/subdir".into(), Exclude::nothing(), Monitor::void()) .unwrap() .map(|e| e.apath().to_string()) .collect_vec(); @@ -460,7 +458,7 @@ mod test { let lt = SourceTree::open(tf.path()).unwrap(); let names = lt - .iter_entries(Apath::root(), Exclude::nothing(), TestMonitor::arc()) + .iter_entries(Apath::root(), Exclude::nothing(), Monitor::void()) .unwrap() .map(|e| e.apath().to_string()) .collect_vec(); diff --git a/src/stored_tree.rs b/src/stored_tree.rs index b952cfab..bb1d2050 100644 --- a/src/stored_tree.rs +++ b/src/stored_tree.rs @@ -18,8 +18,6 @@ //! across incremental backups, hiding from the caller that data may be distributed across //! multiple index files, bands, and blocks. -use std::sync::Arc; - use crate::counters::Counter; use crate::index::stitch::Stitch; use crate::monitor::Monitor; @@ -49,7 +47,7 @@ impl StoredTree { self.band.is_closed().await } - pub async fn size(&self, exclude: Exclude, monitor: Arc) -> Result { + pub async fn size(&self, exclude: Exclude, monitor: Monitor) -> Result { let mut file_bytes = 0u64; let task = monitor.start_task("Measure tree".to_string()); let mut stitch = self.iter_entries(Apath::from("/"), exclude, monitor.clone()); @@ -68,12 +66,7 @@ impl StoredTree { /// Return an iter of index entries in this stored tree. // TODO: Should perhaps return a sequence of results so that the caller has the // option to handle errors or continue. - pub fn iter_entries( - &self, - subtree: Apath, - exclude: Exclude, - monitor: Arc, - ) -> Stitch { + pub fn iter_entries(&self, subtree: Apath, exclude: Exclude, monitor: Monitor) -> Stitch { // TODO: Pass in this band so that we don't need to reopen it. Stitch::new(&self.archive, self.band.id(), subtree, exclude, monitor) } @@ -83,7 +76,7 @@ impl StoredTree { mod test { use std::path::Path; - use crate::monitor::test::TestMonitor; + use crate::monitor::Monitor; use super::super::test_fixtures::*; use super::super::*; @@ -100,7 +93,7 @@ mod test { assert_eq!(st.band().id(), last_band_id); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let names: Vec = st .iter_entries(Apath::root(), Exclude::nothing(), monitor.clone()) .collect_all() @@ -146,7 +139,7 @@ mod test { .await .unwrap(); - let monitor = TestMonitor::arc(); + let monitor = Monitor::void(); let names: Vec = st .iter_entries("/subdir".into(), Exclude::nothing(), monitor.clone()) .collect_all() diff --git a/src/termui/monitor.rs b/src/termui/monitor.rs index 430fdbf4..147c6069 100644 --- a/src/termui/monitor.rs +++ b/src/termui/monitor.rs @@ -2,8 +2,8 @@ //! Monitor on a terminal UI. -use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed}; use std::sync::{Arc, Mutex}; use std::thread::{JoinHandle, sleep, spawn}; use std::time::Duration; @@ -14,7 +14,7 @@ use tracing::error; use crate::Error; use crate::counters::{Counter, Counters}; -use crate::monitor::Monitor; +use crate::monitor::MonitorImpl; use crate::monitor::task::{Task, TaskList}; pub struct TermUiMonitor { @@ -22,6 +22,7 @@ pub struct TermUiMonitor { counters: Arc, // active_files: Mutex>, tasks: Arc>, + /// Nutmeg view that paints stdout periodically from the Model. view: Arc>, /// A thread that periodically updates the view's progress bars from the Model. /// @@ -34,7 +35,16 @@ pub struct TermUiMonitor { error_count: AtomicUsize, } -/// The nutmeg model. +impl Debug for TermUiMonitor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TermUiMonitor") + .field("counters", &self.counters) + .field("tasks", &self.tasks) + .finish_non_exhaustive() + } +} + +/// The state that's painted into progress bars. pub(super) struct Model { counters: Arc, tasks: Arc>, @@ -110,7 +120,7 @@ impl Drop for TermUiMonitor { } } -impl Monitor for TermUiMonitor { +impl MonitorImpl for TermUiMonitor { fn count(&self, counter: Counter, increment: usize) { self.counters.count(counter, increment) } @@ -119,14 +129,27 @@ impl Monitor for TermUiMonitor { self.counters.set(counter, value) } + fn get_counters(&self) -> Counters { + self.counters.as_ref().clone() + } + fn error(&self, error: Error) { error!(target: "conserve", "{error}"); self.error_count.fetch_add(1, Relaxed); } + fn error_count(&self) -> usize { + self.error_count.load(Relaxed) + } + fn start_task(&self, name: String) -> Task { self.tasks.lock().unwrap().start_task(name) } + + fn println(&self, text: &str) { + self.view.clear(); + println!("{text}"); + } } impl nutmeg::Model for Model { diff --git a/src/termui/trace.rs b/src/termui/trace.rs index 07c33937..e5440ad4 100644 --- a/src/termui/trace.rs +++ b/src/termui/trace.rs @@ -16,6 +16,8 @@ use tracing_subscriber::{ prelude::*, }; +use crate::termui::TermUiMonitor; + /// Chosen style of timestamp prefix on trace lines. #[derive(clap::ValueEnum, Clone, Debug)] pub enum TraceTimeStyle { @@ -31,7 +33,7 @@ pub enum TraceTimeStyle { #[must_use] pub fn enable_tracing( - monitor: &super::TermUiMonitor, + term_ui_monitor: &TermUiMonitor, time_style: &TraceTimeStyle, console_level: Level, json_path: &Option, @@ -40,7 +42,7 @@ pub fn enable_tracing( let time_style = time_style.clone(); let console_layer = tracing_subscriber::fmt::Layer::default() .with_ansi(clicolors_control::colors_enabled()) - .with_writer(monitor.view()) + .with_writer(term_ui_monitor.view()) .with_timer(time_style) .with_filter(filter::Targets::new().with_target("conserve", console_level)); let json_layer; diff --git a/src/test_fixtures.rs b/src/test_fixtures.rs index 2f67047b..6a7def02 100644 --- a/src/test_fixtures.rs +++ b/src/test_fixtures.rs @@ -21,7 +21,7 @@ use std::path::{Path, PathBuf}; use tempfile::TempDir; -use crate::monitor::test::TestMonitor; +use crate::monitor::Monitor; use crate::*; pub async fn setup_incomplete_empty_band(archive: &Archive) { @@ -38,12 +38,12 @@ pub async fn store_two_versions(archive: &Archive) { } let options = &BackupOptions::default(); - backup(archive, srcdir.path(), options, TestMonitor::arc()) + backup(archive, srcdir.path(), options, Monitor::void()) .await .unwrap(); srcdir.create_file("hello2"); - backup(archive, srcdir.path(), options, TestMonitor::arc()) + backup(archive, srcdir.path(), options, Monitor::void()) .await .unwrap(); } diff --git a/src/validate.rs b/src/validate.rs index 40d8aeed..15f7e3a1 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -13,7 +13,6 @@ use std::cmp::max; use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use tracing::debug; @@ -34,7 +33,7 @@ pub struct ValidateOptions { pub(crate) async fn validate_bands( archive: &Archive, band_ids: &[BandId], - monitor: Arc, + monitor: Monitor, ) -> Result> { let mut block_lens = HashMap::new(); let task = monitor.start_task("Validate indexes".to_string()); @@ -84,7 +83,7 @@ fn merge_block_lens(into: &mut HashMap, from: &HashMap, + monitor: Monitor, ) -> Result> { // TODO: Check other entry properties are correct. // TODO: Check they're in apath order. diff --git a/tests/damage.rs b/tests/damage.rs index 970b9f4e..d06c3fa0 100644 --- a/tests/damage.rs +++ b/tests/damage.rs @@ -32,10 +32,10 @@ use std::fs::rename; use std::fs::{OpenOptions, remove_file}; use std::path::{Path, PathBuf}; -use std::sync::Arc; use assert_fs::TempDir; use assert_fs::prelude::*; +use conserve::monitor::Monitor; use dir_assert::assert_paths; use itertools::Itertools; use pretty_assertions::assert_eq; @@ -43,7 +43,6 @@ use rstest::rstest; use tracing::info; use conserve::counters::Counter; -use conserve::monitor::test::TestMonitor; use conserve::transport::Transport; use conserve::{ Apath, Archive, BackupOptions, BandId, BandSelectionPolicy, EntryTrait, Error, Exclude, @@ -104,7 +103,7 @@ async fn backup_after_damage( &archive, source_dir.path(), &backup_options, - TestMonitor::arc(), + Monitor::void(), ) .await .expect("initial backup"); @@ -123,7 +122,7 @@ async fn backup_after_damage( &archive, source_dir.path(), &backup_options, - TestMonitor::arc(), + Monitor::void(), ) .await .expect("write second backup after damage"); @@ -162,7 +161,8 @@ async fn backup_after_damage( // Can restore the second backup { let restore_dir = TempDir::new().unwrap(); - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + restore( &archive, restore_dir.path(), @@ -171,8 +171,8 @@ async fn backup_after_damage( ) .await .expect("restore second backup"); - monitor.assert_counter(Counter::Files, 1); - monitor.assert_no_errors(); + collector.assert_counter(Counter::Files, 1); + collector.assert_no_errors(); // Since the second backup rewrote the single file in the backup (and the root dir), // we should get all the content back out. @@ -189,7 +189,7 @@ async fn backup_after_damage( BandSelectionPolicy::Latest, Apath::root(), Exclude::nothing(), - TestMonitor::arc(), + Monitor::void(), ) .await .expect("iter entries") @@ -209,7 +209,7 @@ async fn backup_after_damage( // Validation completes although with warnings. // TODO: This should return problems that we can inspect. archive - .validate(&ValidateOptions::default(), Arc::new(TestMonitor::new())) + .validate(&ValidateOptions::default(), Monitor::void()) .await .expect("validate"); } @@ -299,12 +299,12 @@ impl DamageLocation { #[tokio::test] async fn missing_block_when_checking_hashes() -> Result<()> { let archive = Archive::open_path(Path::new("testdata/damaged/missing-block")).await?; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); archive .validate(&ValidateOptions::default(), monitor.clone()) .await .unwrap(); - let errors = monitor.take_errors(); + let errors = collector.take_errors(); dbg!(&errors); assert_eq!(errors.len(), 1); assert!(matches!(errors[0], Error::BlockMissing { .. })); @@ -314,7 +314,7 @@ async fn missing_block_when_checking_hashes() -> Result<()> { #[tokio::test] async fn missing_block_skip_block_hashes() -> Result<()> { let archive = Archive::open_path(Path::new("testdata/damaged/missing-block")).await?; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); archive .validate( &ValidateOptions { @@ -323,7 +323,7 @@ async fn missing_block_skip_block_hashes() -> Result<()> { monitor.clone(), ) .await?; - let errors = monitor.take_errors(); + let errors = collector.take_errors(); dbg!(&errors); assert_eq!(errors.len(), 1); assert!(matches!(errors[0], Error::BlockMissing { .. })); diff --git a/tests/mount.rs b/tests/mount.rs index ac1b0aac..d1a32b82 100644 --- a/tests/mount.rs +++ b/tests/mount.rs @@ -7,7 +7,7 @@ use std::{ }; use conserve::{ - BackupOptions, MountOptions, backup, monitor::test::TestMonitor, test_fixtures::TreeFixture, + BackupOptions, MountOptions, backup, monitor::test::Collector, test_fixtures::TreeFixture, }; use tempfile::TempDir; @@ -76,7 +76,7 @@ async fn mount_sub_dirs() { &archive, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .unwrap(); } @@ -131,7 +131,7 @@ async fn mount_file_versions() { &archive, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .unwrap(); @@ -141,7 +141,7 @@ async fn mount_file_versions() { &archive, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .unwrap(); } @@ -225,7 +225,7 @@ async fn mount_cleanup() { &archive, srcdir.path(), &BackupOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .unwrap(); } diff --git a/tests/old_archives.rs b/tests/old_archives.rs index f5145200..f83b48f3 100644 --- a/tests/old_archives.rs +++ b/tests/old_archives.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, Mutex}; use assert_fs::TempDir; use assert_fs::prelude::*; use conserve::counters::Counter; -use conserve::monitor::test::TestMonitor; +use conserve::monitor::Monitor; use conserve::*; use jiff::Timestamp; use predicates::prelude::*; @@ -84,7 +84,7 @@ async fn validate_archive() { let archive = open_old_archive(ver, "minimal").await; archive - .validate(&ValidateOptions::default(), Arc::new(TestMonitor::new())) + .validate(&ValidateOptions::default(), Monitor::void()) .await .expect("validate archive"); // TODO: Assert no problems found @@ -103,7 +103,8 @@ async fn long_listing_old_archive() { let mut output = String::new(); // show archive contents - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); + let mut entries = archive .open_stored_tree(BandSelectionPolicy::Latest) .await @@ -112,7 +113,7 @@ async fn long_listing_old_archive() { while let Some(entry) = entries.next().await { output.push_str(&format!("{}\n", entry.format_ls(true))); } - monitor.assert_no_errors(); + collector.assert_no_errors(); if first_with_perms.matches(&semver::Version::parse(ver).unwrap()) { assert_eq!( @@ -143,7 +144,7 @@ async fn restore_old_archive() { println!("restore {} to {:?}", ver, dest.path()); let archive = open_old_archive(ver, "minimal").await; - let monitor = TestMonitor::arc(); + let (monitor, collector) = Monitor::for_test(); restore( &archive, dest.path(), @@ -153,10 +154,10 @@ async fn restore_old_archive() { .await .expect("restore"); - monitor.assert_counter(Counter::Symlinks, 0); - monitor.assert_counter(Counter::Files, 2); - monitor.assert_counter(Counter::Dirs, 2); - monitor.assert_no_errors(); + collector.assert_counter(Counter::Symlinks, 0); + collector.assert_counter(Counter::Files, 2); + collector.assert_counter(Counter::Dirs, 2); + collector.assert_no_errors(); dest.child("hello").assert("hello world\n"); dest.child("subdir").assert(predicate::path::is_dir()); @@ -204,7 +205,7 @@ async fn restore_modify_backup() { &archive, working_tree.path(), RestoreOptions::default(), - TestMonitor::arc(), + Monitor::void(), ) .await .expect("restore"); @@ -241,7 +242,7 @@ async fn restore_modify_backup() { })), ..Default::default() }, - TestMonitor::arc(), + Monitor::void(), ) .await .expect("Backup modified tree"); diff --git a/tests/proptest_changes.rs b/tests/proptest_changes.rs index 97371023..2f4921a2 100644 --- a/tests/proptest_changes.rs +++ b/tests/proptest_changes.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; use std::fs; use std::path::Path; -use conserve::monitor::test::TestMonitor; +use conserve::monitor::Monitor; use proptest::prelude::*; use proptest_derive::Arbitrary; use tempfile::TempDir; @@ -94,7 +94,7 @@ async fn backup_sequential_changes(changes: &[TreeChange]) { max_entries_per_hunk: 3, ..BackupOptions::default() }; - backup(&archive, tf.path(), &options, TestMonitor::arc()) + backup(&archive, tf.path(), &options, Monitor::void()) .await .unwrap(); let snapshot = TempDir::new().unwrap(); @@ -129,7 +129,7 @@ async fn check_restore_against_snapshot(archive: &Archive, band_id: BandId, snap band_selection: BandSelectionPolicy::Specified(band_id), ..RestoreOptions::default() }; - restore(archive, restore_dir.path(), options, TestMonitor::arc()) + restore(archive, restore_dir.path(), options, Monitor::void()) .await .unwrap(); dir_assert::assert_paths(restore_dir.path(), snapshot).unwrap();