Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions src/platform/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,9 @@ impl MessageReader {
/// (Or `fetch_iocp_result()` for readers in a set.)
///
/// The only exception is if the kernel indicates
/// that no operation was actually outstanding at this point.
/// In that case, the `async` data is released immediately;
/// that no operation was actually outstanding at this point
/// (i.e. the read already completed before the cancel request).
/// In that case, the completed read is processed immediately;
/// and the caller should not attempt waiting for completion.
fn issue_async_cancel(&mut self) {
unsafe {
Expand All @@ -664,25 +665,18 @@ impl MessageReader {
);

if let Err(error) = result {
// A cancel operation is not expected to fail.
// If it does, callers are not prepared for that -- so we have to bail.
//
// Note that we should never ignore a failed cancel,
// since that would affect further operations;
// and the caller definitely must not free the aliased data in that case!
//
// Sometimes `CancelIoEx()` fails with `ERROR_NOT_FOUND` though,
// meaning there is actually no async operation outstanding at this point.
// (Specifically, this is triggered by the `receiver_set_big_data()` test.)
// Not sure why that happens -- but I *think* it should be benign...
//
// In that case, we can safely free the async data right now;
// and the caller should not attempt to wait for completion.
// `CancelIoEx()` can fail with `ERROR_NOT_FOUND` when the
// async read has already completed by the time we try to
// cancel it. This happens when data arrives between the
// timeout expiring and the cancel call. Any other error is
// unexpected — callers are not prepared for it, so bail.
assert!(error.code() == ERROR_NOT_FOUND.to_hresult());

let async_data = self.r#async.take().unwrap().into_inner();
self.handle = async_data.handle;
self.read_buf = async_data.buf;
// The operation completed before we could cancel it.
// Process it as a normal completion so that the bytes the
// kernel read are accounted for in `read_buf`.
self.notify_completion(Ok(()))
.expect("completed read should not fail");
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,60 @@ fn try_recv_timeout() {
}
}

// Regression test for: try_recv_timeout silently loses messages on Windows
//
// When a message arrives in the narrow window between GetOverlappedResultEx
// returning WAIT_TIMEOUT and CancelIoEx running in issue_async_cancel, the
// kernel completes the read but CancelIoEx returns ERROR_NOT_FOUND. The
// current ERROR_NOT_FOUND handler restores the buffer without updating its
// length, so the bytes the kernel read are silently discarded.
//
// This test sends a message timed to arrive right at the timeout boundary,
// then verifies it is not lost. The race window is small, so the test is
// repeated to increase the chance of hitting it.
#[cfg_attr(not(feature = "enable-slow-tests"), ignore)]
#[test]
fn try_recv_timeout_message_not_lost_at_boundary() {
// Use a short timeout so iterations are fast.
let timeout = Duration::from_millis(50);

// Repeat many times to exercise the race window.
for _ in 0..200 {
let (tx, rx) = ipc::channel::<i32>().unwrap();

// Send from another thread after sleeping for approximately the
// timeout duration, maximising the chance of hitting the window
// between WAIT_TIMEOUT and CancelIoEx.
let send_delay = timeout;
thread::spawn(move || {
thread::sleep(send_delay);
tx.send(42).unwrap();
});

// The first try_recv_timeout may legitimately return Empty if the
// message has not been written yet. But the message must not be
// lost — a subsequent receive with a generous timeout must find it.
match rx.try_recv_timeout(timeout) {
Ok(val) => {
assert_eq!(val, 42);
continue; // Got it on the first try — no race this iteration.
},
Err(crate::TryRecvError::Empty) => {
// Timed out. The message should still be available.
},
Err(e) => panic!("Unexpected error: {e:?}"),
}

// The message was sent (the sender thread has run) but the first
// try_recv_timeout returned Empty. The message must still be
// retrievable — if it was silently lost, this will hang or fail.
let val = rx
.try_recv_timeout(Duration::from_secs(5))
.expect("message lost: first try_recv_timeout returned Empty but message is no longer in the pipe");
assert_eq!(val, 42);
}
}

#[test]
fn multiple_paths_to_a_sender() {
let person = ("Patrick Walton".to_owned(), 29);
Expand Down
Loading