From 5ab5b1f9c6291aa84fa63e62a6bf032de59853e2 Mon Sep 17 00:00:00 2001 From: ZhouGuangyuan Date: Tue, 30 Jun 2026 23:49:34 +0800 Subject: [PATCH] runtime: avoid stale unbuffered receive waits --- runtime/internal/runtime/z_chan.go | 75 ++++++++++++++++++------------ test/select_test.go | 39 ++++++++++++++++ 2 files changed, 83 insertions(+), 31 deletions(-) diff --git a/runtime/internal/runtime/z_chan.go b/runtime/internal/runtime/z_chan.go index b1367091dc..7863317e1a 100644 --- a/runtime/internal/runtime/z_chan.go +++ b/runtime/internal/runtime/z_chan.go @@ -36,10 +36,15 @@ type Chan struct { cond sync.Cond data unsafe.Pointer getp int - len int - cap int - sop *selectOp - sops []*selectOp + // seq changes when an unbuffered send/recv rendezvous completes. A later + // recv on the same channel can set getp back to chanHasRecv before the + // previous recv wakes up, so recv waiters must not use getp alone to decide + // whether their own rendezvous completed. + seq uint64 + len int + cap int + sop *selectOp + sops []*selectOp // sends counts goroutines blocked in unbuffered send, including select-send. sends uint16 // selsends is the subset of sends originating from select operations. @@ -139,6 +144,7 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool { c.Memcpy(p.data, v, uintptr(eltSize)) } p.getp = chanNoSendRecv + p.seq++ } else { if p.len == n { p.mutex.Unlock() @@ -180,6 +186,7 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool { c.Memcpy(p.data, v, uintptr(eltSize)) } p.getp = chanNoSendRecv + p.seq++ } else { for p.len == n && !p.close { p.cond.Wait(&p.mutex) @@ -223,6 +230,22 @@ func chanTryRecv(p *Chan, v unsafe.Pointer, eltSize int, acceptSelectSend bool) } p.getp = chanHasRecv p.data = v + seq := p.seq + notifyOps(p) + p.mutex.Unlock() + p.cond.Broadcast() + + p.mutex.Lock() + for p.getp == chanHasRecv && p.seq == seq && !p.close { + p.cond.Wait(&p.mutex) + } + recvOK = p.seq != seq || p.getp != chanHasRecv + if !recvOK { + zeroChanRecv(v, eltSize) + } + tryOK = true + p.mutex.Unlock() + return } else { if p.len == 0 { tryOK = p.close @@ -241,20 +264,7 @@ func chanTryRecv(p *Chan, v unsafe.Pointer, eltSize int, acceptSelectSend bool) notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() - if n == 0 { - p.mutex.Lock() - for p.getp == chanHasRecv && !p.close { - p.cond.Wait(&p.mutex) - } - recvOK = p.getp != chanHasRecv - if !recvOK { - zeroChanRecv(v, eltSize) - } - tryOK = true - p.mutex.Unlock() - } else { - recvOK, tryOK = true, true - } + recvOK, tryOK = true, true return } @@ -276,6 +286,21 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) { } p.getp = chanHasRecv p.data = v + seq := p.seq + notifyOps(p) + p.mutex.Unlock() + p.cond.Broadcast() + + p.mutex.Lock() + for p.getp == chanHasRecv && p.seq == seq && !p.close { + p.cond.Wait(&p.mutex) + } + recvOK = p.seq != seq || p.getp != chanHasRecv + if !recvOK { + zeroChanRecv(v, eltSize) + } + p.mutex.Unlock() + return } else { for p.len == 0 { if p.close { @@ -294,19 +319,7 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) { notifyOps(p) p.mutex.Unlock() p.cond.Broadcast() - if n == 0 { - p.mutex.Lock() - for p.getp == chanHasRecv && !p.close { - p.cond.Wait(&p.mutex) - } - recvOK = p.getp != chanHasRecv - if !recvOK { - zeroChanRecv(v, eltSize) - } - p.mutex.Unlock() - } else { - recvOK = true - } + recvOK = true return } diff --git a/test/select_test.go b/test/select_test.go index 2af686a204..d58e1468e7 100644 --- a/test/select_test.go +++ b/test/select_test.go @@ -83,3 +83,42 @@ func TestSelectMixedUnbufferedPeersMakeProgress(t *testing.T) { } } } + +func TestSelectRecvCompletionNotOverwrittenByNextRecv(t *testing.T) { + for i := 0; i < 1000; i++ { + ch := make(chan struct{}) + done := make(chan struct{}) + recvDone := make(chan struct{}) + + go func() { + defer close(done) + defer close(ch) + for { + select { + case <-ch: + return + default: + } + } + }() + + ch <- struct{}{} + + go func() { + <-ch + close(recvDone) + }() + + select { + case <-recvDone: + case <-time.After(time.Second): + t.Fatalf("iteration %d: receive completion was overwritten by a later receive", i) + } + + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("iteration %d: select receiver did not exit", i) + } + } +}