diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index b08461298..cbef16f32 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -404,8 +404,31 @@ impl Sender { /// ``` pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { match &self.flavor { - SenderFlavor::Array(chan) => chan.try_send(msg), - SenderFlavor::List(chan) => chan.try_send(msg), + SenderFlavor::Array(chan) => chan.try_send(msg, true), + SenderFlavor::List(chan) => chan.try_send(msg, true), + SenderFlavor::Zero(chan) => chan.try_send(msg), + } + } + + /// Attempts to send a message into the channel without blocking and notifying receiver. + /// + /// This method will either send a message into the channel immediately or return an error if + /// the channel is full or disconnected. The returned error contains the original message. + /// + /// Omitting notification makes the send operation complete faster. However, this means + /// receiver won't be notified about the existence of new message immediately. This means the + /// sole use of this call for a given channel could cause receivers to be blocked indefinitely. + /// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver + /// like mixed use of notifying send operations or receive operations with retry or + /// timeout/deadline. + /// + /// If called on a zero-capacity channel, this method will send the message only if there + /// happens to be a receive operation on the other side of the channel at the same time. This + /// means this is equivalent to the [try_send](Sender::try_send) operation. + pub fn try_send_unnotified(&self, msg: T) -> Result<(), TrySendError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.try_send(msg, false), + SenderFlavor::List(chan) => chan.try_send(msg, false), SenderFlavor::Zero(chan) => chan.try_send(msg), } } @@ -440,8 +463,54 @@ impl Sender { /// ``` pub fn send(&self, msg: T) -> Result<(), SendError> { match &self.flavor { - SenderFlavor::Array(chan) => chan.send(msg, None), - SenderFlavor::List(chan) => chan.send(msg, None), + SenderFlavor::Array(chan) => chan.send(msg, None, true), + SenderFlavor::List(chan) => chan.send(msg, None, true), + SenderFlavor::Zero(chan) => chan.send(msg, None), + } + .map_err(|err| match err { + SendTimeoutError::Disconnected(msg) => SendError(msg), + SendTimeoutError::Timeout(_) => unreachable!(), + }) + } + + /// Blocks the current thread until a message is sent without notifying receiver or the + /// channel is disconnected. + /// + /// Omitting notification makes the send operation complete faster. However, this means + /// receiver won't be notified about the existence of new message immediately. This means the + /// sole use of this call for a given channel could cause receivers to be blocked indefinitely. + /// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver + /// like mixed use of notifying send operations or receive operations with retry or + /// timeout/deadline. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed. If the channel becomes disconnected, this call will wake up and return an + /// error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. This means this is equivalent to the + /// [send](Sender::send) operation. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_channel::bounded; + /// + /// let (s, r) = bounded(1); + /// assert_eq!(s.send_unnotified(1), Ok(())); + /// + /// thread::spawn(move || { + /// // This sleep is crucial; otherwise `.recv()` could block indefinitely! + /// thread::sleep(Duration::from_secs(1)); + /// assert_eq!(r.recv(), Ok(1)); + /// }).join().unwrap(); + /// ``` + pub fn send_unnotified(&self, msg: T) -> Result<(), SendError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.send(msg, None, false), + SenderFlavor::List(chan) => chan.send(msg, None, false), SenderFlavor::Zero(chan) => chan.send(msg, None), } .map_err(|err| match err { @@ -489,7 +558,35 @@ impl Sender { /// ``` pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError> { match Instant::now().checked_add(timeout) { - Some(deadline) => self.send_deadline(msg, deadline), + Some(deadline) => self.send_internal(msg, deadline, true), + None => self.send(msg).map_err(SendTimeoutError::from), + } + } + + /// Waits for a message to be sent into the channel without notifying receiver, but only for a + /// limited time. + /// + /// Omitting notification makes the send operation complete faster. However, this means + /// receiver won't be notified about the existence of new message immediately. This means the + /// sole use of this call for a given channel could cause receivers to be blocked indefinitely. + /// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver + /// like mixed use of notifying send operations or receive operations with retry or + /// timeout/deadline. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. This means this is equivalent to the + /// [send_timeout](Sender::send_timeout) operation. + pub fn send_timeout_unnotified( + &self, + msg: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError> { + match Instant::now().checked_add(timeout) { + Some(deadline) => self.send_internal(msg, deadline, false), None => self.send(msg).map_err(SendTimeoutError::from), } } @@ -534,9 +631,42 @@ impl Sender { /// ); /// ``` pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError> { + self.send_internal(msg, deadline, true) + } + + /// Waits for a message to be sent into the channel without notifying receiver, but only until a given deadline. + /// + /// Omitting notification makes the send operation complete faster. However, this means + /// receiver won't be notified about the existence of new message immediately. This means the + /// sole use of this call for a given channel could cause receivers to be blocked indefinitely. + /// Thus, this call must be accompanied with some explicit mechanism to wake up the receiver + /// like mixed use of notifying send operations or receive operations with retry or + /// timeout/deadline. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. This means this is equivalent to the + /// [send_deadline](Sender::send_deadline) operation. + pub fn send_deadline_unnotified( + &self, + msg: T, + deadline: Instant, + ) -> Result<(), SendTimeoutError> { + self.send_internal(msg, deadline, false) + } + + fn send_internal( + &self, + msg: T, + deadline: Instant, + notify: bool, + ) -> Result<(), SendTimeoutError> { match &self.flavor { - SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), - SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), + SenderFlavor::Array(chan) => chan.send(msg, Some(deadline), notify), + SenderFlavor::List(chan) => chan.send(msg, Some(deadline), notify), SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), } } @@ -1512,8 +1642,8 @@ impl SelectHandle for Receiver { /// Writes a message into the channel. pub(crate) unsafe fn write(s: &Sender, token: &mut Token, msg: T) -> Result<(), T> { match &s.flavor { - SenderFlavor::Array(chan) => chan.write(token, msg), - SenderFlavor::List(chan) => chan.write(token, msg), + SenderFlavor::Array(chan) => chan.write(token, msg, true), + SenderFlavor::List(chan) => chan.write(token, msg, true), SenderFlavor::Zero(chan) => chan.write(token, msg), } } diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 63b82eb85..6a11be147 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -210,7 +210,7 @@ impl Channel { } /// Writes a message into the channel. - pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify: bool) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.array.slot.is_null() { return Err(msg); @@ -223,7 +223,9 @@ impl Channel { slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping receiver. - self.receivers.notify(); + if notify { + self.receivers.notify(); + } Ok(()) } @@ -319,10 +321,13 @@ impl Channel { } /// Attempts to send a message into the channel. - pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { + pub(crate) fn try_send(&self, msg: T, notify: bool) -> Result<(), TrySendError> { let token = &mut Token::default(); if self.start_send(token) { - unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } + unsafe { + self.write(token, msg, notify) + .map_err(TrySendError::Disconnected) + } } else { Err(TrySendError::Full(msg)) } @@ -333,6 +338,7 @@ impl Channel { &self, msg: T, deadline: Option, + notify: bool, ) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); loop { @@ -340,7 +346,7 @@ impl Channel { let backoff = Backoff::new(); loop { if self.start_send(token) { - let res = unsafe { self.write(token, msg) }; + let res = unsafe { self.write(token, msg, notify) }; return res.map_err(SendTimeoutError::Disconnected); } diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 230edd8d2..e8f95a28e 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -278,7 +278,7 @@ impl Channel { } /// Writes a message into the channel. - pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T, notify: bool) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.list.block.is_null() { return Err(msg); @@ -292,7 +292,9 @@ impl Channel { slot.state.fetch_or(WRITE, Ordering::Release); // Wake a sleeping receiver. - self.receivers.notify(); + if notify { + self.receivers.notify(); + } Ok(()) } @@ -407,8 +409,8 @@ impl Channel { } /// Attempts to send a message into the channel. - pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { - self.send(msg, None).map_err(|err| match err { + pub(crate) fn try_send(&self, msg: T, notify: bool) -> Result<(), TrySendError> { + self.send(msg, None, notify).map_err(|err| match err { SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), SendTimeoutError::Timeout(_) => unreachable!(), }) @@ -419,11 +421,12 @@ impl Channel { &self, msg: T, _deadline: Option, + notify: bool, ) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); assert!(self.start_send(token)); unsafe { - self.write(token, msg) + self.write(token, msg, notify) .map_err(SendTimeoutError::Disconnected) } }