Last active
April 11, 2017 13:23
-
-
Save nikomatsakis/ef3d40429f5e225472e7bb42326755b9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate rayon; | |
extern crate futures; | |
use futures::lazy; | |
use rayon::prelude::*; | |
use rayon::iter::internal::*; | |
// use rayon::Scope; | |
pub trait PipelineMethods | |
where Self: Sized + IntoIterator, Self::Item: Send, | |
{ | |
fn par_pipeline(self) -> Pipeline<Self> where Self: Send; | |
} | |
impl<I> PipelineMethods for I | |
where I: IntoIterator, I::Item: Send, | |
{ | |
fn par_pipeline(self) -> Pipeline<Self> where Self: Send { | |
Pipeline { iter: self } | |
} | |
// fn par_pipeline_in<'s, 'scope>(self, scope: &'s Scope<'scope>) -> PipelineScoped<'s, 'scope, Self> { | |
// PipelineScoped { iter: self, scope: scope } | |
// } | |
} | |
/// Wrapper around a sequential `IntoIterator` that is exposed as a rayon
/// `ParallelIterator` (see the `ParallelIterator` impl for this type).
///
/// Both the wrapped value and the items it yields must be `Send`.
pub struct Pipeline<I: IntoIterator + Send>
where
    I::Item: Send,
{
    /// The wrapped sequential source; consumed when the pipeline is driven.
    iter: I,
}
impl<I> ParallelIterator for Pipeline<I> | |
where I: IntoIterator + Send, I::Item: Send, | |
{ | |
type Item = I::Item; | |
fn drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item> { | |
let iter = self.iter; | |
rayon::scope(|scope| drive(scope, iter, consumer)) | |
} | |
} | |
// Disabled sketch: a pipeline variant that runs inside a caller-provided scope.
//
//pub struct PipelineScoped<'s, 'scope, I>
//    where I: IntoIterator, I::Item: Send, 'scope: 's,
//{
//    iter: I,
//    scope: &'s Scope<'scope>,
//}
//
//impl<'s, 'scope, I> ParallelIterator for PipelineScoped<'s, 'scope, I>
//    where I: IntoIterator, I::Item: Send,
//{
//    type Item = I::Item;
//
//    fn drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item> {
//        drive(self.scope, self.iter, consumer)
//    }
//}
fn drive<'scope, I, C>(scope: &rayon::Scope<'scope>, iter: I, consumer: C) -> C::Result | |
where I: IntoIterator, | |
I::Item: Send + 'scope, | |
C: UnindexedConsumer<I::Item> + 'scope, | |
C::Result: 'scope, | |
{ | |
let futures: Vec<_> = | |
iter.into_iter() | |
.map(|item| { | |
let left_consumer = consumer.split_off_left(); | |
scope.spawn_future(lazy(move || { | |
Ok::<_, ()>(left_consumer.into_folder().consume(item).complete()) | |
})) | |
}) | |
.collect(); | |
let mut left = consumer.split_off_left().into_folder().complete(); | |
for future in futures { | |
let right = future.rayon_wait().unwrap(); | |
left = consumer.to_reducer().reduce(left, right); | |
} | |
left | |
} | |
pub fn main() { | |
let x = vec![1, 2, 3, 4, 5]; | |
let y: Vec<u32> = x.iter().par_pipeline().map(|&i| i + 1).collect(); | |
println!("x={:?}", x); | |
println!("y={:?}", y); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment