From 6cacee895a96c33c2aba4540ee1e2cf9ed197efc Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 10:51:15 +0300 Subject: [PATCH 1/4] Time out the download request, not just the connect (fixes #32) fetch only set a connect_timeout and a per-chunk timeout around the byte stream. The req.send() call (connect + waiting for response headers) had no timeout of its own, so if put.io accepted the connection but stalled before sending headers, send() blocked forever. The download worker parked there, the orchestration worker blocked on its done channel, and once the few download workers were stuck the whole process stopped pulling (a restart only cleared it temporarily). Wrap req.send() in a 60s timeout; a stalled request now errors and the existing retry loop resumes it instead of hanging. --- src/download_system/download.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/download_system/download.rs b/src/download_system/download.rs index dda5736..ad3674d 100644 --- a/src/download_system/download.rs +++ b/src/download_system/download.rs @@ -129,7 +129,16 @@ async fn fetch_attempt( if existing > 0 { req = req.header(reqwest::header::RANGE, format!("bytes={}-", existing)); } - let response = req.send().await?; + // Bound the request itself, not just the connect. put.io can accept the + // connection and then stall before sending response headers; without this + // timeout `send()` blocks forever, parking the download worker (and, via the + // blocked done channel, every orchestration worker) until the whole process + // stops pulling. On timeout we error so the retry loop can resume (issue #32). 
+ let response = + match tokio::time::timeout(std::time::Duration::from_secs(60), req.send()).await { + Ok(r) => r?, + Err(_) => bail!("timed out waiting for response headers from put.io"), + }; let status = response.status(); if !(status.is_success() || status == reqwest::StatusCode::PARTIAL_CONTENT) { bail!("HTTP {}", status); From 13e6886a2830bd25ecdd282aacf79347d90a03de Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 11:02:47 +0300 Subject: [PATCH 2/4] Add request timeouts to put.io list_files/url/account_info MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit These three put.io API calls were missing the .timeout() the other put.io calls already have. list_files and url are called from get_download_targets for every transfer; when put.io accepted the connection but stalled, they hung with no timeout, freezing the orchestration worker before any download even started — the real cause of the recurring download stall (the download send() timeout in the previous commit covers a later point in the same path). 
--- src/services/putio.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/services/putio.rs b/src/services/putio.rs index 91c1000..2866523 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -34,6 +34,7 @@ pub async fn account_info(api_token: &str) -> Result { let response = client .get("https://api.put.io/v2/account/info") .header("authorization", format!("Bearer {}", api_token)) + .timeout(Duration::from_secs(30)) .send() .await?; @@ -197,6 +198,7 @@ pub async fn list_files(api_token: &str, file_id: i64) -> Result Result { let response = client .get(format!("https://api.put.io/v2/files/{}/url", file_id)) .header("authorization", format!("Bearer {}", api_token)) + .timeout(Duration::from_secs(30)) .send() .await?; From a15261f3392b8eaed6b43f455476dde688d94064 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 11:17:56 +0300 Subject: [PATCH 3/4] Reuse one pooled HTTP client for put.io API calls Every put.io call built a fresh reqwest::Client, so connections were never reused. Resuming a large account generates targets for every transfer (list_files + url each), and the resulting client/socket churn exhausts file descriptors until requests hang with no progress and no error. Share a single connection-pooled client via OnceLock instead. --- src/services/putio.rs | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/src/services/putio.rs b/src/services/putio.rs index 2866523..3635d47 100644 --- a/src/services/putio.rs +++ b/src/services/putio.rs @@ -1,7 +1,22 @@ use anyhow::{bail, Result}; use reqwest::multipart; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::OnceLock, time::Duration}; + +/// Shared, connection-pooled HTTP client for all put.io API calls. Building a +/// fresh `reqwest::Client` per request (as the code did before) means no +/// connection reuse; under load — e.g. 
resuming a large account, where targets +/// are generated for every transfer — that churns through sockets/file +/// descriptors and requests start hanging. Reuse one pooled client instead. +fn http_client() -> &'static reqwest::Client { + static CLIENT: OnceLock<reqwest::Client> = OnceLock::new(); + CLIENT.get_or_init(|| { + reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .build() + .expect("building shared put.io client") + }) +} #[derive(Debug, Clone, Deserialize)] pub struct PutIOTransfer { @@ -30,7 +45,7 @@ pub struct AccountInfoResponse { } pub async fn account_info(api_token: &str) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get("https://api.put.io/v2/account/info") .header("authorization", format!("Bearer {}", api_token)) @@ -57,7 +72,7 @@ pub struct GetTransferResponse { /// Returns the user's transfers. pub async fn list_transfers(api_token: &str) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get("https://api.put.io/v2/transfers/list") .timeout(Duration::from_secs(30)) @@ -73,7 +88,7 @@ pub async fn list_transfers(api_token: &str) -> Result { } pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/transfers/{}", transfer_id)) .timeout(Duration::from_secs(10)) @@ -93,7 +108,7 @@ pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let form = multipart::Form::new().text("transfer_ids", transfer_id.to_string()); let response = client .post("https://api.put.io/v2/transfers/remove") @@ -115,7 +130,7 @@ pub async fn remove_transfer(api_token: &str, transfer_id: u64) -> Result<()> { } pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> { - let client = reqwest::Client::new(); + 
let client = http_client(); let form = multipart::Form::new().text("file_ids", file_id.to_string()); let response = client .post("https://api.put.io/v2/files/delete") @@ -137,7 +152,7 @@ pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> { } pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let form = multipart::Form::new().text("url", url.to_string()); let response = client .post("https://api.put.io/v2/transfers/add") @@ -155,7 +170,7 @@ pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> { } pub async fn upload_file(api_token: &str, bytes: &[u8]) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let file_part = multipart::Part::bytes(bytes.to_owned()).file_name("foo.torrent"); let form = reqwest::multipart::Form::new() @@ -191,7 +206,7 @@ pub struct FileResponse { } pub async fn list_files(api_token: &str, file_id: i64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!( "https://api.put.io/v2/files/list?parent_id={}", @@ -219,7 +234,7 @@ pub struct URLResponse { } pub async fn url(api_token: &str, file_id: i64) -> Result { - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/files/{}/url", file_id)) .header("authorization", format!("Bearer {}", api_token)) @@ -287,7 +302,7 @@ pub async fn get_config_value( value: Option, } - let client = reqwest::Client::new(); + let client = http_client(); let response = client .get(format!("https://api.put.io/v2/config/{}", key)) .timeout(Duration::from_secs(10)) @@ -308,7 +323,7 @@ pub async fn get_config_value( /// Stores a value in put.io's per-user, per-app key-value config store. 
pub async fn set_config_value(api_token: &str, key: &str, value: &T) -> Result<()> { - let client = reqwest::Client::new(); + let client = http_client(); let body = serde_json::json!({ "value": value }); let response = client .put(format!("https://api.put.io/v2/config/{}", key)) From 2137007f5cd0694bad1d0f02cf5649d6b9fa8698 Mon Sep 17 00:00:00 2001 From: bugrax Date: Wed, 10 Jun 2026 13:16:17 +0300 Subject: [PATCH 4/4] Address review: name the download timeouts and make messages generic Extract the request and stream-idle timeouts into REQUEST_TIMEOUT and STREAM_IDLE_TIMEOUT constants, and include the duration (and target url) in the timeout messages instead of a hard-coded host/duration. --- src/download_system/download.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/download_system/download.rs b/src/download_system/download.rs index ad3674d..0849bd1 100644 --- a/src/download_system/download.rs +++ b/src/download_system/download.rs @@ -8,8 +8,18 @@ use file_owner::PathExt; use futures::StreamExt; use log::{error, info, warn}; use nix::unistd::Uid; +use std::time::Duration; use std::{fs, path::Path}; +/// How long to wait for a download request to start returning a response (the +/// connect + response-headers phase). Bounds it so a server that accepts the +/// connection but then stalls can't hang the worker; the retry loop resumes. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(60); + +/// How long to wait for the next chunk of a download stream before treating it +/// as stalled (and erroring so the retry loop resumes). +const STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(60); + #[derive(Clone)] pub struct Worker { _id: usize, @@ -135,9 +145,13 @@ async fn fetch_attempt( // blocked done channel, every orchestration worker) until the whole process // stops pulling. On timeout we error so the retry loop can resume (issue #32). 
let response = - match tokio::time::timeout(std::time::Duration::from_secs(60), req.send()).await { + match tokio::time::timeout(REQUEST_TIMEOUT, req.send()).await { Ok(r) => r?, - Err(_) => bail!("timed out waiting for response headers from put.io"), + Err(_) => bail!( + "timed out after {:?} waiting for response headers from {}", + REQUEST_TIMEOUT, + target.from.as_deref().unwrap_or("") + ), }; let status = response.status(); if !(status.is_success() || status == reqwest::StatusCode::PARTIAL_CONTENT) { @@ -155,12 +169,12 @@ async fn fetch_attempt( let mut byte_stream = response.bytes_stream(); loop { - match tokio::time::timeout(std::time::Duration::from_secs(60), byte_stream.next()).await { + match tokio::time::timeout(STREAM_IDLE_TIMEOUT, byte_stream.next()).await { Ok(Some(item)) => { tokio::io::copy(&mut item?.as_ref(), &mut tmp_file).await?; } Ok(None) => break, - Err(_) => bail!("stalled: no data received for 60s"), + Err(_) => bail!("stalled: no data received for {:?}", STREAM_IDLE_TIMEOUT), } } Ok(())