Skip to content

fix: use slot instead of trigger for async recv#172

Open
acking-you wants to merge 1 commit into
zesterer:masterfrom
acking-you:fix/async-rendezvous-channel
Open

fix: use slot instead of trigger for async recv#172
acking-you wants to merge 1 commit into
zesterer:masterfrom
acking-you:fix/async-rendezvous-channel

Conversation

@acking-you
Copy link
Copy Markdown

Fix async rendezvous channel deadlock (Issue #140)

Problem

Async receivers use Hook::trigger which has no slot to store messages. When a sender finds a waiting async receiver on a rendezvous channel (bounded(0)), it:

  1. Calls fire_send() which returns (Some(msg), signal) because trigger has no slot
  2. Puts the message into chan.queue
  3. Fires the signal to wake the receiver
  4. Returns Ok(()) immediately

This violates rendezvous semantics where the sender should block until the receiver actually takes the message. The premature return causes race conditions and deadlocks in high-throughput scenarios.

Root Cause

In src/async.rs, RecvFut::poll_inner creates a trigger hook:

|| Hook::trigger(AsyncSignal::new(cx, stream))

Hook::trigger has no slot (self.0 = None), so when fire_send is called:

pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
    let ret = match self.lock() {
        Some(mut lock) => { *lock = Some(msg); None }
        None => Some(msg),  // trigger returns Some(msg)
    };
    (ret, self.signal())
}

The message is returned back and placed in the queue instead of being delivered directly to the receiver.

Fix

Change async receivers to use Hook::slot instead of Hook::trigger:

// Before
|| Hook::trigger(AsyncSignal::new(cx, stream))

// After
|| Hook::slot(None, AsyncSignal::new(cx, stream))

And check the slot first when polling:

if let Some(hook) = self.hook.as_ref() {
    // Check if message was delivered directly to our slot
    if let Some(msg) = hook.try_take() {
        return Poll::Ready(Ok(msg));
    }
    // ... rest of the logic
}

Reproduction

use flume::bounded;
use std::time::{Duration, Instant};

const MESSAGES: usize = 1_000_000;
const THREADS: usize = 4;

#[tokio::main]
async fn main() {
    // Run MPMC test repeatedly (simulating benchmark)
    loop {
        let (tx, rx) = bounded::<usize>(0);
        let mut handles = Vec::new();

        for _ in 0..THREADS {
            let tx = tx.clone();
            handles.push(tokio::spawn(async move {
                for i in 1..MESSAGES / THREADS + 1 {
                    tx.send_async(i).await.unwrap();
                }
            }));
        }

        for _ in 0..THREADS {
            let rx = rx.clone();
            handles.push(tokio::spawn(async move {
                for _ in 0..MESSAGES / THREADS {
                    rx.recv_async().await.unwrap();
                }
            }));
        }

        for h in handles {
            h.await.unwrap();
        }
        // Deadlocks after 1-2 iterations without the fix
    }
}

Testing

  • All existing tests pass
  • The reproduction case completes successfully with the fix
  • Benchmark flume-async runs to completion

Notes

Use Hook::slot for async receivers to properly support
rendezvous channels (bounded(0)). This fixes deadlock
when sending non-ZST types. Closes zesterer#140.
@zesterer
Copy link
Copy Markdown
Owner

This looks suspiciously like it was generated by an LLM. Before I spend time reviewing this, could you confirm that you have personally checked its reasoning thoroughly by hand? I would much rather a human-authored description.

@acking-you
Copy link
Copy Markdown
Author

This looks suspiciously like it was generated by an LLM. Before I spend time reviewing this, could you confirm that you have personally checked its reasoning thoroughly by hand? I would much rather a human-authored description.

Sorry, I did indeed make use of Claude. The situation I encountered is exactly as shown in the example I described. I actually discovered this issue while testing performance: flume can deadlock, but this only happens in the latest version; the issue does not occur in 0.11.0. I did a preliminary review and confirmed the change, but my understanding of flume may be incomplete. It might still be necessary to trouble you to review the change from a broader, system-wide perspective to see whether it is correct.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants