diff --git a/src/download_system/download.rs b/src/download_system/download.rs index dda5736..0849bd1 100644 --- a/src/download_system/download.rs +++ b/src/download_system/download.rs @@ -8,8 +8,18 @@ use file_owner::PathExt; use futures::StreamExt; use log::{error, info, warn}; use nix::unistd::Uid; +use std::time::Duration; use std::{fs, path::Path}; +/// How long to wait for a download request to start returning a response (the +/// connect + response-headers phase). Bounds it so a server that accepts the +/// connection but then stalls can't hang the worker; the retry loop resumes. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(60); + +/// How long to wait for the next chunk of a download stream before treating it +/// as stalled (and erroring so the retry loop resumes). +const STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(60); + #[derive(Clone)] pub struct Worker { _id: usize, @@ -129,7 +139,20 @@ async fn fetch_attempt( if existing > 0 { req = req.header(reqwest::header::RANGE, format!("bytes={}-", existing)); } - let response = req.send().await?; + // Bound the request itself, not just the connect. put.io can accept the + // connection and then stall before sending response headers; without this + // timeout `send()` blocks forever, parking the download worker (and, via the + // blocked done channel, every orchestration worker) until the whole process + // stops pulling. On timeout we error so the retry loop can resume (issue #32). + let response = + match tokio::time::timeout(REQUEST_TIMEOUT, req.send()).await { + Ok(r) => r?, + Err(_) => bail!( + "timed out after {:?} waiting for response headers from {}", + REQUEST_TIMEOUT, + target.from.as_deref().unwrap_or("") + ), + }; let status = response.status(); if !(status.is_success() || status == reqwest::StatusCode::PARTIAL_CONTENT) { bail!("HTTP {}", status); @@ -146,12 +169,12 @@ async fn fetch_attempt( let mut byte_stream = response.bytes_stream(); loop { - match tokio::time::timeout(std::time::Duration::from_secs(60), byte_stream.next()).await { + match tokio::time::timeout(STREAM_IDLE_TIMEOUT, byte_stream.next()).await { Ok(Some(item)) => { tokio::io::copy(&mut item?.as_ref(), &mut tmp_file).await?; } Ok(None) => break, - Err(_) => bail!("stalled: no data received for 60s"), + Err(_) => bail!("stalled: no data received for {:?}", STREAM_IDLE_TIMEOUT), } } Ok(()) diff --git a/src/services/putio.rs b/src/services/putio.rs index 91c1000..3635d47 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -1,7 +1,22 @@ use anyhow::{bail, Result}; use reqwest::multipart; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::OnceLock, time::Duration}; + +/// Shared, connection-pooled HTTP client for all put.io API calls. Building a +/// fresh `reqwest::Client` per request (as the code did before) means no +/// connection reuse; under load — e.g. resuming a large account, where targets +/// are generated for every transfer — that churns through sockets/file +/// descriptors and requests start hanging. Reuse one pooled client instead. +fn http_client() -> &'static reqwest::Client { + static CLIENT: OnceLock = OnceLock::new(); + CLIENT.get_or_init(|| { + reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .build() + .expect("building shared put.io client") + }) +} #[derive(Debug, Clone, Deserialize)] pub struct PutIOTransfer { @@ -30,10 +45,11 @@ pub struct AccountInfoResponse { } pub async fn account_info(api_token: &str) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get("https://api.put.io/v2/account/info") .header("authorization", format!("Bearer {}", api_token)) + .timeout(Duration::from_secs(30)) .send() .await?; @@ -56,7 +72,7 @@ pub struct GetTransferResponse { /// Returns the user's transfers. pub async fn list_transfers(api_token: &str) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get("https://api.put.io/v2/transfers/list") .timeout(Duration::from_secs(30)) @@ -72,7 +88,7 @@ pub async fn list_transfers(api_token: &str) -> Result { } pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/transfers/{}", transfer_id)) .timeout(Duration::from_secs(10)) @@ -92,7 +108,7 @@ pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let form = multipart::Form::new().text("transfer_ids", transfer_id.to_string()); let response = client .post("https://api.put.io/v2/transfers/remove") @@ -114,7 +130,7 @@ pub async fn remove_transfer(api_token: &str, transfer_id: u64) -> Result<()> { } pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let form = multipart::Form::new().text("file_ids", file_id.to_string()); let response = client .post("https://api.put.io/v2/files/delete") @@ -136,7 +152,7 @@ pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> { } pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let form = multipart::Form::new().text("url", url.to_string()); let response = client .post("https://api.put.io/v2/transfers/add") @@ -154,7 +170,7 @@ pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> { } pub async fn upload_file(api_token: &str, bytes: &[u8]) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let file_part = multipart::Part::bytes(bytes.to_owned()).file_name("foo.torrent"); let form = reqwest::multipart::Form::new() @@ -190,13 +206,14 @@ pub struct FileResponse { } pub async fn list_files(api_token: &str, file_id: i64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!( "https://api.put.io/v2/files/list?parent_id={}", file_id )) .header("authorization", format!("Bearer {}", api_token)) + .timeout(Duration::from_secs(30)) .send() .await?; @@ -217,10 +234,11 @@ pub struct URLResponse { } pub async fn url(api_token: &str, file_id: i64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/files/{}/url", file_id)) .header("authorization", format!("Bearer {}", api_token)) + .timeout(Duration::from_secs(30)) .send() .await?; @@ -284,7 +302,7 @@ pub async fn get_config_value( value: Option, } - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/config/{}", key)) .timeout(Duration::from_secs(10)) @@ -305,7 +323,7 @@ pub async fn get_config_value( /// Stores a value in put.io's per-user, per-app key-value config store. pub async fn set_config_value(api_token: &str, key: &str, value: &T) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let body = serde_json::json!({ "value": value }); let response = client .put(format!("https://api.put.io/v2/config/{}", key))