fix: tighten oneshot memory ordering#111
Conversation
There was a problem hiding this comment.
Pull request overview
Tightens the oneshot channel’s state-transition memory orderings to better synchronize message/waker visibility and final channel deallocation ownership, and adds regression tests covering concurrent send/drop vs try_recv/poll.
Changes:
- Adjusted atomic orderings and added dedicated acquire fences around MESSAGE-handling paths to explicitly synchronize message visibility and final deallocation handoff.
- Updated drop paths (Sender/Receiver/Recv) to better coordinate which side frees the channel under races.
- Added concurrent send/drop completion tests for both
try_recvandFuture::pollpaths.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
mea/src/oneshot/mod.rs |
Updates atomic orderings/fences and drop-path state transitions to strengthen synchronization and ownership handoff. |
mea/src/oneshot/tests.rs |
Adds multi-threaded regression tests for concurrent send/drop vs try_recv/poll, plus a small thread helper. |
Comments suppressed due to low confidence (3)
mea/src/oneshot/tests.rs:363
- This test’s receiver thread loops with
spin_loop()untilDisconnectedis observed; without any timeout/iteration cap, failures can turn into an infinite hang duringjoin(). Consider adding a timeout/budget and failing fast if the expected state isn’t reached.
let receiver_thread = spawn_named("receiver", move || {
loop {
match receiver.try_recv() {
Ok(value) => panic!("unexpected value: {value}"),
Err(TryRecvError::Empty) => spin_loop(),
Err(TryRecvError::Disconnected) => break,
}
}
mea/src/oneshot/tests.rs:389
- This test polls in a tight unbounded
spin_loop()until completion. If the wake/state transition breaks, the test will hang indefinitely atjoin(). Add a bounded timeout/iteration budget (and panic on timeout) to keep CI reliable.
loop {
match Pin::new(&mut receiver).poll(&mut context) {
Poll::Ready(Ok(999)) => break,
Poll::Ready(result) => panic!("unexpected result: {result:?}"),
Poll::Pending => spin_loop(),
}
}
mea/src/oneshot/tests.rs:415
- Similar to the other concurrent tests, this thread spins in an unbounded loop until it observes the disconnect. A regression could cause an infinite hang. Add a timeout/iteration cap and fail fast rather than spinning forever.
loop {
match Pin::new(&mut receiver).poll(&mut context) {
Poll::Ready(Err(oneshot::RecvError::Disconnected)) => break,
Poll::Ready(result) => panic!("unexpected result: {result:?}"),
Poll::Pending => spin_loop(),
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // If this receiver was previously polled, but was not polled to completion, then the | ||
| // channel is in the RECEIVING state and has a waker written. We must move away from the | ||
| // RECEIVING state before dropping the waker, otherwise we could race with the sender | ||
| // taking the same waker and deallocating the channel. | ||
| if channel.state.load(Ordering::Relaxed) == RECEIVING | ||
| && channel | ||
| .state | ||
| .compare_exchange(RECEIVING, EMPTY, Ordering::Relaxed, Ordering::Relaxed) | ||
| .is_ok() | ||
| { | ||
| // SAFETY: The RECEIVING state guarantees we have written a waker. | ||
| unsafe { channel.drop_waker() }; | ||
| } |
| let receiver_thread = spawn_named("receiver", move || { | ||
| loop { | ||
| match receiver.try_recv() { | ||
| Ok(999) => break, | ||
| Ok(value) => panic!("unexpected value: {value}"), | ||
| Err(TryRecvError::Empty) => spin_loop(), | ||
| Err(TryRecvError::Disconnected) => panic!("unexpected disconnect"), | ||
| } | ||
| } |
|
Memory ordering here is tough to think through and guarantee correctness, so my review may not be very comprehensive. First and foremost, following Copilot's review, Then, I found an issue that existed before this PR. I am concerned that the As Copilot noted, the new spin-loop tests should also be bounded with a timeout or iteration budget. |
| // ORDERING: Release is required so that in the states where the sender becomes responsible | ||
| // for deallocating the channel, they can synchronize with this final state write from us. | ||
| // Acquire is required by the branches below to synchronize with writes from the sender. | ||
| match channel.state.swap(DISCONNECTED, Ordering::AcqRel) { |
There was a problem hiding this comment.
I think this can free the channel too early. When this swap() sees AWAKING, it also writes DISCONNECTED, so the loop below may just read our own write back and stop waiting while the sender is still taking the waker or publishing the final state.
Summary
Verification