From 4db21cefbd653ec8a2abfc751529fa70b6ca4346 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 12:03:00 +0300 Subject: [PATCH 1/8] WIP: scan watch folders for orphaned files (#34) --- src/download_system/orchestration.rs | 22 +++- src/download_system/transfer.rs | 163 +++++++++++++++++++++++++++ src/http/handlers.rs | 32 +++++- src/main.rs | 9 ++ src/services/putio.rs | 2 + src/state.rs | 37 ++++++ 6 files changed, 263 insertions(+), 2 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 073f2e5..efef565 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -98,7 +98,13 @@ impl Worker { actix_rt::spawn(async { watch_for_import(app_data, tx, t).await }); } TransferMessage::Imported(t) => { - actix_rt::spawn(async { watch_seeding(app_data, t).await }); + if t.is_orphan { + // An orphan has no transfer record to remove or seed; just + // delete the file from put.io (issue #34). + actix_rt::spawn(async move { cleanup_orphan(app_data, t).await }); + } else { + actix_rt::spawn(async { watch_seeding(app_data, t).await }); + } } } } @@ -169,3 +175,17 @@ async fn watch_seeding(app_data: Data, transfer: Transfer) -> Result<() info!("{}: done seeding", transfer); Ok(()) } + +/// Cleanup for an orphaned watch-folder transfer once it's been imported: there +/// is no put.io transfer to remove and nothing to seed, so just delete the file +/// and stop reporting it to the *arr (issue #34). +async fn cleanup_orphan(app_data: Data, transfer: Transfer) -> Result<()> { + if let Some(file_id) = transfer.file_id { + match putio::delete_file(&app_data.config.putio.api_key, file_id).await { + Ok(_) => info!("{}: deleted orphan from put.io", transfer), + Err(e) => warn!("{}: failed to delete orphan from put.io: {}", transfer, e), + } + app_data.state.remove_orphan(file_id).await; + } + Ok(()) +} diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index ccad29c..a68b4bd 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -3,6 +3,7 @@ use crate::{ arr::ArrApp, putio::{self, PutIOTransfer}, }, + state::OrphanFile, AppData, }; use actix_web::web::Data; @@ -23,6 +24,11 @@ pub struct Transfer { pub transfer_id: u64, pub targets: Option>, pub app_data: Data, + /// True if this came from a watch-folder scan (an orphaned file with no + /// put.io transfer record) rather than `transfers/list`. Such transfers are + /// cleaned up by deleting the file directly, since there is no transfer to + /// remove or seeding to wait on (issue #34). + pub is_orphan: bool, } impl Transfer { @@ -102,6 +108,22 @@ impl Transfer { targets: None, hash: transfer.hash.clone(), app_data, + is_orphan: false, + } + } + + /// Builds a synthetic transfer for an orphaned watch-folder file (one with + /// no put.io transfer record). The file_id doubles as the transfer id and is + /// formatted into a deterministic synthetic hash so the *arr can track it. + pub fn from_orphan(app_data: Data, file_id: i64, name: String) -> Self { + Self { + transfer_id: file_id as u64, + name, + file_id: Some(file_id), + targets: None, + hash: Some(format!("{:040x}", file_id)), + app_data, + is_orphan: true, } } } @@ -259,6 +281,9 @@ async fn is_managed(app_data: &Data, putio_transfer: &PutIOTransfer) -> pub async fn produce_transfers(app_data: Data, tx: Sender) -> Result<()> { let putio_check_interval = std::time::Duration::from_secs(app_data.config.polling_interval); let mut seen = Vec::::new(); + // file_ids of orphaned watch-folder files we've already queued, so the scan + // doesn't re-queue them every poll. + let mut orphan_seen = Vec::::new(); info!("Checking unfinished transfers"); // We only need to check if something has been imported. Just by looking at the filesystem we @@ -331,6 +356,16 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= 60 { info!( @@ -352,3 +387,131 @@ pub async fn produce_transfers(app_data: Data, tx: Sender bool { + let b = name.as_bytes(); + let n = b.len(); + let mut i = 0; + while i + 3 < n { + if (b[i] == b'S' || b[i] == b's') && b[i + 1].is_ascii_digit() { + let mut j = i + 1; + while j < n && b[j].is_ascii_digit() { + j += 1; + } + if j + 1 < n && (b[j] == b'E' || b[j] == b'e') && b[j + 1].is_ascii_digit() { + return true; + } + } + i += 1; + } + name.to_lowercase().contains("season") +} + +/// Scans the configured `watch_folders` for orphaned completed files — files +/// with no transfer record (e.g. removed by put.io's "clear completed") that +/// `transfers/list` will never surface — and queues them for download like +/// normal transfers so they don't get stranded on put.io (issue #34). +async fn scan_watch_folders( + app_data: &Data, + tx: &Sender, + orphan_seen: &mut Vec, + active_transfers: &[PutIOTransfer], +) { + if app_data.config.watch_folders.is_empty() { + return; + } + let api_key = &app_data.config.putio.api_key; + let active_file_ids: Vec = active_transfers.iter().filter_map(|t| t.file_id).collect(); + + for folder_id in &app_data.config.watch_folders { + let resp = match putio::list_files(api_key, *folder_id).await { + Ok(r) => r, + Err(e) => { + warn!("watch folder {}: listing failed: {}", folder_id, e); + continue; + } + }; + for file in &resp.files { + // Skip the result of an active transfer (handled the normal way) and + // anything we've already queued this run. + if active_file_ids.contains(&file.id) || orphan_seen.contains(&file.id) { + continue; + } + + // Route to the matching *arr category folder based on the name. + let category = if looks_like_episode(&file.name) { + app_data + .config + .sonarr + .as_ref() + .and_then(|c| c.category.clone()) + } else { + app_data + .config + .radarr + .as_ref() + .and_then(|c| c.category.clone()) + }; + let download_dir = match &category { + Some(c) => format!("{}/{}", app_data.config.download_directory, c), + None => app_data.config.download_directory.clone(), + }; + let hash = format!("{:040x}", file.id); + if let Err(e) = app_data + .state + .add_transfer( + hash.clone(), + category.clone().unwrap_or_default(), + download_dir.clone(), + ) + .await + { + warn!("watch folder {}: storing state failed: {}", file.id, e); + } + + let mut transfer = Transfer::from_orphan(app_data.clone(), file.id, file.name.clone()); + let targets = match transfer.get_download_targets().await { + Ok(t) if !t.is_empty() => t, + Ok(_) => { + // No downloadable (video) content — ignore it. + orphan_seen.push(file.id); + continue; + } + Err(e) => { + warn!("{}: orphan target generation failed: {}", transfer, e); + continue; + } + }; + transfer.targets = Some(targets); + + if transfer.is_imported().await { + // Already imported by the *arr — just clean it off put.io. + info!("{}: orphan already imported, deleting from put.io", transfer); + let _ = putio::delete_file(api_key, file.id).await; + orphan_seen.push(file.id); + continue; + } + + app_data + .state + .add_orphan(OrphanFile { + file_id: file.id, + name: file.name.clone(), + hash, + size: file.size, + download_dir, + }) + .await; + info!("{}: orphan ready for download", transfer); + if tx + .send(TransferMessage::QueuedForDownload(transfer)) + .await + .is_ok() + { + orphan_seen.push(file.id); + } + } + } +} diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 57247bf..96a8645 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -272,9 +272,39 @@ pub(crate) async fn handle_torrent_get( tt } }); - let transmission_transfers: Vec = + let mut transmission_transfers: Vec = futures::future::join_all(transmission_transfers).await; + // Also report orphaned watch-folder files (which have no put.io transfer) as + // downloads, so the *arr imports them like any other completed download once + // putioarr has pulled them locally (issue #34). + for orphan in app_data.state.orphans().await { + let complete = app_data.state.is_local_complete(orphan.file_id as u64).await; + transmission_transfers.push(TransmissionTorrent { + id: orphan.file_id as u64, + hash_string: Some(orphan.hash), + name: orphan.name, + download_dir: orphan.download_dir, + total_size: orphan.size, + left_until_done: if complete { 0 } else { std::cmp::max(orphan.size, 1) }, + is_finished: complete, + eta: 0, + status: if complete { + TransmissionTorrentStatus::Seeding + } else { + TransmissionTorrentStatus::Downloading + }, + seconds_downloading: 0, + error_string: None, + downloaded_ever: if complete { orphan.size } else { 0 }, + seed_ratio_limit: 0.0, + seed_ratio_mode: 0, + seed_idle_limit: 0, + seed_idle_mode: 0, + file_count: 1, + }); + } + let torrents = json!(transmission_transfers); let mut arguments = serde_json::Map::new(); diff --git a/src/main.rs b/src/main.rs index 4699b26..d7b622f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,6 +63,15 @@ pub struct Config { /// lets the put.io account be shared with manual downloads. #[serde(default)] download_unmanaged: bool, + /// put.io folder ids to additionally scan for *orphaned* completed files: + /// files that were downloaded but whose transfer record no longer exists + /// (e.g. put.io's "clear completed transfers" removes the transfer while + /// leaving the file in the folder). Since putioarr normally only discovers + /// work from `transfers/list`, such files would never be pulled. Any video + /// file here with no active transfer that isn't already imported is pulled + /// like a normal download (see issue #34). Empty (default) disables this. + #[serde(default)] + watch_folders: Vec, putio: PutioConfig, sonarr: Option, radarr: Option, diff --git a/src/services/putio.rs b/src/services/putio.rs index 91c1000..715ad9a 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -187,6 +187,8 @@ pub struct FileResponse { pub id: i64, pub name: String, pub file_type: String, + #[serde(default)] + pub size: i64, } pub async fn list_files(api_token: &str, file_id: i64) -> Result { diff --git a/src/state.rs b/src/state.rs index 6ee24fb..2c14540 100644 --- a/src/state.rs +++ b/src/state.rs @@ -18,6 +18,18 @@ pub struct TransferState { pub download_dir: String, } +/// A completed file found in a `watch_folders` folder that has no transfer +/// record (e.g. put.io cleared the transfer but left the file). It's reported +/// to the *arr like a normal download so it can still be imported (issue #34). +#[derive(Debug, Clone)] +pub struct OrphanFile { + pub file_id: i64, + pub name: String, + pub hash: String, + pub size: i64, + pub download_dir: String, +} + /// Tracks the category/download-dir chosen for each transfer. /// /// Reads are served from an in-memory cache for speed, while mutations are @@ -42,6 +54,10 @@ pub struct StateManager { /// 404); without this, every torrent-get would re-hit the API and re-log a /// warning. Retries are suppressed until [`Self::NAME_FAILURE_TTL`] passes. failed_names: Arc>>, + /// Orphaned watch-folder files (no transfer record) currently being pulled, + /// keyed by file_id. Reported to the *arr via torrent-get so they import + /// like normal downloads (issue #34). + orphans: Arc>>, } impl StateManager { @@ -52,9 +68,30 @@ impl StateManager { local_complete: Arc::new(RwLock::new(HashSet::new())), file_names: Arc::new(RwLock::new(HashMap::new())), failed_names: Arc::new(RwLock::new(HashMap::new())), + orphans: Arc::new(RwLock::new(HashMap::new())), } } + /// Records an orphaned watch-folder file that is being pulled. + pub async fn add_orphan(&self, orphan: OrphanFile) { + self.orphans.write().await.insert(orphan.file_id, orphan); + } + + /// True if `file_id` is already tracked as an orphan being pulled. + pub async fn has_orphan(&self, file_id: i64) -> bool { + self.orphans.read().await.contains_key(&file_id) + } + + /// Stops tracking an orphan (e.g. once it has been imported and removed). + pub async fn remove_orphan(&self, file_id: i64) { + self.orphans.write().await.remove(&file_id); + } + + /// All orphaned files currently being pulled, for reporting to the *arr. + pub async fn orphans(&self) -> Vec { + self.orphans.read().await.values().cloned().collect() + } + /// How long to suppress retrying a failed file-name lookup. const NAME_FAILURE_TTL: Duration = Duration::from_secs(600); From e0044fcc566c7f96d84dbbc43cd31cf055a5dded Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 12:31:26 +0300 Subject: [PATCH 2/8] Keep download workers alive on per-transfer errors (#34 root cause) --- src/download_system/download.rs | 7 +- src/download_system/orchestration.rs | 95 ++++++++++++++++------------ 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/src/download_system/download.rs b/src/download_system/download.rs index dda5736..ed52ce1 100644 --- a/src/download_system/download.rs +++ b/src/download_system/download.rs @@ -37,7 +37,12 @@ impl Worker { Ok(_) => DownloadDoneStatus::Success, Err(_) => DownloadDoneStatus::Failed, }; - dtm.tx.send(done_status).await?; + // Reporting status can fail if the orchestration worker that queued + // this target is gone. Don't let that take this worker down too — + // log and keep serving other downloads (issue #34). + if let Err(e) = dtm.tx.send(done_status).await { + warn!("download worker: could not report status (receiver gone): {e}"); + } } } } diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index efef565..f7c3cbb 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -49,48 +49,13 @@ impl Worker { let app_data = self.app_data.clone(); match msg { TransferMessage::QueuedForDownload(t) => { - info!("{}: download {}", t, "started".yellow()); - let targets = t.get_download_targets().await?; - // Create a communications channel for the download worker to communicate status back. - let done_channels: &Vec<( - Sender, - Receiver, - )> = &targets.iter().map(|_| async_channel::unbounded()).collect(); - - for (i, target) in targets.iter().enumerate() { - let (done_tx, _) = done_channels[i].clone(); - self.dtx - .send(DownloadTargetMessage { - download_target: target.clone(), - tx: done_tx, - }) - .await?; - } - - // Wait for all the workers having sent back their status. - let mut all_downloaded = vec![]; - for (_, done_rx) in done_channels { - all_downloaded.push(done_rx.recv().await?); - } - - // Check if all are success - if all_downloaded.iter().all(|d| match d { - DownloadDoneStatus::Success => true, - DownloadDoneStatus::Failed => false, - }) { - info!("{}: download {}", t, "done".blue()); - // The files now exist locally, so it's safe to report - // this transfer as complete to the *arr (see issue #16). - self.app_data.state.mark_local_complete(t.transfer_id).await; - self.tx - .send(TransferMessage::Downloaded(Transfer { - targets: Some(targets), - ..t - })) - .await?; - } else { - // TODO: figure out what to do here.. - warn!("{}: not all targets downloaded", t) + // Handle each transfer in a way that can't take the worker + // down: a `?` here used to bubble up and end work(), which + // (silently) killed the worker and, via the dropped done + // channels, cascaded to the others until everything stalled + // (issue #34). Log and carry on instead. + if let Err(e) = self.handle_queued(t).await { + warn!("download orchestration error (worker continuing): {}", e); } } TransferMessage::Downloaded(t) => { @@ -109,6 +74,52 @@ impl Worker { } } } + + /// Downloads all of a transfer's targets and, on full success, marks it + /// complete and forwards it for import. Returns Err on any failure so the + /// caller can log it without ending the worker (see issue #34). + async fn handle_queued(&self, t: Transfer) -> Result<()> { + info!("{}: download {}", t, "started".yellow()); + let targets = t.get_download_targets().await?; + // A status channel per target for the download workers to report back. + let done_channels: Vec<(Sender, Receiver)> = + targets.iter().map(|_| async_channel::unbounded()).collect(); + + for (i, target) in targets.iter().enumerate() { + let (done_tx, _) = done_channels[i].clone(); + self.dtx + .send(DownloadTargetMessage { + download_target: target.clone(), + tx: done_tx, + }) + .await?; + } + + // Wait for all the workers having sent back their status. + let mut all_downloaded = vec![]; + for (_, done_rx) in &done_channels { + all_downloaded.push(done_rx.recv().await?); + } + + if all_downloaded + .iter() + .all(|d| matches!(d, DownloadDoneStatus::Success)) + { + info!("{}: download {}", t, "done".blue()); + // The files now exist locally, so it's safe to report this transfer + // as complete to the *arr (see issue #16). + self.app_data.state.mark_local_complete(t.transfer_id).await; + self.tx + .send(TransferMessage::Downloaded(Transfer { + targets: Some(targets), + ..t + })) + .await?; + } else { + warn!("{}: not all targets downloaded", t); + } + Ok(()) + } } async fn watch_for_import( From 4901a002ae5612bba90cfd8a70651aec6b8cec1f Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 12:59:12 +0300 Subject: [PATCH 3/8] Clean up imported orphans from put.io directly in watch_for_import (#34) The Imported-message route to cleanup ran on an orchestration worker, which can be permanently busy in handle_queued while downloads are active, so orphan put.io deletes never fired. Do the delete in the watch_for_import task itself (which already deletes the local copy). --- src/download_system/orchestration.rs | 42 +++++++++++++--------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index f7c3cbb..d773123 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -63,13 +63,7 @@ impl Worker { actix_rt::spawn(async { watch_for_import(app_data, tx, t).await }); } TransferMessage::Imported(t) => { - if t.is_orphan { - // An orphan has no transfer record to remove or seed; just - // delete the file from put.io (issue #34). - actix_rt::spawn(async move { cleanup_orphan(app_data, t).await }); - } else { - actix_rt::spawn(async { watch_seeding(app_data, t).await }); - } + actix_rt::spawn(async { watch_seeding(app_data, t).await }); } } } @@ -146,8 +140,24 @@ async fn watch_for_import( panic!("{}: no idea how to handle", &top_level_target) } }; - let m = transfer.clone(); - tx.send(TransferMessage::Imported(m)).await?; + // An orphan has no put.io transfer to remove or seed, so finish it + // here directly instead of routing an Imported message through a + // worker (which may be busy downloading and never pick it up), + // deleting the now-imported file from put.io (issue #34). + if transfer.is_orphan { + if let Some(file_id) = transfer.file_id { + match putio::delete_file(&app_data.config.putio.api_key, file_id).await { + Ok(_) => info!("{}: deleted orphan from put.io", transfer), + Err(e) => { + warn!("{}: failed to delete orphan from put.io: {}", transfer, e) + } + } + app_data.state.remove_orphan(file_id).await; + } + } else { + let m = transfer.clone(); + tx.send(TransferMessage::Imported(m)).await?; + } break; } @@ -186,17 +196,3 @@ async fn watch_seeding(app_data: Data, transfer: Transfer) -> Result<() info!("{}: done seeding", transfer); Ok(()) } - -/// Cleanup for an orphaned watch-folder transfer once it's been imported: there -/// is no put.io transfer to remove and nothing to seed, so just delete the file -/// and stop reporting it to the *arr (issue #34). -async fn cleanup_orphan(app_data: Data, transfer: Transfer) -> Result<()> { - if let Some(file_id) = transfer.file_id { - match putio::delete_file(&app_data.config.putio.api_key, file_id).await { - Ok(_) => info!("{}: deleted orphan from put.io", transfer), - Err(e) => warn!("{}: failed to delete orphan from put.io: {}", transfer, e), - } - app_data.state.remove_orphan(file_id).await; - } - Ok(()) -} From 8b2fab5a4d396fd651c26304ad53f8e84eae12a6 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 13:14:55 +0300 Subject: [PATCH 4/8] Address review: empty-targets guard, HashSet lookups, orphan reporting - handle_queued: skip transfers with no targets instead of vacuously marking them complete - scan: O(1) HashSet membership for active_file_ids/orphan_seen - synthetic hash from u64 so it stays a clean 40-hex value - log the folder id (not the file id) on a state-save failure - only add_orphan after the queue send succeeds - torrent-get: keep orphan total_size/left_until_done consistent --- src/download_system/orchestration.rs | 6 ++++ src/download_system/transfer.rs | 46 ++++++++++++++++------------ src/http/handlers.rs | 9 ++++-- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index d773123..55ca924 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -75,6 +75,12 @@ impl Worker { async fn handle_queued(&self, t: Transfer) -> Result<()> { info!("{}: download {}", t, "started".yellow()); let targets = t.get_download_targets().await?; + // No targets means nothing to download; don't let the `all(...)` check + // below pass vacuously and mark the transfer complete. + if targets.is_empty() { + warn!("{}: no downloadable targets, skipping", t); + return Ok(()); + } // A status channel per target for the download workers to report back. let done_channels: Vec<(Sender, Receiver)> = targets.iter().map(|_| async_channel::unbounded()).collect(); diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index a68b4bd..4691d27 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -13,7 +13,7 @@ use async_recursion::async_recursion; use colored::*; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; -use std::{fmt::Display, path::Path}; +use std::{collections::HashSet, fmt::Display, path::Path}; use tokio::time::sleep; #[derive(Clone)] @@ -121,7 +121,7 @@ impl Transfer { name, file_id: Some(file_id), targets: None, - hash: Some(format!("{:040x}", file_id)), + hash: Some(format!("{:040x}", file_id as u64)), app_data, is_orphan: true, } @@ -283,7 +283,7 @@ pub async fn produce_transfers(app_data: Data, tx: Sender::new(); // file_ids of orphaned watch-folder files we've already queued, so the scan // doesn't re-queue them every poll. - let mut orphan_seen = Vec::::new(); + let mut orphan_seen = HashSet::::new(); info!("Checking unfinished transfers"); // We only need to check if something has been imported. Just by looking at the filesystem we @@ -416,14 +416,14 @@ fn looks_like_episode(name: &str) -> bool { async fn scan_watch_folders( app_data: &Data, tx: &Sender, - orphan_seen: &mut Vec, + orphan_seen: &mut HashSet, active_transfers: &[PutIOTransfer], ) { if app_data.config.watch_folders.is_empty() { return; } let api_key = &app_data.config.putio.api_key; - let active_file_ids: Vec = active_transfers.iter().filter_map(|t| t.file_id).collect(); + let active_file_ids: HashSet = active_transfers.iter().filter_map(|t| t.file_id).collect(); for folder_id in &app_data.config.watch_folders { let resp = match putio::list_files(api_key, *folder_id).await { @@ -458,7 +458,7 @@ async fn scan_watch_folders( Some(c) => format!("{}/{}", app_data.config.download_directory, c), None => app_data.config.download_directory.clone(), }; - let hash = format!("{:040x}", file.id); + let hash = format!("{:040x}", file.id as u64); if let Err(e) = app_data .state .add_transfer( @@ -468,7 +468,10 @@ async fn scan_watch_folders( ) .await { - warn!("watch folder {}: storing state failed: {}", file.id, e); + warn!( + "watch folder {}: storing state for file {} failed: {}", + folder_id, file.id, e + ); } let mut transfer = Transfer::from_orphan(app_data.clone(), file.id, file.name.clone()); @@ -476,7 +479,7 @@ async fn scan_watch_folders( Ok(t) if !t.is_empty() => t, Ok(_) => { // No downloadable (video) content — ignore it. - orphan_seen.push(file.id); + orphan_seen.insert(file.id); continue; } Err(e) => { @@ -490,27 +493,30 @@ async fn scan_watch_folders( // Already imported by the *arr — just clean it off put.io. info!("{}: orphan already imported, deleting from put.io", transfer); let _ = putio::delete_file(api_key, file.id).await; - orphan_seen.push(file.id); + orphan_seen.insert(file.id); continue; } - app_data - .state - .add_orphan(OrphanFile { - file_id: file.id, - name: file.name.clone(), - hash, - size: file.size, - download_dir, - }) - .await; info!("{}: orphan ready for download", transfer); + // Only start tracking/reporting the orphan once it's actually been + // queued, so a failed send can't leave it advertised via torrent-get + // as a download that never happens. if tx .send(TransferMessage::QueuedForDownload(transfer)) .await .is_ok() { - orphan_seen.push(file.id); + app_data + .state + .add_orphan(OrphanFile { + file_id: file.id, + name: file.name.clone(), + hash, + size: file.size, + download_dir, + }) + .await; + orphan_seen.insert(file.id); } } } diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 96a8645..82f8540 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -280,13 +280,16 @@ pub(crate) async fn handle_torrent_get( // putioarr has pulled them locally (issue #34). for orphan in app_data.state.orphans().await { let complete = app_data.state.is_local_complete(orphan.file_id as u64).await; + // Keep size/left consistent: never report left_until_done > total_size + // (size can be 0 if put.io omitted it). + let size = orphan.size.max(0); transmission_transfers.push(TransmissionTorrent { id: orphan.file_id as u64, hash_string: Some(orphan.hash), name: orphan.name, download_dir: orphan.download_dir, - total_size: orphan.size, - left_until_done: if complete { 0 } else { std::cmp::max(orphan.size, 1) }, + total_size: size, + left_until_done: if complete { 0 } else { size }, is_finished: complete, eta: 0, status: if complete { @@ -296,7 +299,7 @@ pub(crate) async fn handle_torrent_get( }, seconds_downloading: 0, error_string: None, - downloaded_ever: if complete { orphan.size } else { 0 }, + downloaded_ever: if complete { size } else { 0 }, seed_ratio_limit: 0.0, seed_ratio_mode: 0, seed_idle_limit: 0, From 056d5d49c482d741d2cacd33ed7d3302c492f97d Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 13:32:30 +0300 Subject: [PATCH 5/8] Address review round 2: orphan tracking, retry, routing, alloc - Track in-progress orphans via state.has_orphan instead of an ever-growing orphan_seen set; a failed orphan is dropped from it (orchestration) so a later scan retries it - Route orphans with an explicit base dir (get_download_targets_in) instead of persisting per-orphan TransferState that never got cleaned - Skip non-media / negative-id watch-folder entries up front - torrent-get: checked u64::try_from for the orphan id - looks_like_episode: case-insensitive 'season' check without allocating --- src/download_system/orchestration.rs | 7 +++ src/download_system/transfer.rs | 74 ++++++++++++++-------------- src/http/handlers.rs | 8 ++- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 55ca924..8ac8b6c 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -117,6 +117,13 @@ impl Worker { .await?; } else { warn!("{}: not all targets downloaded", t); + // Drop a failed orphan from tracking so a later watch-folder scan + // can retry it instead of it being suppressed forever (issue #34). + if t.is_orphan { + if let Some(file_id) = t.file_id { + self.app_data.state.remove_orphan(file_id).await; + } + } } Ok(()) } diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index 4691d27..c5ebba0 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -89,6 +89,23 @@ impl Transfer { recurse_download_targets(&self.app_data, self.file_id.unwrap(), hash, None, true).await } + /// Like [`get_download_targets`] but with an explicit base directory instead + /// of looking one up from stored transfer state. Used for orphaned files, + /// which have no persisted state to route them (issue #34). + pub async fn get_download_targets_in(&self, base_path: &str) -> Result> { + info!("{}: generating targets", self); + let default = "0000".to_string(); + let hash = self.hash.as_ref().unwrap_or(&default).as_str(); + recurse_download_targets( + &self.app_data, + self.file_id.unwrap(), + hash, + Some(base_path.to_string()), + true, + ) + .await + } + pub fn get_top_level(&self) -> DownloadTarget { self.targets .clone() @@ -281,9 +298,6 @@ async fn is_managed(app_data: &Data, putio_transfer: &PutIOTransfer) -> pub async fn produce_transfers(app_data: Data, tx: Sender) -> Result<()> { let putio_check_interval = std::time::Duration::from_secs(app_data.config.polling_interval); let mut seen = Vec::::new(); - // file_ids of orphaned watch-folder files we've already queued, so the scan - // doesn't re-queue them every poll. - let mut orphan_seen = HashSet::::new(); info!("Checking unfinished transfers"); // We only need to check if something has been imported. Just by looking at the filesystem we @@ -358,13 +372,7 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= 60 { @@ -406,7 +414,10 @@ fn looks_like_episode(name: &str) -> bool { } i += 1; } - name.to_lowercase().contains("season") + // Case-insensitive "season" check without allocating a lowercased copy. + name.as_bytes() + .windows(6) + .any(|w| w.eq_ignore_ascii_case(b"season")) } /// Scans the configured `watch_folders` for orphaned completed files — files @@ -416,7 +427,6 @@ fn looks_like_episode(name: &str) -> bool { async fn scan_watch_folders( app_data: &Data, tx: &Sender, - orphan_seen: &mut HashSet, active_transfers: &[PutIOTransfer], ) { if app_data.config.watch_folders.is_empty() { @@ -434,13 +444,21 @@ async fn scan_watch_folders( } }; for file in &resp.files { + // Only media (and folders that may contain media); skip stray + // images/nfos and anything with an unusable (negative) id. + if file.id < 0 || !matches!(file.file_type.as_str(), "FOLDER" | "VIDEO" | "AUDIO") { + continue; + } // Skip the result of an active transfer (handled the normal way) and - // anything we've already queued this run. - if active_file_ids.contains(&file.id) || orphan_seen.contains(&file.id) { + // any orphan we're already pulling. Using `has_orphan` as the + // "in progress" marker keeps tracking bounded and, since a failed + // orphan is dropped from it, lets a later poll retry it. + if active_file_ids.contains(&file.id) || app_data.state.has_orphan(file.id).await { continue; } - // Route to the matching *arr category folder based on the name. + // Route to the matching *arr category folder based on the name. The + // base dir is passed explicitly so orphans need no persisted state. let category = if looks_like_episode(&file.name) { app_data .config @@ -458,30 +476,11 @@ async fn scan_watch_folders( Some(c) => format!("{}/{}", app_data.config.download_directory, c), None => app_data.config.download_directory.clone(), }; - let hash = format!("{:040x}", file.id as u64); - if let Err(e) = app_data - .state - .add_transfer( - hash.clone(), - category.clone().unwrap_or_default(), - download_dir.clone(), - ) - .await - { - warn!( - "watch folder {}: storing state for file {} failed: {}", - folder_id, file.id, e - ); - } let mut transfer = Transfer::from_orphan(app_data.clone(), file.id, file.name.clone()); - let targets = match transfer.get_download_targets().await { + let targets = match transfer.get_download_targets_in(&download_dir).await { Ok(t) if !t.is_empty() => t, - Ok(_) => { - // No downloadable (video) content — ignore it. - orphan_seen.insert(file.id); - continue; - } + Ok(_) => continue, // no downloadable (video) content Err(e) => { warn!("{}: orphan target generation failed: {}", transfer, e); continue; @@ -493,11 +492,11 @@ async fn scan_watch_folders( // Already imported by the *arr — just clean it off put.io. info!("{}: orphan already imported, deleting from put.io", transfer); let _ = putio::delete_file(api_key, file.id).await; - orphan_seen.insert(file.id); continue; } info!("{}: orphan ready for download", transfer); + let hash = format!("{:040x}", file.id as u64); // Only start tracking/reporting the orphan once it's actually been // queued, so a failed send can't leave it advertised via torrent-get // as a download that never happens. @@ -516,7 +515,6 @@ async fn scan_watch_folders( download_dir, }) .await; - orphan_seen.insert(file.id); } } } diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 82f8540..379d7ef 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -279,7 +279,13 @@ pub(crate) async fn handle_torrent_get( // downloads, so the *arr imports them like any other completed download once // putioarr has pulled them locally (issue #34). for orphan in app_data.state.orphans().await { - let complete = app_data.state.is_local_complete(orphan.file_id as u64).await; + // put.io file ids are non-negative; guard the i64->u64 conversion so a + // bad value can't wrap into a wrong id / local-complete key. + let id = match u64::try_from(orphan.file_id) { + Ok(id) => id, + Err(_) => continue, + }; + let complete = app_data.state.is_local_complete(id).await; // Keep size/left consistent: never report left_until_done > total_size // (size can be 0 if put.io omitted it). let size = orphan.size.max(0); From 15f3859506ecfa8e63b77193ab6483a8e66225c7 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 13:46:40 +0300 Subject: [PATCH 6/8] Address review round 3: orphan routing, id reuse, scan throttle, logs - handle_queued reuses precomputed t.targets when present so watch-folder orphans download to their intended category dir (get_download_targets_in) instead of being mis-routed by a stateless get_download_targets() - torrent-get uses the already-validated u64 id for the struct field too - throttle watch-folder scans to a 60s interval instead of every poll - log (not swallow) delete_file failures in the already-imported path - debug_assert the non-negative file_id invariant in from_orphan --- src/download_system/orchestration.rs | 10 ++++++++- src/download_system/transfer.rs | 32 +++++++++++++++++++++++----- src/http/handlers.rs | 2 +- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 8ac8b6c..9f5a2c6 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -74,7 +74,15 @@ impl Worker { /// caller can log it without ending the worker (see issue #34). async fn handle_queued(&self, t: Transfer) -> Result<()> { info!("{}: download {}", t, "started".yellow()); - let targets = t.get_download_targets().await?; + // Reuse targets computed when the transfer was discovered if present — + // watch-folder orphans precompute them with the correct base dir + // (get_download_targets_in), which a plain get_download_targets() here + // would not reproduce (no persisted state) and would mis-route. Only + // regenerate for normal transfers, which don't carry them. + let targets = match t.targets.clone() { + Some(ts) => ts, + None => t.get_download_targets().await?, + }; // No targets means nothing to download; don't let the `all(...)` check // below pass vacuously and mark the transfer complete. if targets.is_empty() { diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index c5ebba0..ad8678e 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -13,7 +13,12 @@ use async_recursion::async_recursion; use colored::*; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, fmt::Display, path::Path}; +use std::{ + collections::HashSet, + fmt::Display, + path::Path, + time::{Duration, Instant}, +}; use tokio::time::sleep; #[derive(Clone)] @@ -133,6 +138,10 @@ impl Transfer { /// no put.io transfer record). The file_id doubles as the transfer id and is /// formatted into a deterministic synthetic hash so the *arr can track it. pub fn from_orphan(app_data: Data, file_id: i64, name: String) -> Self { + // put.io file ids are non-negative; the watch-folder scan filters out + // any that aren't before constructing an orphan, so the `as u64` casts + // below are exact. Assert it to catch future misuse. + debug_assert!(file_id >= 0, "orphan file_id must be non-negative"); Self { transfer_id: file_id as u64, name, @@ -296,8 +305,12 @@ async fn is_managed(app_data: &Data, putio_transfer: &PutIOTransfer) -> } pub async fn produce_transfers(app_data: Data, tx: Sender) -> Result<()> { - let putio_check_interval = std::time::Duration::from_secs(app_data.config.polling_interval); + let putio_check_interval = Duration::from_secs(app_data.config.polling_interval); let mut seen = Vec::::new(); + // Watch-folder scans hit put.io once per folder, so run them on their own + // interval rather than every poll to avoid extra API traffic (issue #34). + const ORPHAN_SCAN_INTERVAL: Duration = Duration::from_secs(60); + let mut last_orphan_scan: Option = None; info!("Checking unfinished transfers"); // We only need to check if something has been imported. Just by looking at the filesystem we @@ -371,8 +384,12 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= ORPHAN_SCAN_INTERVAL) { + scan_watch_folders(&app_data, &tx, &list_transfer_response.transfers).await; + last_orphan_scan = Some(Instant::now()); + } // Log status when 60 seconds have passed since last time if start.elapsed().as_secs() >= 60 { @@ -491,7 +508,12 @@ async fn scan_watch_folders( if transfer.is_imported().await { // Already imported by the *arr — just clean it off put.io. info!("{}: orphan already imported, deleting from put.io", transfer); - let _ = putio::delete_file(api_key, file.id).await; + if let Err(e) = putio::delete_file(api_key, file.id).await { + warn!( + "{}: failed to delete imported orphan from put.io: {}", + transfer, e + ); + } continue; } diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 379d7ef..9b88a87 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -290,7 +290,7 @@ pub(crate) async fn handle_torrent_get( // (size can be 0 if put.io omitted it). let size = orphan.size.max(0); transmission_transfers.push(TransmissionTorrent { - id: orphan.file_id as u64, + id, hash_string: Some(orphan.hash), name: orphan.name, download_dir: orphan.download_dir, From 1f7141bfd20e672ef6792f66dca72e3627022fa8 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 14:02:16 +0300 Subject: [PATCH 7/8] Address review round 4: orphan progress, fallible targets, config interval - torrent-get: report a non-zero remaining amount for an incomplete orphan even when put.io omitted its size, so 0/0 isn't read as complete - get_download_targets[_in]: return an error instead of panicking when a transfer has no file_id (shared generate_targets helper) - make the watch-folder scan interval configurable via watch_folder_interval_secs (default 60), documented in the template --- src/download_system/transfer.rs | 28 ++++++++++++---------------- src/http/handlers.rs | 17 +++++++++++++---- src/main.rs | 11 +++++++++++ src/utils.rs | 12 ++++++++++++ 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index ad8678e..f7d1f58 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -7,7 +7,7 @@ use crate::{ AppData, }; use actix_web::web::Data; -use anyhow::Result; +use anyhow::{Context, Result}; use async_channel::Sender; use async_recursion::async_recursion; use colored::*; @@ -88,27 +88,22 @@ impl Transfer { } pub async fn get_download_targets(&self) -> Result> { - info!("{}: generating targets", self); - let default = "0000".to_string(); - let hash = self.hash.as_ref().unwrap_or(&default).as_str(); - recurse_download_targets(&self.app_data, self.file_id.unwrap(), hash, None, true).await + self.generate_targets(None).await } /// Like [`get_download_targets`] but with an explicit base directory instead /// of looking one up from stored transfer state. Used for orphaned files, /// which have no persisted state to route them (issue #34). pub async fn get_download_targets_in(&self, base_path: &str) -> Result> { + self.generate_targets(Some(base_path.to_string())).await + } + + async fn generate_targets(&self, base_path: Option) -> Result> { info!("{}: generating targets", self); + let file_id = self.file_id.context("transfer has no file_id")?; let default = "0000".to_string(); let hash = self.hash.as_ref().unwrap_or(&default).as_str(); - recurse_download_targets( - &self.app_data, - self.file_id.unwrap(), - hash, - Some(base_path.to_string()), - true, - ) - .await + recurse_download_targets(&self.app_data, file_id, hash, base_path, true).await } pub fn get_top_level(&self) -> DownloadTarget { @@ -308,8 +303,9 @@ pub async fn produce_transfers(app_data: Data, tx: Sender::new(); // Watch-folder scans hit put.io once per folder, so run them on their own - // interval rather than every poll to avoid extra API traffic (issue #34). - const ORPHAN_SCAN_INTERVAL: Duration = Duration::from_secs(60); + // configurable interval rather than every poll to avoid extra API traffic + // (issue #34). + let orphan_scan_interval = Duration::from_secs(app_data.config.watch_folder_interval_secs); let mut last_orphan_scan: Option = None; info!("Checking unfinished transfers"); @@ -386,7 +382,7 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= ORPHAN_SCAN_INTERVAL) { + if last_orphan_scan.map_or(true, |t| t.elapsed() >= orphan_scan_interval) { scan_watch_folders(&app_data, &tx, &list_transfer_response.transfers).await; last_orphan_scan = Some(Instant::now()); } diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 9b88a87..66f4b00 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -286,16 +286,25 @@ pub(crate) async fn handle_torrent_get( Err(_) => continue, }; let complete = app_data.state.is_local_complete(id).await; - // Keep size/left consistent: never report left_until_done > total_size - // (size can be 0 if put.io omitted it). + // Report consistent size/progress. Keep left_until_done <= total_size, + // and when incomplete report a non-zero amount remaining even if the + // size is unknown (put.io omitted it) so a client can't read 0/0 as + // "done" while it's still downloading. let size = orphan.size.max(0); + let (total_size, left_until_done) = if complete { + (size, 0) + } else if size > 0 { + (size, size) + } else { + (1, 1) + }; transmission_transfers.push(TransmissionTorrent { id, hash_string: Some(orphan.hash), name: orphan.name, download_dir: orphan.download_dir, - total_size: size, - left_until_done: if complete { 0 } else { size }, + total_size, + left_until_done, is_finished: complete, eta: 0, status: if complete { diff --git a/src/main.rs b/src/main.rs index d7b622f..0321655 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,10 @@ struct RunArgs { pub config_path: String, } +fn default_watch_folder_interval_secs() -> u64 { + 60 +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { bind_address: String, @@ -72,6 +76,13 @@ pub struct Config { /// like a normal download (see issue #34). Empty (default) disables this. #[serde(default)] watch_folders: Vec, + /// How often, in seconds, to scan `watch_folders` for orphaned files. + /// Defaults to 60. Each scan lists every configured folder on put.io, so + /// raise this if you have many folders and want to keep API traffic low; + /// it's independent of `polling_interval`. Only used when `watch_folders` + /// is non-empty. + #[serde(default = "default_watch_folder_interval_secs")] + watch_folder_interval_secs: u64, putio: PutioConfig, sonarr: Option, radarr: Option, diff --git a/src/utils.rs b/src/utils.rs index 594e6d6..7f080cf 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -38,6 +38,18 @@ skip_directories = ["sample", "extras"] # transfers. Set to true to download every transfer on the account. download_unmanaged = false +# Optional. put.io folder ids to scan for *orphaned* completed files: files that +# were downloaded but whose transfer record no longer exists (e.g. put.io's +# "clear completed transfers" removes the transfer but leaves the file). Such +# files are never surfaced by transfers/list, so they'd otherwise be stranded. +# Any video file here with no active transfer that isn't already imported is +# pulled like a normal download. Empty (default) disables this. +# watch_folders = [123456789] + +# Optional. How often (seconds) to scan watch_folders, default 60. Each scan +# lists every configured folder on put.io; raise it to reduce API traffic. +# watch_folder_interval_secs = 60 + # Optional number of orchestration workers, default 10. Unless there are many changes coming from # put.io, you shouldn't have to touch this number. 10 is already overkill. orchestration_workers = 10 From a954b39b2219ae85b02c0bd1dcec0d16176628b8 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 14:14:39 +0300 Subject: [PATCH 8/8] Address review round 5: list timeouts, interval clamp, hash dedup - add request timeouts to putio::list_files and putio::url so a hung connection during a watch-folder scan can't stall transfer production - clamp watch_folder_interval_secs to >= 1s so a misconfigured 0 doesn't become a zero Duration that scans every poll - derive OrphanFile.hash from the transfer's hash instead of formatting file_id a second time (single source of truth) - clone only the Sender, not the unused Receiver, per done channel --- src/download_system/orchestration.rs | 2 +- src/download_system/transfer.rs | 10 ++++++++-- src/services/putio.rs | 2 ++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 9f5a2c6..60051aa 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -94,7 +94,7 @@ impl Worker { targets.iter().map(|_| async_channel::unbounded()).collect(); for (i, target) in targets.iter().enumerate() { - let (done_tx, _) = done_channels[i].clone(); + let done_tx = done_channels[i].0.clone(); self.dtx .send(DownloadTargetMessage { download_target: target.clone(), diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index f7d1f58..f280980 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -305,7 +305,11 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= 1s so a misconfigured 0 doesn't become a zero Duration that + // scans on every poll. The effective floor is the polling interval anyway, + // since the scan only runs inside this loop. + let orphan_scan_interval = + Duration::from_secs(app_data.config.watch_folder_interval_secs.max(1)); let mut last_orphan_scan: Option = None; info!("Checking unfinished transfers"); @@ -514,7 +518,9 @@ async fn scan_watch_folders( } info!("{}: orphan ready for download", transfer); - let hash = format!("{:040x}", file.id as u64); + // Reuse the hash already derived by `from_orphan` so there's a single + // source of truth for the synthetic hash. + let hash = transfer.hash.clone().unwrap_or_default(); // Only start tracking/reporting the orphan once it's actually been // queued, so a failed send can't leave it advertised via torrent-get // as a download that never happens. diff --git a/src/services/putio.rs b/src/services/putio.rs index 715ad9a..1d8d2b0 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -198,6 +198,7 @@ pub async fn list_files(api_token: &str, file_id: i64) -> Result Result { let client = reqwest::Client::new(); let response = client .get(format!("https://api.put.io/v2/files/{}/url", file_id)) + .timeout(Duration::from_secs(30)) .header("authorization", format!("Bearer {}", api_token)) .send() .await?;