diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 073f2e5..f36708d 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -11,7 +11,10 @@ use anyhow::Result; use async_channel::{Receiver, Sender}; use colored::*; use log::{info, warn}; -use std::{fs, time::Duration}; +use std::{ + fs, + time::{Duration, Instant}, +}; use tokio::{fs::metadata, time::sleep}; use super::transfer::TransferMessage; @@ -111,6 +114,12 @@ async fn watch_for_import( transfer: Transfer, ) -> Result<()> { info!("{}: watching imports", transfer); + // Give up watching after this long so a transfer that never fully imports + // (e.g. one containing a sample the *arr won't import) can't loop forever + // and accumulate until downloads stall (see issue #30). Genuine imports are + // detected within a poll or two, so the default is generous. + let import_timeout = Duration::from_secs(app_data.config.import_timeout_secs); + let started = Instant::now(); loop { if transfer.is_imported().await { info!("{}: imported", transfer); @@ -132,11 +141,20 @@ async fn watch_for_import( let m = transfer.clone(); tx.send(TransferMessage::Imported(m)).await?; + info!("{}: removed", transfer); + break; + } + if app_data.config.import_timeout_secs > 0 && started.elapsed() > import_timeout { + warn!( + "{}: still not imported after {:?}; giving up watching. It likely contains a \ + file the *arr won't import (e.g. a sample outside a skip_directories folder), \ + so its local/put.io copies won't be cleaned up automatically.", + transfer, import_timeout + ); break; } sleep(Duration::from_secs(app_data.config.polling_interval)).await; } - info!("{}: removed", transfer); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 4699b26..9669ce5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,12 @@ struct RunArgs { pub config_path: String, } +/// Default for [`Config::import_timeout_secs`] (2h), enforced at the type level +/// so the documented default holds even without the Figment default layer. +fn default_import_timeout_secs() -> u64 { + 7200 +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { bind_address: String, @@ -52,6 +58,13 @@ pub struct Config { orchestration_workers: usize, password: String, polling_interval: u64, + /// How long (seconds) to keep polling for a transfer to be imported before + /// giving up watching it. Bounds the per-transfer import-watch loop so a + /// transfer that never fully imports (e.g. one with a sample the *arr won't + /// import) can't accumulate and stall downloads. Default 7200 (2h); 0 + /// disables the bound (watch indefinitely). + #[serde(default = "default_import_timeout_secs")] + import_timeout_secs: u64, port: u16, skip_directories: Vec, uid: u32, @@ -145,6 +158,7 @@ async fn main() -> Result<()> { .join(Serialized::default("orchestration_workers", 10)) .join(Serialized::default("loglevel", "info")) .join(Serialized::default("polling_interval", 10)) + .join(Serialized::default("import_timeout_secs", 7200u64)) .join(Serialized::default("port", 9091)) .join(Serialized::default("uid", 1000)) .join(Serialized::default("download_unmanaged", false)) diff --git a/src/utils.rs b/src/utils.rs index 594e6d6..a82b062 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -29,6 +29,12 @@ uid = 1000 # Optional polling interval in secs, default 10. polling_interval = 10 +# Optional. Give up watching a transfer for import after this many seconds, default 7200 (2h). +# Bounds the per-transfer import-watch loop so a transfer that never fully imports (e.g. one +# containing a sample the *arr won't import) can't accumulate and eventually stall downloads. +# Set to 0 to disable (watch indefinitely). +import_timeout_secs = 7200 + # Optional skip directories when downloading, default ["sample", "extras"] skip_directories = ["sample", "extras"]