From 29c16e27aa2acf4f92b739fa6f495ce0576f9895 Mon Sep 17 00:00:00 2001 From: Ken-ichi Ueda Date: Fri, 27 Mar 2026 12:29:50 -0700 Subject: [PATCH] refactor: add media to zip during download process And not in the finalizing step. Hopefully this means "finalizing" is close to instantaneous. Co-authored-by: Claude (claude-sonnet-4-6) --- chuck-core/src/archive_updater.rs | 12 +- chuck-core/src/chuck_metadata.rs | 6 +- chuck-core/src/darwin_core/archive.rs | 215 +++++++++++++------------- chuck-core/src/downloader.rs | 20 +-- 4 files changed, 129 insertions(+), 124 deletions(-) diff --git a/chuck-core/src/archive_updater.rs b/chuck-core/src/archive_updater.rs index d124d26..63ec104 100644 --- a/chuck-core/src/archive_updater.rs +++ b/chuck-core/src/archive_updater.rs @@ -821,15 +821,15 @@ mod tests { use crate::darwin_core::meta::Metadata; use crate::darwin_core::archive::ArchiveBuilder; + let tmp = tempfile::NamedTempFile::new().unwrap(); let metadata = Metadata::default(); let builder = ArchiveBuilder::new( vec![DwcaExtension::SimpleMultimedia, DwcaExtension::Identifications], metadata, - &std::env::temp_dir(), + tmp.path(), ).unwrap(); - let tmp = tempfile::NamedTempFile::new().unwrap(); let path = tmp.path().to_str().unwrap().to_string(); - builder.build(&path).await.unwrap(); + builder.build().await.unwrap(); let exts = infer_extensions(&path).unwrap(); assert!(exts.contains(&DwcaExtension::SimpleMultimedia)); @@ -843,11 +843,11 @@ mod tests { use crate::darwin_core::meta::Metadata; use crate::darwin_core::archive::ArchiveBuilder; - let metadata = Metadata::default(); - let builder = ArchiveBuilder::new(vec![], metadata, &std::env::temp_dir()).unwrap(); let tmp = tempfile::NamedTempFile::new().unwrap(); + let metadata = Metadata::default(); + let builder = ArchiveBuilder::new(vec![], metadata, tmp.path()).unwrap(); let path = tmp.path().to_str().unwrap().to_string(); - builder.build(&path).await.unwrap(); + builder.build().await.unwrap(); 
assert!(!archive_has_media(&path).unwrap()); } diff --git a/chuck-core/src/chuck_metadata.rs b/chuck-core/src/chuck_metadata.rs index 20f94ea..71c9a02 100644 --- a/chuck-core/src/chuck_metadata.rs +++ b/chuck-core/src/chuck_metadata.rs @@ -55,10 +55,10 @@ mod tests { use crate::darwin_core::archive::ArchiveBuilder; async fn build_archive(inat_query: Option) -> tempfile::NamedTempFile { - let metadata = Metadata { inat_query, ..Default::default() }; - let builder = ArchiveBuilder::new(vec![], metadata, &std::env::temp_dir()).unwrap(); let tmp = tempfile::NamedTempFile::new().unwrap(); - builder.build(tmp.path().to_str().unwrap()).await.unwrap(); + let metadata = Metadata { inat_query, ..Default::default() }; + let builder = ArchiveBuilder::new(vec![], metadata, tmp.path()).unwrap(); + builder.build().await.unwrap(); tmp } diff --git a/chuck-core/src/darwin_core/archive.rs b/chuck-core/src/darwin_core/archive.rs index cd67705..19497cc 100644 --- a/chuck-core/src/darwin_core/archive.rs +++ b/chuck-core/src/darwin_core/archive.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::io::Write; -use std::path::{PathBuf}; +use std::path::{Path, PathBuf}; use tempfile::TempDir; use zip::write::{FileOptions, ZipWriter}; use zip::CompressionMethod; @@ -16,6 +16,9 @@ use crate::darwin_core::{ /// A DarwinCore Archive builder that can stream occurrence records and generate a compliant ZIP archive pub struct ArchiveBuilder { temp_dir: TempDir, + zip: ZipWriter, + /// The final destination path; the ZIP is written to a temp file and renamed here on success. + output_path: PathBuf, occurrence_writer: csv::Writer, multimedia_writer: Option>, audiovisual_writer: Option>, @@ -32,30 +35,37 @@ pub struct ArchiveBuilder { audiovisual_file_path: PathBuf, identification_file_path: PathBuf, comment_file_path: PathBuf, - media_dir_path: PathBuf, metadata: Metadata, } impl ArchiveBuilder { /// Create a new DarwinCore Archive builder. 
- /// `base_dir` is the directory in which the temporary working directory is created; - /// pass the parent of the output ZIP so temp files land on the same filesystem. + /// Opens a staging ZIP immediately in a temp dir beside `output_path`; pass the final output path so that + /// temp files land on the same filesystem (important on Linux where /tmp may be tmpfs). pub fn new( dwc_extensions: Vec, metadata: Metadata, - base_dir: &std::path::Path, + output_path: &Path, ) -> Result> { + let base_dir = output_path.parent().unwrap_or(Path::new(".")); let temp_dir = TempDir::new_in(base_dir)?; let occurrence_file_path = temp_dir.path().join("occurrence.csv"); let multimedia_file_path = temp_dir.path().join("multimedia.csv"); let audiovisual_file_path = temp_dir.path().join("audiovisual.csv"); let identification_file_path = temp_dir.path().join("identification.csv"); let comment_file_path = temp_dir.path().join("comment.csv"); - let media_dir_path = temp_dir.path().join("media"); - // Create media directory + // Create media staging directory inside temp dir + let media_dir_path = temp_dir.path().join("media"); std::fs::create_dir_all(&media_dir_path)?; + // Write the ZIP to a temp file inside the temp dir so that a cancelled or failed + // download leaves no partial file at the final output path. On successful build() + // the temp ZIP is renamed to output_path (same filesystem → atomic rename). 
+ let zip_temp_path = temp_dir.path().join("archive.zip"); + let zip_file = File::create(&zip_temp_path)?; + let zip = ZipWriter::new(zip_file); + // Create CSV writer for occurrence records let occurrence_file = File::create(&occurrence_file_path)?; let mut occurrence_writer = csv::WriterBuilder::new() @@ -68,6 +78,8 @@ impl ArchiveBuilder { Ok(Self { temp_dir, + zip, + output_path: output_path.to_path_buf(), occurrence_writer, multimedia_writer: None, audiovisual_writer: None, @@ -84,14 +96,34 @@ impl ArchiveBuilder { audiovisual_file_path, identification_file_path, comment_file_path, - media_dir_path, metadata, }) } - /// Get the media directory path for downloading photos - pub fn media_dir(&self) -> &std::path::Path { - &self.media_dir_path + /// Get the media staging directory path for downloading files before adding to the ZIP. + pub fn media_dir(&self) -> PathBuf { + self.temp_dir.path().join("media") + } + + /// Stream a staged media file into the open ZIP and remove it from the staging directory. + /// `rel_zip_path` is the path as it should appear in the ZIP (e.g. `"media/2024/01/15/12345.jpg"`). + /// The file is read from `temp_dir / rel_zip_path`; if it does not exist the call is a no-op. + pub fn add_media_from_temp( + &mut self, + rel_zip_path: &str, + ) -> Result<(), Box> { + let local_path = self.temp_dir.path().join(rel_zip_path); + if !local_path.exists() { + return Ok(()); + } + let zip_opts: FileOptions<()> = FileOptions::default() + .compression_method(CompressionMethod::Stored) + .unix_permissions(0o644); + self.zip.start_file(rel_zip_path, zip_opts)?; + let mut file = File::open(&local_path)?; + std::io::copy(&mut file, &mut self.zip)?; + std::fs::remove_file(&local_path)?; + Ok(()) } /// Add a batch of DarwinCore occurrences to the archive @@ -237,8 +269,9 @@ impl ArchiveBuilder { Ok(()) } - /// Build the final archive and create the ZIP file - pub async fn build(mut self, output_path: &str) -> Result<(), Box> { + /// Finish writing the archive. 
All media must have been added via `add_media_from_temp` + /// before calling this; only CSV files and metadata are written here. + pub async fn build(mut self) -> Result<(), Box> { // Ensure all CSV data is written self.occurrence_writer.flush()?; drop(self.occurrence_writer); // Close the file @@ -277,34 +310,31 @@ impl ArchiveBuilder { let eml_file_path = self.temp_dir.path().join("eml.xml"); std::fs::write(&eml_file_path, eml_xml)?; - // Create ZIP archive - let zip_file = File::create(output_path)?; - let mut zip = ZipWriter::new(zip_file); let options: FileOptions<()> = FileOptions::default() .compression_method(CompressionMethod::Deflated) .unix_permissions(0o644); // Add meta.xml to ZIP - zip.start_file("meta.xml", options)?; + self.zip.start_file("meta.xml", options)?; let meta_content = std::fs::read(&meta_file_path)?; - zip.write_all(&meta_content)?; + self.zip.write_all(&meta_content)?; // Add eml.xml to ZIP - zip.start_file("eml.xml", options)?; + self.zip.start_file("eml.xml", options)?; let eml_content = std::fs::read(&eml_file_path)?; - zip.write_all(&eml_content)?; + self.zip.write_all(&eml_content)?; // Add chuck.json if inat_query is set if let Some(ref inat_query) = self.metadata.inat_query { let chuck_json = serde_json::json!({ "inat_query": inat_query }).to_string(); - zip.start_file("chuck.json", options)?; - zip.write_all(chuck_json.as_bytes())?; + self.zip.start_file("chuck.json", options)?; + self.zip.write_all(chuck_json.as_bytes())?; } // Add occurrence.csv to ZIP - zip.start_file("occurrence.csv", options)?; + self.zip.start_file("occurrence.csv", options)?; let occurrence_content = std::fs::read(&self.occurrence_file_path)?; - zip.write_all(&occurrence_content)?; + self.zip.write_all(&occurrence_content)?; // Add extension CSVs to ZIP for all enabled extensions, even if empty let ext_specs: &[(crate::DwcaExtension, &str, &std::path::Path, Vec<&str>)] = &[ @@ -346,75 +376,24 @@ impl ArchiveBuilder { wtr.write_record(headers)?; 
wtr.flush()?; } - zip.start_file(*zip_name, options)?; - zip.write_all(&std::fs::read(file_path)?)?; - } - - // Add media directory contents to ZIP if it exists and has files - println!("Zipping media..."); - Self::add_directory_to_zip(&mut zip, &self.media_dir_path, "media")?; - - // Finish ZIP - zip.finish()?; - - println!("DarwinCore Archive created: {output_path}"); - println!("Records exported: {}", self.record_count); - if self.multimedia_count > 0 { - println!("Multimedia records exported: {}", self.multimedia_count); + self.zip.start_file(*zip_name, options)?; + self.zip.write_all(&std::fs::read(file_path)?)?; } - if self.audiovisual_count > 0 { - println!("Audiovisual records exported: {}", self.audiovisual_count); - } - if self.identification_count > 0 { - println!("Identification records exported: {}", self.identification_count); - } - if self.comment_count > 0 { - println!("Comment records exported: {}", self.comment_count); - } - - Ok(()) - } - /// Add directory contents to ZIP archive using streaming to avoid loading entire files into memory - fn add_directory_to_zip( - zip: &mut ZipWriter, - dir_path: &std::path::Path, - zip_prefix: &str, - ) -> Result<(), Box> { - if !dir_path.exists() || !dir_path.is_dir() { - return Ok(()); - } + // Finish ZIP (writes central directory) + let zip_temp_path = self.temp_dir.path().join("archive.zip"); + self.zip.finish()?; - // There's no point in trying to compress JPGs - let zip_opts: FileOptions<()> = FileOptions::default() - .compression_method(CompressionMethod::Stored) - .unix_permissions(0o644); + // Rename the temp ZIP to the final output path. Both are on the same filesystem + // so this is an atomic rename on most systems. + std::fs::rename(&zip_temp_path, &self.output_path)?; - for entry in std::fs::read_dir(dir_path)? 
{ - let entry = entry?; - let path = entry.path(); - - if path.is_file() { - let file_name = path.file_name() - .and_then(|name| name.to_str()) - .ok_or("Invalid filename")?; - - let zip_path = format!("{zip_prefix}/{file_name}"); - zip.start_file(zip_path, zip_opts)?; - - // Stream the file contents instead of reading entirely into memory - let mut file = File::open(&path)?; - std::io::copy(&mut file, zip)?; - } else if path.is_dir() { - // Recursively add subdirectory contents - let dir_name = path.file_name() - .and_then(|name| name.to_str()) - .ok_or("Invalid directory name")?; - - let subdir_zip_prefix = format!("{zip_prefix}/{dir_name}"); - Self::add_directory_to_zip(zip, &path, &subdir_zip_prefix)?; - } - } + log::info!( + "DarwinCore Archive complete: {} records, {} multimedia, {} audiovisual, \ + {} identifications, {} comments", + self.record_count, self.multimedia_count, self.audiovisual_count, + self.identification_count, self.comment_count, + ); Ok(()) } @@ -428,28 +407,53 @@ mod tests { use zip::ZipArchive; #[test] - fn test_archive_builder_temp_dir_in_base_dir() { - let base = tempfile::TempDir::new().unwrap(); - let metadata = Metadata::default(); - let builder = ArchiveBuilder::new(vec![], metadata, base.path()).unwrap(); - // media_dir lives inside temp_dir, so it must be under base + fn test_archive_temp_files_in_same_dir_as_output() { + let output_dir = tempfile::TempDir::new().unwrap(); + let output_path = output_dir.path().join("test.zip"); + let builder = ArchiveBuilder::new(vec![], Metadata::default(), &output_path).unwrap(); + // media_dir lives inside temp_dir, which must be under the output's parent dir assert!( - builder.media_dir().starts_with(base.path()), + builder.media_dir().starts_with(output_dir.path()), "expected media_dir {:?} to be under {:?}", builder.media_dir(), - base.path() + output_dir.path() ); } + #[tokio::test] + async fn test_add_media_from_temp_adds_to_zip_and_removes_local_file() { + let tmp = 
tempfile::NamedTempFile::new().unwrap(); + let mut builder = ArchiveBuilder::new(vec![], Metadata::default(), tmp.path()).unwrap(); + + // Create a fake media file in the staging dir + let media_dir = builder.media_dir(); + std::fs::create_dir_all(media_dir.join("2024/01/15")).unwrap(); + let staged = media_dir.join("2024/01/15/99999.jpg"); + std::fs::write(&staged, b"fake image data").unwrap(); + + builder.add_media_from_temp("media/2024/01/15/99999.jpg").unwrap(); + + // Staging file must be gone + assert!(!staged.exists(), "staged file should have been removed"); + + builder.build().await.unwrap(); + + let file = std::fs::File::open(tmp.path()).unwrap(); + let mut archive = ZipArchive::new(file).unwrap(); + let mut entry = archive.by_name("media/2024/01/15/99999.jpg") + .expect("media entry missing from zip"); + let mut contents = vec![]; + std::io::Read::read_to_end(&mut entry, &mut contents).unwrap(); + assert_eq!(contents, b"fake image data"); + } + /// Build a minimal archive with no occurrences and the given extensions enabled, /// return the list of file names present in the ZIP. 
async fn zip_file_names(extensions: Vec) -> Vec { - let metadata = Metadata::default(); - let builder = ArchiveBuilder::new(extensions, metadata, &std::env::temp_dir()).unwrap(); let tmp = tempfile::NamedTempFile::new().unwrap(); - let path = tmp.path().to_str().unwrap().to_string(); - builder.build(&path).await.unwrap(); - let file = std::fs::File::open(&path).unwrap(); + let builder = ArchiveBuilder::new(extensions, Metadata::default(), tmp.path()).unwrap(); + builder.build().await.unwrap(); + let file = std::fs::File::open(tmp.path()).unwrap(); let mut archive = ZipArchive::new(file).unwrap(); (0..archive.len()) .map(|i| archive.by_index(i).unwrap().name().to_string()) @@ -458,15 +462,14 @@ mod tests { #[tokio::test] async fn test_chuck_json_written_when_inat_query_present() { + let tmp = tempfile::NamedTempFile::new().unwrap(); let metadata = Metadata { inat_query: Some("taxon_id=47790".to_string()), ..Default::default() }; - let builder = ArchiveBuilder::new(vec![], metadata, &std::env::temp_dir()).unwrap(); - let tmp = tempfile::NamedTempFile::new().unwrap(); - let path = tmp.path().to_str().unwrap().to_string(); - builder.build(&path).await.unwrap(); - let file = std::fs::File::open(&path).unwrap(); + let builder = ArchiveBuilder::new(vec![], metadata, tmp.path()).unwrap(); + builder.build().await.unwrap(); + let file = std::fs::File::open(tmp.path()).unwrap(); let mut archive = ZipArchive::new(file).unwrap(); let mut chuck_file = archive.by_name("chuck.json").expect("chuck.json missing"); let mut contents = String::new(); diff --git a/chuck-core/src/downloader.rs b/chuck-core/src/downloader.rs index 7bea055..2cc6aa6 100644 --- a/chuck-core/src/downloader.rs +++ b/chuck-core/src/downloader.rs @@ -110,16 +110,12 @@ impl Downloader { use std::sync::atomic::Ordering; use crate::darwin_core::ArchiveBuilder; - // Create archive builder with temp dir on the same filesystem as the output. 
- // Using the output's parent avoids writing to tmpfs (a common Linux default for - // /tmp), which can exhaust RAM-backed space even when the disk has room. - let output_parent = std::path::Path::new(output_path) - .parent() - .unwrap_or(std::path::Path::new(".")); + // Pass the output path directly so the builder opens the ZIP immediately and + // places temp files on the same filesystem (avoids Linux tmpfs exhaustion). let mut archive = ArchiveBuilder::new( self.extensions.clone(), self.metadata.clone(), - output_parent, + std::path::Path::new(output_path), )?; log::info!( @@ -186,6 +182,9 @@ impl Downloader { self.process_extensions( &observations, &mut archive, &photo_mapping, &sound_mapping, &taxa_hash ).await?; + for rel_path in photo_mapping.values().chain(sound_mapping.values()) { + archive.add_media_from_temp(rel_path)?; + } } break; } @@ -240,12 +239,15 @@ impl Downloader { self.process_extensions( &observations, &mut archive, &photo_mapping, &sound_mapping, &prev_taxa_hash ).await?; + for rel_path in photo_mapping.values().chain(sound_mapping.values()) { + archive.add_media_from_temp(rel_path)?; + } } // Start media downloads for current batch in background let media_handle = self.start_media_downloads( &batch, - archive.media_dir().to_path_buf(), + archive.media_dir(), &mut progress, &progress_callback, cancellation_token.clone(), @@ -274,7 +276,7 @@ impl Downloader { ); progress.stage = DownloadStage::Building; progress_callback(progress.clone()); - archive.build(output_path).await?; + archive.build().await?; Ok(()) }