Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/download_system/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ use file_owner::PathExt;
use futures::StreamExt;
use log::{error, info, warn};
use nix::unistd::Uid;
use std::time::Duration;
use std::{fs, path::Path};

/// How long to wait for a download request to start returning a response (the
/// connect + response-headers phase). Bounds it so a server that accepts the
/// connection but then stalls can't hang the worker; the retry loop resumes.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

/// How long to wait for the next chunk of a download stream before treating it
/// as stalled (and erroring so the retry loop resumes).
const STREAM_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Clone)]
pub struct Worker {
_id: usize,
Expand Down Expand Up @@ -129,7 +139,20 @@ async fn fetch_attempt(
if existing > 0 {
req = req.header(reqwest::header::RANGE, format!("bytes={}-", existing));
}
let response = req.send().await?;
// Bound the request itself, not just the connect. put.io can accept the
// connection and then stall before sending response headers; without this
// timeout `send()` blocks forever, parking the download worker (and, via the
// blocked done channel, every orchestration worker) until the whole process
// stops pulling. On timeout we error so the retry loop can resume (issue #32).
Comment thread
bugrax marked this conversation as resolved.
let response =
match tokio::time::timeout(REQUEST_TIMEOUT, req.send()).await {
Ok(r) => r?,
Err(_) => bail!(
"timed out after {:?} waiting for response headers from {}",
REQUEST_TIMEOUT,
target.from.as_deref().unwrap_or("<no url>")
),
};
let status = response.status();
if !(status.is_success() || status == reqwest::StatusCode::PARTIAL_CONTENT) {
bail!("HTTP {}", status);
Expand All @@ -146,12 +169,12 @@ async fn fetch_attempt(

let mut byte_stream = response.bytes_stream();
loop {
match tokio::time::timeout(std::time::Duration::from_secs(60), byte_stream.next()).await {
match tokio::time::timeout(STREAM_IDLE_TIMEOUT, byte_stream.next()).await {
Ok(Some(item)) => {
tokio::io::copy(&mut item?.as_ref(), &mut tmp_file).await?;
}
Ok(None) => break,
Err(_) => bail!("stalled: no data received for 60s"),
Err(_) => bail!("stalled: no data received for {:?}", STREAM_IDLE_TIMEOUT),
}
}
Ok(())
Expand Down
42 changes: 30 additions & 12 deletions src/services/putio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
use anyhow::{bail, Result};
use reqwest::multipart;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, sync::OnceLock, time::Duration};

/// Shared, connection-pooled HTTP client for all put.io API calls. Building a
/// fresh `reqwest::Client` per request (as the code did before) means no
/// connection reuse; under load — e.g. resuming a large account, where targets
/// are generated for every transfer — that churns through sockets/file
/// descriptors and requests start hanging. Reuse one pooled client instead.
fn http_client() -> &'static reqwest::Client {
static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
CLIENT.get_or_init(|| {
reqwest::Client::builder()
.connect_timeout(Duration::from_secs(30))
.build()
.expect("building shared put.io client")
})
}

#[derive(Debug, Clone, Deserialize)]
pub struct PutIOTransfer {
Expand Down Expand Up @@ -30,10 +45,11 @@ pub struct AccountInfoResponse {
}

pub async fn account_info(api_token: &str) -> Result<AccountInfoResponse> {
let client = reqwest::Client::new();
let client = http_client();
let response = client
.get("https://api.put.io/v2/account/info")
.header("authorization", format!("Bearer {}", api_token))
.timeout(Duration::from_secs(30))
.send()
.await?;

Expand All @@ -56,7 +72,7 @@ pub struct GetTransferResponse {

/// Returns the user's transfers.
pub async fn list_transfers(api_token: &str) -> Result<ListTransferResponse> {
let client = reqwest::Client::new();
let client = http_client();
let response = client
.get("https://api.put.io/v2/transfers/list")
.timeout(Duration::from_secs(30))
Expand All @@ -72,7 +88,7 @@ pub async fn list_transfers(api_token: &str) -> Result<ListTransferResponse> {
}

pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result<GetTransferResponse> {
let client = reqwest::Client::new();
let client = http_client();
let response = client
.get(format!("https://api.put.io/v2/transfers/{}", transfer_id))
.timeout(Duration::from_secs(10))
Expand All @@ -92,7 +108,7 @@ pub async fn get_transfer(api_token: &str, transfer_id: u64) -> Result<GetTransf
}

pub async fn remove_transfer(api_token: &str, transfer_id: u64) -> Result<()> {
let client = reqwest::Client::new();
let client = http_client();
let form = multipart::Form::new().text("transfer_ids", transfer_id.to_string());
let response = client
.post("https://api.put.io/v2/transfers/remove")
Expand All @@ -114,7 +130,7 @@ pub async fn remove_transfer(api_token: &str, transfer_id: u64) -> Result<()> {
}

pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> {
let client = reqwest::Client::new();
let client = http_client();
let form = multipart::Form::new().text("file_ids", file_id.to_string());
let response = client
.post("https://api.put.io/v2/files/delete")
Expand All @@ -136,7 +152,7 @@ pub async fn delete_file(api_token: &str, file_id: i64) -> Result<()> {
}

pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> {
let client = reqwest::Client::new();
let client = http_client();
let form = multipart::Form::new().text("url", url.to_string());
let response = client
.post("https://api.put.io/v2/transfers/add")
Expand All @@ -154,7 +170,7 @@ pub async fn add_transfer(api_token: &str, url: &str) -> Result<()> {
}

pub async fn upload_file(api_token: &str, bytes: &[u8]) -> Result<()> {
let client = reqwest::Client::new();
let client = http_client();
let file_part = multipart::Part::bytes(bytes.to_owned()).file_name("foo.torrent");

let form = reqwest::multipart::Form::new()
Expand Down Expand Up @@ -190,13 +206,14 @@ pub struct FileResponse {
}

pub async fn list_files(api_token: &str, file_id: i64) -> Result<ListFileResponse> {
let client = reqwest::Client::new();
let client = http_client();
let response = client
.get(format!(
"https://api.put.io/v2/files/list?parent_id={}",
file_id
))
.header("authorization", format!("Bearer {}", api_token))
.timeout(Duration::from_secs(30))
.send()
.await?;

Expand All @@ -217,10 +234,11 @@ pub struct URLResponse {
}

pub async fn url(api_token: &str, file_id: i64) -> Result<String> {
let client = reqwest::Client::new();
let client = http_client();
let response = client
.get(format!("https://api.put.io/v2/files/{}/url", file_id))
.header("authorization", format!("Bearer {}", api_token))
.timeout(Duration::from_secs(30))
.send()
.await?;

Expand Down Expand Up @@ -284,7 +302,7 @@ pub async fn get_config_value<T: DeserializeOwned>(
value: Option<T>,
}

let client = reqwest::Client::new();
let client = http_client();
let response = client
.get(format!("https://api.put.io/v2/config/{}", key))
.timeout(Duration::from_secs(10))
Expand All @@ -305,7 +323,7 @@ pub async fn get_config_value<T: DeserializeOwned>(

/// Stores a value in put.io's per-user, per-app key-value config store.
pub async fn set_config_value<T: Serialize>(api_token: &str, key: &str, value: &T) -> Result<()> {
let client = reqwest::Client::new();
let client = http_client();
let body = serde_json::json!({ "value": value });
let response = client
.put(format!("https://api.put.io/v2/config/{}", key))
Expand Down