From ff225f93cff4cdaefb55b2a0a3f5b728d1cf72d5 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 10:28:02 +0300 Subject: [PATCH 1/4] Bound watch_for_import so stuck transfers can't stall downloads (fixes #30) watch_for_import polled is_imported() in an unbounded loop that only exits once every video file is imported. A transfer containing a file the *arr never imports (typically a sample outside a skip_directories folder) makes is_imported() permanently false, so the task loops forever. These accumulate over time until the process stops picking up new transfers entirely (downloads stall; a restart clears it). Give up watching after MAX_IMPORT_WAIT (2h) with a warning, so a stuck transfer ends its task instead of polling the *arr/put.io APIs forever. Genuine imports complete within a poll or two, so the bound doesn't affect normal operation. A better long-term fix is improved import mapping (the skip_directories TODO), noted in the issue. --- src/download_system/orchestration.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 073f2e5..09c8dc6 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -11,11 +11,22 @@ use anyhow::Result; use async_channel::{Receiver, Sender}; use colored::*; use log::{info, warn}; -use std::{fs, time::Duration}; +use std::{ + fs, + time::{Duration, Instant}, +}; use tokio::{fs::metadata, time::sleep}; use super::transfer::TransferMessage; +/// How long to keep polling for a transfer to be imported before giving up. +/// `is_imported()` requires *every* video file to be imported, but a transfer +/// can contain a file the *arr will never import (e.g. a sample outside a +/// `skip_directories` folder), which would otherwise make `watch_for_import` +/// loop forever and accumulate until the process stalls (see issue #30). +/// Genuine imports complete within a poll or two, so this bound is generous. +const MAX_IMPORT_WAIT: Duration = Duration::from_secs(2 * 60 * 60); + #[derive(Clone)] pub struct Worker { _id: usize, @@ -111,6 +122,7 @@ async fn watch_for_import( transfer: Transfer, ) -> Result<()> { info!("{}: watching imports", transfer); + let started = Instant::now(); loop { if transfer.is_imported().await { info!("{}: imported", transfer); @@ -134,6 +146,15 @@ async fn watch_for_import( break; } + if started.elapsed() > MAX_IMPORT_WAIT { + warn!( + "{}: still not imported after {:?}; giving up watching. It likely contains a \ + file the *arr won't import (e.g. a sample outside a skip_directories folder), \ + so its local/put.io copies won't be cleaned up automatically.", + transfer, MAX_IMPORT_WAIT + ); + break; + } sleep(Duration::from_secs(app_data.config.polling_interval)).await; } info!("{}: removed", transfer); From 95bc3225f58f857d54ea607a90c2eb0a878d99ff Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 10:40:19 +0300 Subject: [PATCH 2/4] Only log 'removed' when the transfer was actually cleaned up The unconditional 'removed' log fired on the give-up path too, even though the warning there says nothing was cleaned up. Move it into the imported branch so the log matches what happened. --- src/download_system/orchestration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 09c8dc6..9e261a9 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -144,6 +144,7 @@ async fn watch_for_import( let m = transfer.clone(); tx.send(TransferMessage::Imported(m)).await?; + info!("{}: removed", transfer); break; } if started.elapsed() > MAX_IMPORT_WAIT { @@ -157,7 +158,6 @@ async fn watch_for_import( } sleep(Duration::from_secs(app_data.config.polling_interval)).await; } - info!("{}: removed", transfer); Ok(()) } From 9645afb77483fde63d19965bf0c8f582ab637295 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 10:47:42 +0300 Subject: [PATCH 3/4] Make the import-watch give-up timeout configurable The give-up timeout was a hard-coded 2h. The right value depends on library size, IO speed and *arr load, so expose it as import_timeout_secs (default 7200). Setting it to 0 disables the bound (watch indefinitely). --- src/download_system/orchestration.rs | 17 +++++++---------- src/main.rs | 7 +++++++ src/utils.rs | 6 ++++++ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 9e261a9..f36708d 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -19,14 +19,6 @@ use tokio::{fs::metadata, time::sleep}; use super::transfer::TransferMessage; -/// How long to keep polling for a transfer to be imported before giving up. -/// `is_imported()` requires *every* video file to be imported, but a transfer -/// can contain a file the *arr will never import (e.g. a sample outside a -/// `skip_directories` folder), which would otherwise make `watch_for_import` -/// loop forever and accumulate until the process stalls (see issue #30). -/// Genuine imports complete within a poll or two, so this bound is generous. -const MAX_IMPORT_WAIT: Duration = Duration::from_secs(2 * 60 * 60); - #[derive(Clone)] pub struct Worker { _id: usize, @@ -122,6 +114,11 @@ async fn watch_for_import( transfer: Transfer, ) -> Result<()> { info!("{}: watching imports", transfer); + // Give up watching after this long so a transfer that never fully imports + // (e.g. one containing a sample the *arr won't import) can't loop forever + // and accumulate until downloads stall (see issue #30). Genuine imports are + // detected within a poll or two, so the default is generous. + let import_timeout = Duration::from_secs(app_data.config.import_timeout_secs); let started = Instant::now(); loop { if transfer.is_imported().await { @@ -147,12 +144,12 @@ async fn watch_for_import( info!("{}: removed", transfer); break; } - if started.elapsed() > MAX_IMPORT_WAIT { + if app_data.config.import_timeout_secs > 0 && started.elapsed() > import_timeout { warn!( "{}: still not imported after {:?}; giving up watching. It likely contains a \ file the *arr won't import (e.g. a sample outside a skip_directories folder), \ so its local/put.io copies won't be cleaned up automatically.", - transfer, MAX_IMPORT_WAIT + transfer, import_timeout ); break; } diff --git a/src/main.rs b/src/main.rs index 4699b26..6a17d9c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,12 @@ pub struct Config { orchestration_workers: usize, password: String, polling_interval: u64, + /// How long (seconds) to keep polling for a transfer to be imported before + /// giving up watching it. Bounds the per-transfer import-watch loop so a + /// transfer that never fully imports (e.g. one with a sample the *arr won't + /// import) can't accumulate and stall downloads. Default 7200 (2h). + #[serde(default)] + import_timeout_secs: u64, port: u16, skip_directories: Vec, uid: u32, @@ -145,6 +151,7 @@ async fn main() -> Result<()> { .join(Serialized::default("orchestration_workers", 10)) .join(Serialized::default("loglevel", "info")) .join(Serialized::default("polling_interval", 10)) + .join(Serialized::default("import_timeout_secs", 7200u64)) .join(Serialized::default("port", 9091)) .join(Serialized::default("uid", 1000)) .join(Serialized::default("download_unmanaged", false)) diff --git a/src/utils.rs b/src/utils.rs index 594e6d6..a82b062 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -29,6 +29,12 @@ uid = 1000 # Optional polling interval in secs, default 10. polling_interval = 10 +# Optional. Give up watching a transfer for import after this many seconds, default 7200 (2h). +# Bounds the per-transfer import-watch loop so a transfer that never fully imports (e.g. one +# containing a sample the *arr won't import) can't accumulate and eventually stall downloads. +# Set to 0 to disable (watch indefinitely). +import_timeout_secs = 7200 + # Optional skip directories when downloading, default ["sample", "extras"] skip_directories = ["sample", "extras"] From 94206d0efc89d776f3d8939274b57fb56ecc598e Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 11:00:29 +0300 Subject: [PATCH 4/4] Enforce the import_timeout_secs default (7200) at the type level Use #[serde(default = ...)] returning 7200 instead of #[serde(default)], which would default to 0 (disabling the bound) if the field is missing from a config source without the Figment default layer. --- src/main.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6a17d9c..9669ce5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,12 @@ struct RunArgs { pub config_path: String, } +/// Default for [`Config::import_timeout_secs`] (2h), enforced at the type level +/// so the documented default holds even without the Figment default layer. +fn default_import_timeout_secs() -> u64 { + 7200 +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { bind_address: String, @@ -55,8 +61,9 @@ pub struct Config { /// How long (seconds) to keep polling for a transfer to be imported before /// giving up watching it. Bounds the per-transfer import-watch loop so a /// transfer that never fully imports (e.g. one with a sample the *arr won't - /// import) can't accumulate and stall downloads. Default 7200 (2h). - #[serde(default)] + /// import) can't accumulate and stall downloads. Default 7200 (2h); 0 + /// disables the bound (watch indefinitely). + #[serde(default = "default_import_timeout_secs")] import_timeout_secs: u64, port: u16, skip_directories: Vec,