Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/download_system/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use anyhow::Result;
use async_channel::{Receiver, Sender};
use colored::*;
use log::{info, warn};
use std::{fs, time::Duration};
use std::{
fs,
time::{Duration, Instant},
};
use tokio::{fs::metadata, time::sleep};

use super::transfer::TransferMessage;
Expand Down Expand Up @@ -111,6 +114,12 @@ async fn watch_for_import(
transfer: Transfer,
) -> Result<()> {
info!("{}: watching imports", transfer);
// Give up watching after this long so a transfer that never fully imports
// (e.g. one containing a sample the *arr won't import) can't loop forever
// and accumulate until downloads stall (see issue #30). Genuine imports are
// detected within a poll or two, so the default is generous.
let import_timeout = Duration::from_secs(app_data.config.import_timeout_secs);
let started = Instant::now();
loop {
Comment thread
bugrax marked this conversation as resolved.
if transfer.is_imported().await {
info!("{}: imported", transfer);
Expand All @@ -132,11 +141,20 @@ async fn watch_for_import(
let m = transfer.clone();
tx.send(TransferMessage::Imported(m)).await?;

info!("{}: removed", transfer);
break;
}
if app_data.config.import_timeout_secs > 0 && started.elapsed() > import_timeout {
warn!(
"{}: still not imported after {:?}; giving up watching. It likely contains a \
file the *arr won't import (e.g. a sample outside a skip_directories folder), \
so its local/put.io copies won't be cleaned up automatically.",
transfer, import_timeout
);
break;
}
sleep(Duration::from_secs(app_data.config.polling_interval)).await;
Comment thread
bugrax marked this conversation as resolved.
}
info!("{}: removed", transfer);
Ok(())
}

Expand Down
14 changes: 14 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ struct RunArgs {
pub config_path: String,
}

/// Default for [`Config::import_timeout_secs`] (2h), enforced at the type level
/// so the documented default holds even without the Figment default layer.
fn default_import_timeout_secs() -> u64 {
7200
}
Comment thread
bugrax marked this conversation as resolved.

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config {
bind_address: String,
Expand All @@ -52,6 +58,13 @@ pub struct Config {
orchestration_workers: usize,
password: String,
polling_interval: u64,
/// How long (seconds) to keep polling for a transfer to be imported before
/// giving up watching it. Bounds the per-transfer import-watch loop so a
/// transfer that never fully imports (e.g. one with a sample the *arr won't
/// import) can't accumulate and stall downloads. Default 7200 (2h); 0
/// disables the bound (watch indefinitely).
#[serde(default = "default_import_timeout_secs")]
import_timeout_secs: u64,
Comment thread
bugrax marked this conversation as resolved.
port: u16,
skip_directories: Vec<String>,
uid: u32,
Expand Down Expand Up @@ -145,6 +158,7 @@ async fn main() -> Result<()> {
.join(Serialized::default("orchestration_workers", 10))
.join(Serialized::default("loglevel", "info"))
.join(Serialized::default("polling_interval", 10))
.join(Serialized::default("import_timeout_secs", 7200u64))
.join(Serialized::default("port", 9091))
.join(Serialized::default("uid", 1000))
.join(Serialized::default("download_unmanaged", false))
Expand Down
6 changes: 6 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ uid = 1000
# Optional polling interval in secs, default 10.
polling_interval = 10

# Optional. Give up watching a transfer for import after this many seconds, default 7200 (2h).
# Bounds the per-transfer import-watch loop so a transfer that never fully imports (e.g. one
# containing a sample the *arr won't import) can't accumulate and eventually stall downloads.
# Set to 0 to disable (watch indefinitely).
import_timeout_secs = 7200

# Optional skip directories when downloading, default ["sample", "extras"]
skip_directories = ["sample", "extras"]

Expand Down