Skip to content
7 changes: 6 additions & 1 deletion src/download_system/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ impl Worker {
Ok(_) => DownloadDoneStatus::Success,
Err(_) => DownloadDoneStatus::Failed,
};
dtm.tx.send(done_status).await?;
// Reporting status can fail if the orchestration worker that queued
// this target is gone. Don't let that take this worker down too —
// log and keep serving other downloads (issue #34).
if let Err(e) = dtm.tx.send(done_status).await {
warn!("download worker: could not report status (receiver gone): {e}");
}
}
}
}
Expand Down
136 changes: 92 additions & 44 deletions src/download_system/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,48 +49,13 @@ impl Worker {
let app_data = self.app_data.clone();
match msg {
TransferMessage::QueuedForDownload(t) => {
info!("{}: download {}", t, "started".yellow());
let targets = t.get_download_targets().await?;
// Create a communications channel for the download worker to communicate status back.
let done_channels: &Vec<(
Sender<DownloadDoneStatus>,
Receiver<DownloadDoneStatus>,
)> = &targets.iter().map(|_| async_channel::unbounded()).collect();

for (i, target) in targets.iter().enumerate() {
let (done_tx, _) = done_channels[i].clone();
self.dtx
.send(DownloadTargetMessage {
download_target: target.clone(),
tx: done_tx,
})
.await?;
}

// Wait for all the workers having sent back their status.
let mut all_downloaded = vec![];
for (_, done_rx) in done_channels {
all_downloaded.push(done_rx.recv().await?);
}

// Check if all are success
if all_downloaded.iter().all(|d| match d {
DownloadDoneStatus::Success => true,
DownloadDoneStatus::Failed => false,
}) {
info!("{}: download {}", t, "done".blue());
// The files now exist locally, so it's safe to report
// this transfer as complete to the *arr (see issue #16).
self.app_data.state.mark_local_complete(t.transfer_id).await;
self.tx
.send(TransferMessage::Downloaded(Transfer {
targets: Some(targets),
..t
}))
.await?;
} else {
// TODO: figure out what to do here..
warn!("{}: not all targets downloaded", t)
// Handle each transfer in a way that can't take the worker
// down: a `?` here used to bubble up and end work(), which
// (silently) killed the worker and, via the dropped done
// channels, cascaded to the others until everything stalled
// (issue #34). Log and carry on instead.
if let Err(e) = self.handle_queued(t).await {
warn!("download orchestration error (worker continuing): {}", e);
}
}
TransferMessage::Downloaded(t) => {
Expand All @@ -103,6 +68,73 @@ impl Worker {
}
}
}

/// Downloads all of a transfer's targets and, on full success, marks it
/// complete and forwards it for import. Returns Err on any failure so the
/// caller can log it without ending the worker (see issue #34).
async fn handle_queued(&self, t: Transfer) -> Result<()> {
info!("{}: download {}", t, "started".yellow());
// Reuse targets computed when the transfer was discovered if present —
// watch-folder orphans precompute them with the correct base dir
// (get_download_targets_in), which a plain get_download_targets() here
// would not reproduce (no persisted state) and would mis-route. Only
// regenerate for normal transfers, which don't carry them.
let targets = match t.targets.clone() {
Some(ts) => ts,
None => t.get_download_targets().await?,
};
Comment thread
bugrax marked this conversation as resolved.
// No targets means nothing to download; don't let the `all(...)` check
Comment thread
bugrax marked this conversation as resolved.
// below pass vacuously and mark the transfer complete.
if targets.is_empty() {
warn!("{}: no downloadable targets, skipping", t);
return Ok(());
}
// A status channel per target for the download workers to report back.
let done_channels: Vec<(Sender<DownloadDoneStatus>, Receiver<DownloadDoneStatus>)> =
targets.iter().map(|_| async_channel::unbounded()).collect();

for (i, target) in targets.iter().enumerate() {
let done_tx = done_channels[i].0.clone();
self.dtx
.send(DownloadTargetMessage {
download_target: target.clone(),
tx: done_tx,
})
.await?;
}
Comment thread
bugrax marked this conversation as resolved.

// Wait for all the workers having sent back their status.
let mut all_downloaded = vec![];
for (_, done_rx) in &done_channels {
all_downloaded.push(done_rx.recv().await?);
}

if all_downloaded
.iter()
.all(|d| matches!(d, DownloadDoneStatus::Success))
{
info!("{}: download {}", t, "done".blue());
// The files now exist locally, so it's safe to report this transfer
// as complete to the *arr (see issue #16).
self.app_data.state.mark_local_complete(t.transfer_id).await;
Comment thread
bugrax marked this conversation as resolved.
self.tx
.send(TransferMessage::Downloaded(Transfer {
targets: Some(targets),
..t
}))
.await?;
} else {
warn!("{}: not all targets downloaded", t);
// Drop a failed orphan from tracking so a later watch-folder scan
// can retry it instead of it being suppressed forever (issue #34).
if t.is_orphan {
if let Some(file_id) = t.file_id {
self.app_data.state.remove_orphan(file_id).await;
}
}
}
Ok(())
}
}

async fn watch_for_import(
Expand All @@ -129,8 +161,24 @@ async fn watch_for_import(
panic!("{}: no idea how to handle", &top_level_target)
}
};
let m = transfer.clone();
tx.send(TransferMessage::Imported(m)).await?;
// An orphan has no put.io transfer to remove or seed, so finish it
// here directly instead of routing an Imported message through a
// worker (which may be busy downloading and never pick it up),
// deleting the now-imported file from put.io (issue #34).
if transfer.is_orphan {
if let Some(file_id) = transfer.file_id {
match putio::delete_file(&app_data.config.putio.api_key, file_id).await {
Ok(_) => info!("{}: deleted orphan from put.io", transfer),
Err(e) => {
warn!("{}: failed to delete orphan from put.io: {}", transfer, e)
}
}
app_data.state.remove_orphan(file_id).await;
}
} else {
let m = transfer.clone();
tx.send(TransferMessage::Imported(m)).await?;
}

break;
}
Expand Down
Loading