Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 44 additions & 31 deletions runtime/internal/runtime/z_chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ type Chan struct {
cond sync.Cond
data unsafe.Pointer
getp int
len int
cap int
sop *selectOp
sops []*selectOp
// seq changes when an unbuffered send/recv rendezvous completes. A later
// recv on the same channel can set getp back to chanHasRecv before the
// previous recv wakes up, so recv waiters must not use getp alone to decide
// whether their own rendezvous completed.
seq uint64
len int
cap int
sop *selectOp
sops []*selectOp
// sends counts goroutines blocked in unbuffered send, including select-send.
sends uint16
// selsends is the subset of sends originating from select operations.
Expand Down Expand Up @@ -139,6 +144,7 @@ func ChanTrySend(p *Chan, v unsafe.Pointer, eltSize int) bool {
c.Memcpy(p.data, v, uintptr(eltSize))
}
p.getp = chanNoSendRecv
p.seq++
} else {
if p.len == n {
p.mutex.Unlock()
Expand Down Expand Up @@ -180,6 +186,7 @@ func ChanSend(p *Chan, v unsafe.Pointer, eltSize int) bool {
c.Memcpy(p.data, v, uintptr(eltSize))
}
p.getp = chanNoSendRecv
p.seq++
} else {
for p.len == n && !p.close {
p.cond.Wait(&p.mutex)
Expand Down Expand Up @@ -223,6 +230,22 @@ func chanTryRecv(p *Chan, v unsafe.Pointer, eltSize int, acceptSelectSend bool)
}
p.getp = chanHasRecv
p.data = v
seq := p.seq
notifyOps(p)
p.mutex.Unlock()
p.cond.Broadcast()

p.mutex.Lock()
for p.getp == chanHasRecv && p.seq == seq && !p.close {
p.cond.Wait(&p.mutex)
}
recvOK = p.seq != seq || p.getp != chanHasRecv
if !recvOK {
zeroChanRecv(v, eltSize)
}
tryOK = true
p.mutex.Unlock()
return
} else {
if p.len == 0 {
tryOK = p.close
Expand All @@ -241,20 +264,7 @@ func chanTryRecv(p *Chan, v unsafe.Pointer, eltSize int, acceptSelectSend bool)
notifyOps(p)
p.mutex.Unlock()
p.cond.Broadcast()
if n == 0 {
p.mutex.Lock()
for p.getp == chanHasRecv && !p.close {
p.cond.Wait(&p.mutex)
}
recvOK = p.getp != chanHasRecv
if !recvOK {
zeroChanRecv(v, eltSize)
}
tryOK = true
p.mutex.Unlock()
} else {
recvOK, tryOK = true, true
}
recvOK, tryOK = true, true
return
}

Expand All @@ -276,6 +286,21 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) {
}
p.getp = chanHasRecv
p.data = v
seq := p.seq
notifyOps(p)
p.mutex.Unlock()
p.cond.Broadcast()

p.mutex.Lock()
for p.getp == chanHasRecv && p.seq == seq && !p.close {
p.cond.Wait(&p.mutex)
}
recvOK = p.seq != seq || p.getp != chanHasRecv
if !recvOK {
zeroChanRecv(v, eltSize)
}
p.mutex.Unlock()
return
} else {
for p.len == 0 {
if p.close {
Expand All @@ -294,19 +319,7 @@ func ChanRecv(p *Chan, v unsafe.Pointer, eltSize int) (recvOK bool) {
notifyOps(p)
p.mutex.Unlock()
p.cond.Broadcast()
if n == 0 {
p.mutex.Lock()
for p.getp == chanHasRecv && !p.close {
p.cond.Wait(&p.mutex)
}
recvOK = p.getp != chanHasRecv
if !recvOK {
zeroChanRecv(v, eltSize)
}
p.mutex.Unlock()
} else {
recvOK = true
}
recvOK = true
return
}

Expand Down
39 changes: 39 additions & 0 deletions test/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,42 @@ func TestSelectMixedUnbufferedPeersMakeProgress(t *testing.T) {
}
}
}

func TestSelectRecvCompletionNotOverwrittenByNextRecv(t *testing.T) {
for i := 0; i < 1000; i++ {
ch := make(chan struct{})
done := make(chan struct{})
recvDone := make(chan struct{})

go func() {
defer close(done)
defer close(ch)
for {
select {
case <-ch:
return
default:
}
}
}()

ch <- struct{}{}

go func() {
<-ch
close(recvDone)
}()

select {
case <-recvDone:
case <-time.After(time.Second):
t.Fatalf("iteration %d: receive completion was overwritten by a later receive", i)
}

select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("iteration %d: select receiver did not exit", i)
}
}
}
Loading