Created
April 4, 2016 00:56
-
-
Save reem/2c7eed5e8cf3d1902478a54103a9276a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate scoped_pool; | |
extern crate itertools; | |
extern crate rand; | |
use rand::Rng; | |
use scoped_pool::{Pool, Scope}; | |
pub fn quicksort<T: Send + Sync + Ord>(pool: &Pool, data: &mut [T]) { | |
pool.scoped(move |scoped| do_quicksort(scoped, data)) | |
} | |
fn do_quicksort<'a, T: Send + Sync + Ord>(scope: &Scope<'a>, data: &'a mut [T]) { | |
scope.recurse(move |scope| { | |
if data.len() > 1 { | |
// Choose a random pivot. | |
let mut rng = rand::thread_rng(); | |
let len = data.len(); | |
let pivot_index = rng.gen_range(0, len); // Choose a random pivot | |
// Swap the pivot to the end. | |
data.swap(pivot_index, len - 1); | |
let split = { | |
// Retrieve the pivot. | |
let mut iter = data.into_iter(); | |
let pivot = iter.next_back().unwrap(); | |
// Partition the array. | |
itertools::partition(iter, |val| &*val <= &pivot) | |
}; | |
// Swap the pivot back in at the split point by putting | |
// the element currently there are at the end of the slice. | |
data.swap(split, len - 1); | |
// Sort both halves. | |
let (left, right) = data.split_at_mut(split); | |
do_quicksort(scope, left); | |
do_quicksort(scope, &mut right[1..]); | |
} | |
}) | |
} | |
pub fn main() { | |
let mut rng = rand::thread_rng(); | |
let pool = Pool::new(8); | |
for _ in 0..10 { | |
let n = rng.gen_range(1, 10000000); | |
println!("Generating {} random elements!", n); | |
let mut elements = (0..n).map(|_| ::rand::random::<u64>()).collect::<Vec<_>>(); | |
println!("Sorting elements!"); | |
quicksort(&pool, &mut elements); | |
println!("Finished sorting elements!"); | |
println!("Verifying the elements are sorted."); | |
for (current, next) in elements.iter().zip(elements[1..].iter()) { | |
assert!(next >= current, "{:?} < {:?}! elements: {:?}", next, current, elements); | |
} | |
} | |
pool.shutdown(); | |
println!("Success."); | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// An execution scope, represents a set of jobs running on a Pool. | |
/// | |
/// ## Understanding Scope lifetimes | |
/// | |
/// Besides `Scope<'static>`, all `Scope` objects are accessed behind a | |
/// reference of the form `&'scheduler Scope<'scope>`. | |
/// | |
/// `'scheduler` is the lifetime associated with the *body* of the | |
/// "scheduler" function (functions passed to `zoom`/`scoped`). | |
/// | |
/// `'scope` is the lifetime which data captured in `execute` or `recurse` | |
/// closures must outlive - in other words, `'scope` is the maximum lifetime | |
/// of all jobs scheduler on a `Scope`. | |
/// | |
/// Note that since `'scope: 'scheduler` (`'scope` outlives `'scheduler`) | |
/// `&'scheduler Scope<'scope>` can't be captured in an `execute` closure; | |
/// this is the reason for the existence of the `recurse` API, which will | |
/// inject the same scope with a new `'scheduler` lifetime (this time set | |
/// to the body of the function passed to `recurse`). | |
pub struct Scope<'scope> { | |
// The threadpool this scope is associated with. | |
pool: Pool, | |
// The waitgroup that represents the set of jobs currently | |
// running in this scope. | |
wait: Arc<WaitGroup>, | |
// An invariant lifetime marker representing the minimum lifetime | |
// jobs must live for. | |
_scope: Id<'scope> | |
} | |
impl<'scope> Scope<'scope> { | |
/// Create a Scope which lasts forever. | |
#[inline] | |
pub fn forever(pool: Pool) -> Scope<'static> { | |
Scope { | |
pool: pool, | |
wait: Arc::new(WaitGroup::new()), | |
_scope: Id::default() | |
} | |
} | |
/// Add a job to this scope. | |
/// | |
/// Subsequent calls to `join` will wait for this job to complete. | |
pub fn execute<F>(&self, job: F) | |
where F: FnOnce() + Send + 'scope { | |
// Submit the job *before* submitting it to the queue. | |
self.wait.submit(); | |
let task = unsafe { | |
// Safe because we will ensure the task finishes executing before | |
// 'scope via joining before the resolution of `'scope`. | |
mem::transmute::<Box<Task + Send + 'scope>, | |
Box<Task + Send + 'static>>(Box::new(job)) | |
}; | |
// Submit the task to be executed. | |
self.pool.queue.push(PoolMessage::Task(task, self.wait.clone())); | |
} | |
/// Add a job to this scope which itself will get access to the scope. | |
/// | |
/// Like with `execute`, subsequent calls to `join` will wait for this | |
/// job (and all jobs scheduled on the scope it receives) to complete. | |
pub fn recurse<F>(&self, job: F) | |
where F: FnOnce(&Self) + Send + 'scope { | |
// Create another scope with the *same* lifetime. | |
let this = unsafe { self.clone() }; | |
self.execute(move || job(&this)); | |
} | |
/// Create a new subscope, bound to a lifetime smaller than our existing Scope. | |
/// | |
/// The subscope has a different job set, and is joined before zoom returns. | |
pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R | |
where F: FnOnce(&Scope<'smaller>) -> R, | |
'scope: 'smaller { | |
let scope = unsafe { self.refine::<'smaller>() }; | |
// Join the scope either on completion of the scheduler or panic. | |
defer!(scope.join()); | |
// Schedule all tasks then join all tasks | |
scheduler(&scope) | |
} | |
/// Awaits all jobs submitted on this Scope to be completed. | |
/// | |
/// Only guaranteed to join jobs which where `execute`d logically | |
/// prior to `join`. Jobs `execute`d concurrently with `join` may | |
/// or may not be completed before `join` returns. | |
#[inline] | |
pub fn join(&self) { | |
self.wait.join() | |
} | |
#[inline] | |
unsafe fn clone(&self) -> Self { | |
Scope { | |
pool: self.pool.clone(), | |
wait: self.wait.clone(), | |
_scope: Id::default() | |
} | |
} | |
// Create a new scope with a smaller lifetime on the same pool. | |
#[inline] | |
unsafe fn refine<'other>(&self) -> Scope<'other> where 'scope: 'other { | |
Scope { | |
pool: self.pool.clone(), | |
wait: Arc::new(WaitGroup::new()), | |
_scope: Id::default() | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment