@reem · Created April 4, 2016 00:56
extern crate scoped_pool;
extern crate itertools;
extern crate rand;

use rand::Rng;
use scoped_pool::{Pool, Scope};

pub fn quicksort<T: Send + Sync + Ord>(pool: &Pool, data: &mut [T]) {
    pool.scoped(move |scoped| do_quicksort(scoped, data))
}
fn do_quicksort<'a, T: Send + Sync + Ord>(scope: &Scope<'a>, data: &'a mut [T]) {
    scope.recurse(move |scope| {
        if data.len() > 1 {
            // Choose a random pivot.
            let mut rng = rand::thread_rng();
            let len = data.len();
            let pivot_index = rng.gen_range(0, len);

            // Swap the pivot to the end.
            data.swap(pivot_index, len - 1);

            let split = {
                // Retrieve the pivot.
                let mut iter = data.into_iter();
                let pivot = iter.next_back().unwrap();

                // Partition the array.
                itertools::partition(iter, |val| &*val <= &pivot)
            };

            // Swap the pivot back in at the split point by swapping it
            // with the element currently there.
            data.swap(split, len - 1);

            // Sort both halves, excluding the pivot itself.
            let (left, right) = data.split_at_mut(split);
            do_quicksort(scope, left);
            do_quicksort(scope, &mut right[1..]);
        }
    })
}
pub fn main() {
    let mut rng = rand::thread_rng();
    let pool = Pool::new(8);

    for _ in 0..10 {
        let n = rng.gen_range(1, 10_000_000);
        println!("Generating {} random elements!", n);
        let mut elements = (0..n).map(|_| ::rand::random::<u64>()).collect::<Vec<_>>();

        println!("Sorting elements!");
        quicksort(&pool, &mut elements);
        println!("Finished sorting elements!");

        println!("Verifying the elements are sorted.");
        for (current, next) in elements.iter().zip(elements[1..].iter()) {
            assert!(next >= current, "{:?} < {:?}! elements: {:?}", next, current, elements);
        }
    }

    pool.shutdown();
    println!("Success.");
}
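
// ---------------------------------------------------------------------------
// For reference: the `Scope` type behind `pool.scoped` above. This appears to
// be excerpted from the `scoped_pool` crate's source, so the imports and
// internal types it relies on (`WaitGroup`, `Task`, `PoolMessage`, `Id`,
// `defer!`) are not shown here.
// ---------------------------------------------------------------------------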
/// An execution scope, representing a set of jobs running on a Pool.
///
/// ## Understanding Scope lifetimes
///
/// Besides `Scope<'static>`, all `Scope` objects are accessed behind a
/// reference of the form `&'scheduler Scope<'scope>`.
///
/// `'scheduler` is the lifetime associated with the *body* of the
/// "scheduler" function (functions passed to `zoom`/`scoped`).
///
/// `'scope` is the lifetime which data captured in `execute` or `recurse`
/// closures must outlive - in other words, `'scope` is the maximum lifetime
/// of all jobs scheduled on a `Scope`.
///
/// Note that since `'scope: 'scheduler` (`'scope` outlives `'scheduler`),
/// a `&'scheduler Scope<'scope>` can't be captured in an `execute` closure;
/// this is the reason for the existence of the `recurse` API, which will
/// inject the same scope with a new `'scheduler` lifetime (this time set
/// to the body of the function passed to `recurse`).
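///
/// ## Example
///
/// A minimal sketch, not from the original documentation; `pool` is assumed
/// to be an already-constructed `Pool`:
///
/// ```ignore
/// let mut results = vec![0u32; 4];
/// pool.scoped(|scope| {
///     // `results` outlives `'scope`, so jobs may borrow it mutably.
///     for chunk in results.chunks_mut(2) {
///         scope.execute(move || chunk[0] = 1);
///     }
/// }); // all jobs have joined before `results` is touched again
/// ```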
pub struct Scope<'scope> {
    // The threadpool this scope is associated with.
    pool: Pool,

    // The waitgroup that represents the set of jobs currently
    // running in this scope.
    wait: Arc<WaitGroup>,

    // An invariant lifetime marker representing the minimum lifetime
    // jobs must live for.
    _scope: Id<'scope>
}
impl<'scope> Scope<'scope> {
    /// Create a Scope which lasts forever.
    #[inline]
    pub fn forever(pool: Pool) -> Scope<'static> {
        Scope {
            pool: pool,
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }

    /// Add a job to this scope.
    ///
    /// Subsequent calls to `join` will wait for this job to complete.
    pub fn execute<F>(&self, job: F)
        where F: FnOnce() + Send + 'scope {
        // Register the job with the wait group *before* pushing it onto the queue.
        self.wait.submit();

        let task = unsafe {
            // Safe because we ensure the task finishes executing before the
            // end of `'scope`, by joining the scope before `'scope` resolves.
            mem::transmute::<Box<Task + Send + 'scope>,
                             Box<Task + Send + 'static>>(Box::new(job))
        };

        // Submit the task to be executed.
        self.pool.queue.push(PoolMessage::Task(task, self.wait.clone()));
    }

    /// Add a job to this scope which itself will get access to the scope.
    ///
    /// Like with `execute`, subsequent calls to `join` will wait for this
    /// job (and all jobs scheduled on the scope it receives) to complete.
    pub fn recurse<F>(&self, job: F)
        where F: FnOnce(&Self) + Send + 'scope {
        // Create another scope with the *same* lifetime.
        let this = unsafe { self.clone() };
        self.execute(move || job(&this));
    }

    /// Create a new subscope, bound to a lifetime smaller than our existing Scope.
    ///
    /// The subscope has a different job set, and is joined before `zoom` returns.
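    ///
    /// A hedged usage sketch, not from the original docs; `scope` is an
    /// assumed outer `&Scope` and `do_work` a hypothetical function:
    ///
    /// ```ignore
    /// scope.zoom(|inner| {
    ///     inner.execute(|| do_work());
    /// });
    /// // Every job scheduled on `inner` has joined by this point.
    /// ```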
    pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R
        where F: FnOnce(&Scope<'smaller>) -> R,
              'scope: 'smaller {
        let scope = unsafe { self.refine::<'smaller>() };

        // Join the scope either on completion of the scheduler or on panic.
        defer!(scope.join());

        // Schedule all tasks, then join all tasks.
        scheduler(&scope)
    }

    /// Awaits all jobs submitted on this Scope to be completed.
    ///
    /// Only guaranteed to join jobs which were `execute`d logically
    /// prior to `join`. Jobs `execute`d concurrently with `join` may
    /// or may not be completed before `join` returns.
    #[inline]
    pub fn join(&self) {
        self.wait.join()
    }

    #[inline]
    unsafe fn clone(&self) -> Self {
        Scope {
            pool: self.pool.clone(),
            wait: self.wait.clone(),
            _scope: Id::default()
        }
    }

    // Create a new scope with a smaller lifetime on the same pool.
    #[inline]
    unsafe fn refine<'other>(&self) -> Scope<'other> where 'scope: 'other {
        Scope {
            pool: self.pool.clone(),
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }
}
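
// A hedged usage sketch, not part of the original source: exercising the
// `forever`/`execute`/`join` API defined above. Assumes it lives in the same
// module as `Pool` and `Scope`.
#[cfg(test)]
mod scope_usage_sketch {
    use super::*;

    #[test]
    fn forever_scope_executes_and_joins() {
        let pool = Pool::new(4);
        let scope = Scope::forever(pool.clone());

        // Submit a 'static job and wait for it to complete.
        scope.execute(|| println!("runs on the pool"));
        scope.join();

        pool.shutdown();
    }
}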