Created
March 25, 2020 10:23
-
-
Save zesterer/59b2b1949430cc74ec26827b24ba6f5b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/lib.rs b/src/lib.rs | |
index 24534a8..8121dd9 100644 | |
--- a/src/lib.rs | |
+++ b/src/lib.rs | |
@@ -170,6 +170,11 @@ fn wait_lock<'a, T>(lock: &'a InnerMutex<T>) -> MutexGuard<'a, T> { | |
} | |
} | |
+#[inline] | |
+fn poll_lock<'a, T>(lock: &'a InnerMutex<T>) -> Option<MutexGuard<'a, T>> { | |
+ lock.try_lock() | |
+} | |
+ | |
/// Wrapper around a queue. This wrapper exists to permit a maximum length. | |
struct Queue<T>(VecDeque<T>, Option<usize>); | |
@@ -621,7 +626,25 @@ impl<T> Sender<T> { | |
pub fn send(&self, msg: T) -> Result<(), SendError<T>> { | |
match &self.shared.chan { | |
Channel::Bounded { cap, pending, queue } => { | |
- let mut queue = wait_lock(&queue); | |
+ let mut queue = { | |
+ let mut i = 0; | |
+ loop { | |
+ if i > *cap { | |
+ break wait_lock(&queue); | |
+ } | |
+ | |
+ if let Some(queue) = poll_lock(&queue) { | |
+ if queue.len() < *cap { | |
+ break queue; | |
+ } | |
+ } | |
+ | |
+ i += 1; | |
+ | |
+ drop(queue); | |
+ thread::yield_now(); | |
+ } | |
+ }; | |
if self.shared.disconnected.load(Ordering::Relaxed) { | |
Err(SendError(msg)) | |
} else if queue.len() < *cap { | |
@@ -633,10 +656,12 @@ impl<T> Sender<T> { | |
*unblock_signal.lock() = Some(msg); | |
wait_lock(&pending).push_back(unblock_signal.clone()); | |
+ | |
self.shared.send_signal.notify_one(()); | |
- unblock_signal.wait_while(queue, |msg| msg.is_some() && | |
- !self.shared.disconnected.load(Ordering::Relaxed)); | |
+ unblock_signal.wait_while(queue, |msg| { | |
+ msg.is_some() && !self.shared.disconnected.load(Ordering::Relaxed) | |
+ }); | |
if let Some(msg) = unblock_signal.lock().take() { | |
Err(SendError(msg)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment