Created
January 1, 2016 10:34
-
-
Save cuviper/cf7a055c468d0447b570 to your computer and use it in GitHub Desktop.
alt latch condvars for nikomatsakis/rayon#9
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/api.rs b/src/api.rs | |
index 5d7871d5bc06..e122eb4dce16 100644 | |
--- a/src/api.rs | |
+++ b/src/api.rs | |
@@ -1,4 +1,4 @@ | |
-use latch::Latch; | |
+use latch::{ProbingLatch, WaitingLatch}; | |
#[allow(unused_imports)] | |
use log::Event::*; | |
use job::{Code, CodeImpl, Job}; | |
@@ -45,7 +45,7 @@ pub fn join<A,B,RA,RB>(oper_a: A, | |
// done here so that the stack frame can keep it all live | |
// long enough | |
let mut code_b = CodeImpl::new(oper_b, &mut result_b); | |
- let mut latch_b = Latch::new(); | |
+ let mut latch_b = ProbingLatch::new(); | |
let mut job_b = Job::new(&mut code_b, &mut latch_b); | |
(*worker_thread).push(&mut job_b); | |
@@ -77,12 +77,12 @@ unsafe fn join_inject<A,B,RA,RB>(oper_a: A, | |
{ | |
let mut result_a = None; | |
let mut code_a = CodeImpl::new(oper_a, &mut result_a); | |
- let mut latch_a = Latch::new(); | |
+ let mut latch_a = WaitingLatch::new(); | |
let mut job_a = Job::new(&mut code_a, &mut latch_a); | |
let mut result_b = None; | |
let mut code_b = CodeImpl::new(oper_b, &mut result_b); | |
- let mut latch_b = Latch::new(); | |
+ let mut latch_b = WaitingLatch::new(); | |
let mut job_b = Job::new(&mut code_b, &mut latch_b); | |
thread_pool::get_registry().inject(&[&mut job_a, &mut job_b]); | |
@@ -112,7 +112,7 @@ impl ThreadPool { | |
unsafe { | |
let mut result_a = None; | |
let mut code_a = CodeImpl::new(op, &mut result_a); | |
- let mut latch_a = Latch::new(); | |
+ let mut latch_a = WaitingLatch::new(); | |
let mut job_a = Job::new(&mut code_a, &mut latch_a); | |
self.registry.inject(&[&mut job_a]); | |
latch_a.wait(); | |
diff --git a/src/latch.rs b/src/latch.rs | |
index a81831d1cc7a..aa6f34387ef6 100644 | |
--- a/src/latch.rs | |
+++ b/src/latch.rs | |
@@ -1,34 +1,69 @@ | |
+use std::sync::{Mutex, Condvar}; | |
use std::sync::atomic::{AtomicBool, Ordering}; | |
-use std::thread; | |
-/// A Latch starts as false and eventually becomes true. You can block | |
-/// until it becomes true. | |
-pub struct Latch { | |
+/// A Latch starts as false and eventually becomes true. | |
+/// The means to check its state depends on the impl. | |
+pub trait Latch { | |
+ | |
+ /// Set the latch to true, releasing all threads who are waiting. | |
+ fn set(&self); | |
+} | |
+ | |
+ | |
+/// A ProbingLatch can be probed to see if it's true yet. | |
+pub struct ProbingLatch { | |
b: AtomicBool | |
} | |
-impl Latch { | |
+impl Latch for ProbingLatch { | |
+ fn set(&self) { | |
+ self.b.store(true, Ordering::SeqCst); | |
+ } | |
+} | |
+ | |
+impl ProbingLatch { | |
#[inline] | |
- pub fn new() -> Latch { | |
- Latch { | |
+ pub fn new() -> ProbingLatch { | |
+ ProbingLatch { | |
b: AtomicBool::new(false) | |
} | |
} | |
- /// Set the latch to true, releasing all threads who are waiting. | |
- pub fn set(&self) { | |
- self.b.store(true, Ordering::SeqCst); | |
+ /// Test if latch is set. | |
+ pub fn probe(&self) -> bool { | |
+ self.b.load(Ordering::SeqCst) | |
+ } | |
+} | |
+ | |
+ | |
+/// A WaitingLatch can efficiently wait until it's true. | |
+pub struct WaitingLatch { | |
+ m: Mutex<bool>, | |
+ c: Condvar, | |
+} | |
+ | |
+impl Latch for WaitingLatch { | |
+ fn set(&self) { | |
+ let mut guard = self.m.lock().unwrap(); | |
+ *guard = true; | |
+ self.c.notify_all(); | |
} | |
+} | |
- /// Spin until latch is set. Use with caution. | |
- pub fn wait(&self) { | |
- while !self.probe() { | |
- thread::yield_now(); | |
+impl WaitingLatch { | |
+ #[inline] | |
+ pub fn new() -> WaitingLatch { | |
+ WaitingLatch { | |
+ m: Mutex::new(false), | |
+ c: Condvar::new(), | |
} | |
} | |
- /// Test if latch is set. | |
- pub fn probe(&self) -> bool { | |
- self.b.load(Ordering::SeqCst) | |
+ /// Spin until latch is set. Use with caution. | |
+ pub fn wait(&self) { | |
+ let mut guard = self.m.lock().unwrap(); | |
+ while !*guard { | |
+ guard = self.c.wait(guard).unwrap(); | |
+ } | |
} | |
} | |
diff --git a/src/thread_pool.rs b/src/thread_pool.rs | |
index 28cbefc2b5c6..163ef719aa4a 100644 | |
--- a/src/thread_pool.rs | |
+++ b/src/thread_pool.rs | |
@@ -1,6 +1,6 @@ | |
use deque::{BufferPool, Worker, Stealer, Stolen}; | |
use job::Job; | |
-use latch::Latch; | |
+use latch::{Latch, ProbingLatch, WaitingLatch}; | |
#[allow(unused_imports)] | |
use log::Event::*; | |
use num_cpus; | |
@@ -145,7 +145,7 @@ impl RegistryState { | |
struct ThreadInfo { | |
// latch is set once thread has started and we are entering into | |
// the main loop | |
- primed: Latch, | |
+ primed: WaitingLatch, | |
worker: Worker<JobRef>, | |
stealer: Stealer<JobRef>, | |
} | |
@@ -154,7 +154,7 @@ impl ThreadInfo { | |
fn new(pool: &BufferPool<JobRef>) -> ThreadInfo { | |
let (worker, stealer) = pool.deque(); | |
ThreadInfo { | |
- primed: Latch::new(), | |
+ primed: WaitingLatch::new(), | |
worker: worker, | |
stealer: stealer, | |
} | |
@@ -219,7 +219,7 @@ impl WorkerThread { | |
self.thread_info().worker.pop().is_some() | |
} | |
- pub unsafe fn steal_until(&self, latch: &Latch) { | |
+ pub unsafe fn steal_until(&self, latch: &ProbingLatch) { | |
while !latch.probe() { | |
if let Some(job) = steal_work(&self.registry, self.index) { | |
(*job).execute(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment