diff --git a/src/download_system/download.rs b/src/download_system/download.rs index dda5736..ed52ce1 100644 --- a/src/download_system/download.rs +++ b/src/download_system/download.rs @@ -37,7 +37,12 @@ impl Worker { Ok(_) => DownloadDoneStatus::Success, Err(_) => DownloadDoneStatus::Failed, }; - dtm.tx.send(done_status).await?; + // Reporting status can fail if the orchestration worker that queued + // this target is gone. Don't let that take this worker down too — + // log and keep serving other downloads (issue #34). + if let Err(e) = dtm.tx.send(done_status).await { + warn!("download worker: could not report status (receiver gone): {e}"); + } } } } diff --git a/src/download_system/orchestration.rs b/src/download_system/orchestration.rs index 073f2e5..60051aa 100644 --- a/src/download_system/orchestration.rs +++ b/src/download_system/orchestration.rs @@ -49,48 +49,13 @@ impl Worker { let app_data = self.app_data.clone(); match msg { TransferMessage::QueuedForDownload(t) => { - info!("{}: download {}", t, "started".yellow()); - let targets = t.get_download_targets().await?; - // Create a communications channel for the download worker to communicate status back. - let done_channels: &Vec<( - Sender, - Receiver, - )> = &targets.iter().map(|_| async_channel::unbounded()).collect(); - - for (i, target) in targets.iter().enumerate() { - let (done_tx, _) = done_channels[i].clone(); - self.dtx - .send(DownloadTargetMessage { - download_target: target.clone(), - tx: done_tx, - }) - .await?; - } - - // Wait for all the workers having sent back their status. - let mut all_downloaded = vec![]; - for (_, done_rx) in done_channels { - all_downloaded.push(done_rx.recv().await?); - } - - // Check if all are success - if all_downloaded.iter().all(|d| match d { - DownloadDoneStatus::Success => true, - DownloadDoneStatus::Failed => false, - }) { - info!("{}: download {}", t, "done".blue()); - // The files now exist locally, so it's safe to report - // this transfer as complete to the *arr (see issue #16). - self.app_data.state.mark_local_complete(t.transfer_id).await; - self.tx - .send(TransferMessage::Downloaded(Transfer { - targets: Some(targets), - ..t - })) - .await?; - } else { - // TODO: figure out what to do here.. - warn!("{}: not all targets downloaded", t) + // Handle each transfer in a way that can't take the worker + // down: a `?` here used to bubble up and end work(), which + // (silently) killed the worker and, via the dropped done + // channels, cascaded to the others until everything stalled + // (issue #34). Log and carry on instead. + if let Err(e) = self.handle_queued(t).await { + warn!("download orchestration error (worker continuing): {}", e); } } TransferMessage::Downloaded(t) => { @@ -103,6 +68,73 @@ impl Worker { } } } + + /// Downloads all of a transfer's targets and, on full success, marks it + /// complete and forwards it for import. Returns Err on any failure so the + /// caller can log it without ending the worker (see issue #34). + async fn handle_queued(&self, t: Transfer) -> Result<()> { + info!("{}: download {}", t, "started".yellow()); + // Reuse targets computed when the transfer was discovered if present — + // watch-folder orphans precompute them with the correct base dir + // (get_download_targets_in), which a plain get_download_targets() here + // would not reproduce (no persisted state) and would mis-route. Only + // regenerate for normal transfers, which don't carry them. + let targets = match t.targets.clone() { + Some(ts) => ts, + None => t.get_download_targets().await?, + }; + // No targets means nothing to download; don't let the `all(...)` check + // below pass vacuously and mark the transfer complete. + if targets.is_empty() { + warn!("{}: no downloadable targets, skipping", t); + return Ok(()); + } + // A status channel per target for the download workers to report back. + let done_channels: Vec<(Sender, Receiver)> = + targets.iter().map(|_| async_channel::unbounded()).collect(); + + for (i, target) in targets.iter().enumerate() { + let done_tx = done_channels[i].0.clone(); + self.dtx + .send(DownloadTargetMessage { + download_target: target.clone(), + tx: done_tx, + }) + .await?; + } + + // Wait for all the workers having sent back their status. + let mut all_downloaded = vec![]; + for (_, done_rx) in &done_channels { + all_downloaded.push(done_rx.recv().await?); + } + + if all_downloaded + .iter() + .all(|d| matches!(d, DownloadDoneStatus::Success)) + { + info!("{}: download {}", t, "done".blue()); + // The files now exist locally, so it's safe to report this transfer + // as complete to the *arr (see issue #16). + self.app_data.state.mark_local_complete(t.transfer_id).await; + self.tx + .send(TransferMessage::Downloaded(Transfer { + targets: Some(targets), + ..t + })) + .await?; + } else { + warn!("{}: not all targets downloaded", t); + // Drop a failed orphan from tracking so a later watch-folder scan + // can retry it instead of it being suppressed forever (issue #34). + if t.is_orphan { + if let Some(file_id) = t.file_id { + self.app_data.state.remove_orphan(file_id).await; + } + } + } + Ok(()) + } } async fn watch_for_import( @@ -129,8 +161,24 @@ async fn watch_for_import( panic!("{}: no idea how to handle", &top_level_target) } }; - let m = transfer.clone(); - tx.send(TransferMessage::Imported(m)).await?; + // An orphan has no put.io transfer to remove or seed, so finish it + // here directly instead of routing an Imported message through a + // worker (which may be busy downloading and never pick it up), + // deleting the now-imported file from put.io (issue #34). + if transfer.is_orphan { + if let Some(file_id) = transfer.file_id { + match putio::delete_file(&app_data.config.putio.api_key, file_id).await { + Ok(_) => info!("{}: deleted orphan from put.io", transfer), + Err(e) => { + warn!("{}: failed to delete orphan from put.io: {}", transfer, e) + } + } + app_data.state.remove_orphan(file_id).await; + } + } else { + let m = transfer.clone(); + tx.send(TransferMessage::Imported(m)).await?; + } break; } diff --git a/src/download_system/transfer.rs b/src/download_system/transfer.rs index ccad29c..f280980 100644 --- a/src/download_system/transfer.rs +++ b/src/download_system/transfer.rs @@ -3,16 +3,22 @@ use crate::{ arr::ArrApp, putio::{self, PutIOTransfer}, }, + state::OrphanFile, AppData, }; use actix_web::web::Data; -use anyhow::Result; +use anyhow::{Context, Result}; use async_channel::Sender; use async_recursion::async_recursion; use colored::*; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; -use std::{fmt::Display, path::Path}; +use std::{ + collections::HashSet, + fmt::Display, + path::Path, + time::{Duration, Instant}, +}; use tokio::time::sleep; #[derive(Clone)] @@ -23,6 +29,11 @@ pub struct Transfer { pub transfer_id: u64, pub targets: Option>, pub app_data: Data, + /// True if this came from a watch-folder scan (an orphaned file with no + /// put.io transfer record) rather than `transfers/list`. Such transfers are + /// cleaned up by deleting the file directly, since there is no transfer to + /// remove or seeding to wait on (issue #34). + pub is_orphan: bool, } impl Transfer { @@ -77,10 +88,22 @@ impl Transfer { } pub async fn get_download_targets(&self) -> Result> { + self.generate_targets(None).await + } + + /// Like [`get_download_targets`] but with an explicit base directory instead + /// of looking one up from stored transfer state. Used for orphaned files, + /// which have no persisted state to route them (issue #34). + pub async fn get_download_targets_in(&self, base_path: &str) -> Result> { + self.generate_targets(Some(base_path.to_string())).await + } + + async fn generate_targets(&self, base_path: Option) -> Result> { info!("{}: generating targets", self); + let file_id = self.file_id.context("transfer has no file_id")?; let default = "0000".to_string(); let hash = self.hash.as_ref().unwrap_or(&default).as_str(); - recurse_download_targets(&self.app_data, self.file_id.unwrap(), hash, None, true).await + recurse_download_targets(&self.app_data, file_id, hash, base_path, true).await } pub fn get_top_level(&self) -> DownloadTarget { @@ -102,6 +125,26 @@ impl Transfer { targets: None, hash: transfer.hash.clone(), app_data, + is_orphan: false, + } + } + + /// Builds a synthetic transfer for an orphaned watch-folder file (one with + /// no put.io transfer record). The file_id doubles as the transfer id and is + /// formatted into a deterministic synthetic hash so the *arr can track it. + pub fn from_orphan(app_data: Data, file_id: i64, name: String) -> Self { + // put.io file ids are non-negative; the watch-folder scan filters out + // any that aren't before constructing an orphan, so the `as u64` casts + // below are exact. Assert it to catch future misuse. + debug_assert!(file_id >= 0, "orphan file_id must be non-negative"); + Self { + transfer_id: file_id as u64, + name, + file_id: Some(file_id), + targets: None, + hash: Some(format!("{:040x}", file_id as u64)), + app_data, + is_orphan: true, } } } @@ -257,8 +300,17 @@ async fn is_managed(app_data: &Data, putio_transfer: &PutIOTransfer) -> } pub async fn produce_transfers(app_data: Data, tx: Sender) -> Result<()> { - let putio_check_interval = std::time::Duration::from_secs(app_data.config.polling_interval); + let putio_check_interval = Duration::from_secs(app_data.config.polling_interval); let mut seen = Vec::::new(); + // Watch-folder scans hit put.io once per folder, so run them on their own + // configurable interval rather than every poll to avoid extra API traffic + // (issue #34). + // Clamp to >= 1s so a misconfigured 0 doesn't become a zero Duration that + // scans on every poll. The effective floor is the polling interval anyway, + // since the scan only runs inside this loop. + let orphan_scan_interval = + Duration::from_secs(app_data.config.watch_folder_interval_secs.max(1)); + let mut last_orphan_scan: Option = None; info!("Checking unfinished transfers"); // We only need to check if something has been imported. Just by looking at the filesystem we @@ -331,6 +383,14 @@ pub async fn produce_transfers(app_data: Data, tx: Sender= orphan_scan_interval) { + scan_watch_folders(&app_data, &tx, &list_transfer_response.transfers).await; + last_orphan_scan = Some(Instant::now()); + } + // Log status when 60 seconds have passed since last time if start.elapsed().as_secs() >= 60 { info!( @@ -352,3 +412,134 @@ pub async fn produce_transfers(app_data: Data, tx: Sender bool { + let b = name.as_bytes(); + let n = b.len(); + let mut i = 0; + while i + 3 < n { + if (b[i] == b'S' || b[i] == b's') && b[i + 1].is_ascii_digit() { + let mut j = i + 1; + while j < n && b[j].is_ascii_digit() { + j += 1; + } + if j + 1 < n && (b[j] == b'E' || b[j] == b'e') && b[j + 1].is_ascii_digit() { + return true; + } + } + i += 1; + } + // Case-insensitive "season" check without allocating a lowercased copy. + name.as_bytes() + .windows(6) + .any(|w| w.eq_ignore_ascii_case(b"season")) +} + +/// Scans the configured `watch_folders` for orphaned completed files — files +/// with no transfer record (e.g. removed by put.io's "clear completed") that +/// `transfers/list` will never surface — and queues them for download like +/// normal transfers so they don't get stranded on put.io (issue #34). +async fn scan_watch_folders( + app_data: &Data, + tx: &Sender, + active_transfers: &[PutIOTransfer], +) { + if app_data.config.watch_folders.is_empty() { + return; + } + let api_key = &app_data.config.putio.api_key; + let active_file_ids: HashSet = active_transfers.iter().filter_map(|t| t.file_id).collect(); + + for folder_id in &app_data.config.watch_folders { + let resp = match putio::list_files(api_key, *folder_id).await { + Ok(r) => r, + Err(e) => { + warn!("watch folder {}: listing failed: {}", folder_id, e); + continue; + } + }; + for file in &resp.files { + // Only media (and folders that may contain media); skip stray + // images/nfos and anything with an unusable (negative) id. + if file.id < 0 || !matches!(file.file_type.as_str(), "FOLDER" | "VIDEO" | "AUDIO") { + continue; + } + // Skip the result of an active transfer (handled the normal way) and + // any orphan we're already pulling. Using `has_orphan` as the + // "in progress" marker keeps tracking bounded and, since a failed + // orphan is dropped from it, lets a later poll retry it. + if active_file_ids.contains(&file.id) || app_data.state.has_orphan(file.id).await { + continue; + } + + // Route to the matching *arr category folder based on the name. The + // base dir is passed explicitly so orphans need no persisted state. + let category = if looks_like_episode(&file.name) { + app_data + .config + .sonarr + .as_ref() + .and_then(|c| c.category.clone()) + } else { + app_data + .config + .radarr + .as_ref() + .and_then(|c| c.category.clone()) + }; + let download_dir = match &category { + Some(c) => format!("{}/{}", app_data.config.download_directory, c), + None => app_data.config.download_directory.clone(), + }; + + let mut transfer = Transfer::from_orphan(app_data.clone(), file.id, file.name.clone()); + let targets = match transfer.get_download_targets_in(&download_dir).await { + Ok(t) if !t.is_empty() => t, + Ok(_) => continue, // no downloadable (video) content + Err(e) => { + warn!("{}: orphan target generation failed: {}", transfer, e); + continue; + } + }; + transfer.targets = Some(targets); + + if transfer.is_imported().await { + // Already imported by the *arr — just clean it off put.io. + info!("{}: orphan already imported, deleting from put.io", transfer); + if let Err(e) = putio::delete_file(api_key, file.id).await { + warn!( + "{}: failed to delete imported orphan from put.io: {}", + transfer, e + ); + } + continue; + } + + info!("{}: orphan ready for download", transfer); + // Reuse the hash already derived by `from_orphan` so there's a single + // source of truth for the synthetic hash. + let hash = transfer.hash.clone().unwrap_or_default(); + // Only start tracking/reporting the orphan once it's actually been + // queued, so a failed send can't leave it advertised via torrent-get + // as a download that never happens. + if tx + .send(TransferMessage::QueuedForDownload(transfer)) + .await + .is_ok() + { + app_data + .state + .add_orphan(OrphanFile { + file_id: file.id, + name: file.name.clone(), + hash, + size: file.size, + download_dir, + }) + .await; + } + } + } +} diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 57247bf..66f4b00 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -272,9 +272,57 @@ pub(crate) async fn handle_torrent_get( tt } }); - let transmission_transfers: Vec = + let mut transmission_transfers: Vec = futures::future::join_all(transmission_transfers).await; + // Also report orphaned watch-folder files (which have no put.io transfer) as + // downloads, so the *arr imports them like any other completed download once + // putioarr has pulled them locally (issue #34). + for orphan in app_data.state.orphans().await { + // put.io file ids are non-negative; guard the i64->u64 conversion so a + // bad value can't wrap into a wrong id / local-complete key. + let id = match u64::try_from(orphan.file_id) { + Ok(id) => id, + Err(_) => continue, + }; + let complete = app_data.state.is_local_complete(id).await; + // Report consistent size/progress. Keep left_until_done <= total_size, + // and when incomplete report a non-zero amount remaining even if the + // size is unknown (put.io omitted it) so a client can't read 0/0 as + // "done" while it's still downloading. + let size = orphan.size.max(0); + let (total_size, left_until_done) = if complete { + (size, 0) + } else if size > 0 { + (size, size) + } else { + (1, 1) + }; + transmission_transfers.push(TransmissionTorrent { + id, + hash_string: Some(orphan.hash), + name: orphan.name, + download_dir: orphan.download_dir, + total_size, + left_until_done, + is_finished: complete, + eta: 0, + status: if complete { + TransmissionTorrentStatus::Seeding + } else { + TransmissionTorrentStatus::Downloading + }, + seconds_downloading: 0, + error_string: None, + downloaded_ever: if complete { size } else { 0 }, + seed_ratio_limit: 0.0, + seed_ratio_mode: 0, + seed_idle_limit: 0, + seed_idle_mode: 0, + file_count: 1, + }); + } + let torrents = json!(transmission_transfers); let mut arguments = serde_json::Map::new(); diff --git a/src/main.rs b/src/main.rs index 4699b26..0321655 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,10 @@ struct RunArgs { pub config_path: String, } +fn default_watch_folder_interval_secs() -> u64 { + 60 +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { bind_address: String, @@ -63,6 +67,22 @@ pub struct Config { /// lets the put.io account be shared with manual downloads. #[serde(default)] download_unmanaged: bool, + /// put.io folder ids to additionally scan for *orphaned* completed files: + /// files that were downloaded but whose transfer record no longer exists + /// (e.g. put.io's "clear completed transfers" removes the transfer while + /// leaving the file in the folder). Since putioarr normally only discovers + /// work from `transfers/list`, such files would never be pulled. Any video + /// file here with no active transfer that isn't already imported is pulled + /// like a normal download (see issue #34). Empty (default) disables this. + #[serde(default)] + watch_folders: Vec, + /// How often, in seconds, to scan `watch_folders` for orphaned files. + /// Defaults to 60. Each scan lists every configured folder on put.io, so + /// raise this if you have many folders and want to keep API traffic low; + /// it's independent of `polling_interval`. Only used when `watch_folders` + /// is non-empty. + #[serde(default = "default_watch_folder_interval_secs")] + watch_folder_interval_secs: u64, putio: PutioConfig, sonarr: Option, radarr: Option, diff --git a/src/services/putio.rs b/src/services/putio.rs index 91c1000..1d8d2b0 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -187,6 +187,8 @@ pub struct FileResponse { pub id: i64, pub name: String, pub file_type: String, + #[serde(default)] + pub size: i64, } pub async fn list_files(api_token: &str, file_id: i64) -> Result { @@ -196,6 +198,7 @@ pub async fn list_files(api_token: &str, file_id: i64) -> Result Result { let client = reqwest::Client::new(); let response = client .get(format!("https://api.put.io/v2/files/{}/url", file_id)) + .timeout(Duration::from_secs(30)) .header("authorization", format!("Bearer {}", api_token)) .send() .await?; diff --git a/src/state.rs b/src/state.rs index 6ee24fb..2c14540 100644 --- a/src/state.rs +++ b/src/state.rs @@ -18,6 +18,18 @@ pub struct TransferState { pub download_dir: String, } +/// A completed file found in a `watch_folders` folder that has no transfer +/// record (e.g. put.io cleared the transfer but left the file). It's reported +/// to the *arr like a normal download so it can still be imported (issue #34). +#[derive(Debug, Clone)] +pub struct OrphanFile { + pub file_id: i64, + pub name: String, + pub hash: String, + pub size: i64, + pub download_dir: String, +} + /// Tracks the category/download-dir chosen for each transfer. /// /// Reads are served from an in-memory cache for speed, while mutations are @@ -42,6 +54,10 @@ pub struct StateManager { /// 404); without this, every torrent-get would re-hit the API and re-log a /// warning. Retries are suppressed until [`Self::NAME_FAILURE_TTL`] passes. failed_names: Arc>>, + /// Orphaned watch-folder files (no transfer record) currently being pulled, + /// keyed by file_id. Reported to the *arr via torrent-get so they import + /// like normal downloads (issue #34). + orphans: Arc>>, } impl StateManager { @@ -52,9 +68,30 @@ impl StateManager { local_complete: Arc::new(RwLock::new(HashSet::new())), file_names: Arc::new(RwLock::new(HashMap::new())), failed_names: Arc::new(RwLock::new(HashMap::new())), + orphans: Arc::new(RwLock::new(HashMap::new())), } } + /// Records an orphaned watch-folder file that is being pulled. + pub async fn add_orphan(&self, orphan: OrphanFile) { + self.orphans.write().await.insert(orphan.file_id, orphan); + } + + /// True if `file_id` is already tracked as an orphan being pulled. + pub async fn has_orphan(&self, file_id: i64) -> bool { + self.orphans.read().await.contains_key(&file_id) + } + + /// Stops tracking an orphan (e.g. once it has been imported and removed). + pub async fn remove_orphan(&self, file_id: i64) { + self.orphans.write().await.remove(&file_id); + } + + /// All orphaned files currently being pulled, for reporting to the *arr. + pub async fn orphans(&self) -> Vec { + self.orphans.read().await.values().cloned().collect() + } + /// How long to suppress retrying a failed file-name lookup. const NAME_FAILURE_TTL: Duration = Duration::from_secs(600); diff --git a/src/utils.rs b/src/utils.rs index 594e6d6..7f080cf 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -38,6 +38,18 @@ skip_directories = ["sample", "extras"] # transfers. Set to true to download every transfer on the account. download_unmanaged = false +# Optional. put.io folder ids to scan for *orphaned* completed files: files that +# were downloaded but whose transfer record no longer exists (e.g. put.io's +# "clear completed transfers" removes the transfer but leaves the file). Such +# files are never surfaced by transfers/list, so they'd otherwise be stranded. +# Any video file here with no active transfer that isn't already imported is +# pulled like a normal download. Empty (default) disables this. +# watch_folders = [123456789] + +# Optional. How often (seconds) to scan watch_folders, default 60. Each scan +# lists every configured folder on put.io; raise it to reduce API traffic. +# watch_folder_interval_secs = 60 + # Optional number of orchestration workers, default 10. Unless there are many changes coming from # put.io, you shouldn't have to touch this number. 10 is already overkill. orchestration_workers = 10