Skip to content

Instantly share code, notes, and snippets.

@nikomatsakis
Last active April 11, 2017 13:23
Show Gist options
  • Save nikomatsakis/ef3d40429f5e225472e7bb42326755b9 to your computer and use it in GitHub Desktop.
Save nikomatsakis/ef3d40429f5e225472e7bb42326755b9 to your computer and use it in GitHub Desktop.
extern crate rayon;
extern crate futures;
use futures::lazy;
use rayon::prelude::*;
use rayon::iter::internal::*;
// use rayon::Scope;
pub trait PipelineMethods
where Self: Sized + IntoIterator, Self::Item: Send,
{
fn par_pipeline(self) -> Pipeline<Self> where Self: Send;
}
impl<I> PipelineMethods for I
where I: IntoIterator, I::Item: Send,
{
fn par_pipeline(self) -> Pipeline<Self> where Self: Send {
Pipeline { iter: self }
}
// fn par_pipeline_in<'s, 'scope>(self, scope: &'s Scope<'scope>) -> PipelineScoped<'s, 'scope, Self> {
// PipelineScoped { iter: self, scope: scope }
// }
}
pub struct Pipeline<I>
where I: IntoIterator + Send, I::Item: Send,
{
iter: I,
}
impl<I> ParallelIterator for Pipeline<I>
where I: IntoIterator + Send, I::Item: Send,
{
type Item = I::Item;
fn drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item> {
let iter = self.iter;
rayon::scope(|scope| drive(scope, iter, consumer))
}
}
//pub struct PipelineScoped<'s, 'scope, I>
// where I: IntoIterator, I::Item: Send, 'scope: 's,
//{
// iter: I,
// scope: &'s Scope<'scope>,
//}
//
//impl<'s, 'scope, I> ParallelIterator for PipelineScoped<'s, 'scope, I>
// where I: IntoIterator, I::Item: Send,
//{
// type Item = I::Item;
//
// fn drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item> {
// drive(self.scope, self.iter, consumer)
// }
//}
fn drive<'scope, I, C>(scope: &rayon::Scope<'scope>, iter: I, consumer: C) -> C::Result
where I: IntoIterator,
I::Item: Send + 'scope,
C: UnindexedConsumer<I::Item> + 'scope,
C::Result: 'scope,
{
let futures: Vec<_> =
iter.into_iter()
.map(|item| {
let left_consumer = consumer.split_off_left();
scope.spawn_future(lazy(move || {
Ok::<_, ()>(left_consumer.into_folder().consume(item).complete())
}))
})
.collect();
let mut left = consumer.split_off_left().into_folder().complete();
for future in futures {
let right = future.rayon_wait().unwrap();
left = consumer.to_reducer().reduce(left, right);
}
left
}
pub fn main() {
let x = vec![1, 2, 3, 4, 5];
let y: Vec<u32> = x.iter().par_pipeline().map(|&i| i + 1).collect();
println!("x={:?}", x);
println!("y={:?}", y);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment