Last active
October 28, 2016 19:59
-
-
Save cuviper/401b55afd08993414f554e071d972cec to your computer and use it in GitHub Desktop.
WIP rayon find
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml | |
index 4a5d2697a58c..7d17e8771bf6 100644 | |
--- a/rayon-demo/Cargo.toml | |
+++ b/rayon-demo/Cargo.toml | |
@@ -13,3 +13,4 @@ rustc-serialize = "0.3" | |
time = "0.1" | |
itertools = "0.4" | |
num = "0.1.30" | |
+lazy_static = "0.2.1" # for the HAYSTACK static in the find benchmarks (src/find/mod.rs) | |
diff --git a/rayon-demo/src/find/mod.rs b/rayon-demo/src/find/mod.rs | |
new file mode 100644 | |
index 000000000000..85a9709da9bf | |
--- /dev/null | |
+++ b/rayon-demo/src/find/mod.rs | |
@@ -0,0 +1,87 @@ | |
+//! Simple benchmarks of `find()` performance. | |
+//! | |
+//! Each benchmark searches the same pseudo-random `HAYSTACK` for a needle | |
+//! taken from a known position (first, last, two-thirds of the way in) or | |
+//! for a value that is absent, once with `par_iter()` and once with the | |
+//! sequential `iter()` as a baseline. | |
+ | |
+use rayon::prelude::*; | |
+use rand::{Rng, SeedableRng, XorShiftRng}; | |
+use test::Bencher; | |
+ | |
+ | |
+lazy_static! { | |
+    // 10 million `u32`s from a fixed seed, so every run searches the same data. | |
+    static ref HAYSTACK: Vec<u32> = { | |
+        let mut rng = XorShiftRng::from_seed([0, 1, 2, 3]); | |
+        (0..10_000_000).map(|_| rng.next_u32()).collect() | |
+    }; | |
+} | |
+ | |
+/// Returns a value that does not occur anywhere in `HAYSTACK`. | |
+/// | |
+/// Prefers `max + 1`, falling back to `min - 1` when `max` is `u32::MAX`. | |
+/// A bare `max + 1` would overflow in that case: a panic when overflow | |
+/// checks are on, or a silent wrap to 0 (which may well be present) when | |
+/// they are off. | |
+fn missing_needle() -> u32 { | |
+    let max = *HAYSTACK.iter().max().unwrap(); | |
+    max.checked_add(1) | |
+        .or_else(|| HAYSTACK.iter().min().unwrap().checked_sub(1)) | |
+        .expect("HAYSTACK contains both 0 and u32::MAX") | |
+} | |
+ | |
+ | |
+#[bench] | |
+fn parallel_find_first(b: &mut Bencher) { | |
+    let needle = HAYSTACK[0]; | |
+    b.iter(|| assert!(HAYSTACK.par_iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+#[bench] | |
+fn serial_find_first(b: &mut Bencher) { | |
+    let needle = HAYSTACK[0]; | |
+    b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+ | |
+#[bench] | |
+fn parallel_find_last(b: &mut Bencher) { | |
+    let needle = HAYSTACK[HAYSTACK.len() - 1]; | |
+    b.iter(|| assert!(HAYSTACK.par_iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+#[bench] | |
+fn serial_find_last(b: &mut Bencher) { | |
+    let needle = HAYSTACK[HAYSTACK.len() - 1]; | |
+    b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+ | |
+// NOTE(review): the "middle" needle sits two-thirds of the way in, not at | |
+// `len / 2`. Presumably deliberate: `bridge_producer_consumer` splits at | |
+// `len / 2`, so that element would be the first one a right-half task checks. | |
+#[bench] | |
+fn parallel_find_middle(b: &mut Bencher) { | |
+    let needle = HAYSTACK[HAYSTACK.len() / 3 * 2]; | |
+    b.iter(|| assert!(HAYSTACK.par_iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+#[bench] | |
+fn serial_find_middle(b: &mut Bencher) { | |
+    let needle = HAYSTACK[HAYSTACK.len() / 3 * 2]; | |
+    b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x == needle).is_some())); | |
+} | |
+ | |
+ | |
+#[bench] | |
+fn parallel_find_missing(b: &mut Bencher) { | |
+    let needle = missing_needle(); | |
+    b.iter(|| assert!(HAYSTACK.par_iter().find(|&&x| x == needle).is_none())); | |
+} | |
+ | |
+#[bench] | |
+fn serial_find_missing(b: &mut Bencher) { | |
+    let needle = missing_needle(); | |
+    b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x == needle).is_none())); | |
+} | |
diff --git a/rayon-demo/src/main.rs b/rayon-demo/src/main.rs | |
index 69db370bf649..665a5288c0ff 100644 | |
--- a/rayon-demo/src/main.rs | |
+++ b/rayon-demo/src/main.rs | |
@@ -15,6 +15,7 @@ mod sieve; | |
#[cfg(test)] mod factorial; | |
#[cfg(test)] mod pythagoras; | |
#[cfg(test)] mod fibonacci; | |
+#[cfg(test)] mod find; | |
extern crate rayon; // all | |
extern crate docopt; // all | |
@@ -26,6 +27,9 @@ extern crate rustc_serialize; // nbody | |
extern crate time; // nbody, sieve | |
extern crate itertools; // sieve | |
extern crate num; // factorial | |
+#[cfg(test)] // its only user, `mod find`, is itself test-only | |
+#[macro_use] | |
+extern crate lazy_static; // find | |
#[cfg(test)] | |
extern crate test; | |
diff --git a/src/par_iter/filter.rs b/src/par_iter/filter.rs | |
index 652463ed696e..9674607de776 100644 | |
--- a/src/par_iter/filter.rs | |
+++ b/src/par_iter/filter.rs | |
@@ -83,6 +83,10 @@ impl<'f, ITEM, C, FILTER_OP: 'f> Consumer<ITEM> for FilterConsumer<'f, C, FILTER | |
fn into_folder(self) -> Self::Folder { | |
FilterFolder { base: self.base.into_folder(), filter_op: self.filter_op, } | |
} | |
+ | |
+ /// Delegates to the base consumer: this filter stage needs no more items | |
+ /// once the consumer it feeds is full, which lets short-circuiting | |
+ /// searches such as `find` stop early through a `filter`. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
@@ -122,4 +126,8 @@ impl<'f, C, FILTER_OP, ITEM> Folder<ITEM> for FilterFolder<'f, C, FILTER_OP> | |
fn complete(self) -> Self::Result { | |
self.base.complete() | |
} | |
+ | |
+ /// Delegates to the base folder, so the sequential loop driving this | |
+ /// folder can stop as soon as the downstream folder is full. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
diff --git a/src/par_iter/filter_map.rs b/src/par_iter/filter_map.rs | |
index bf774f8994a9..53d7541b8c88 100644 | |
--- a/src/par_iter/filter_map.rs | |
+++ b/src/par_iter/filter_map.rs | |
@@ -90,6 +90,10 @@ impl<'f, ITEM, MAPPED_ITEM, C, FILTER_OP> Consumer<ITEM> | |
FilterMapFolder { base: base, | |
filter_op: self.filter_op } | |
} | |
+ | |
+ /// Delegates to the base consumer, so a full downstream consumer (e.g. a | |
+ /// `find` that already has its match) stops work flowing into this stage. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
impl<'f, ITEM, MAPPED_ITEM, C, FILTER_OP> UnindexedConsumer<ITEM> | |
@@ -131,5 +135,9 @@ impl<'f, ITEM, C_ITEM, C, FILTER_OP> Folder<ITEM> for FilterMapFolder<'f, C, FIL | |
fn complete(self) -> C::Result { | |
self.base.complete() | |
} | |
+ | |
+ /// Delegates to the base folder, so the sequential loop driving this | |
+ /// folder can stop as soon as the downstream folder is full. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
diff --git a/src/par_iter/find.rs b/src/par_iter/find.rs | |
new file mode 100644 | |
index 000000000000..c0bf17dd5aa8 | |
--- /dev/null | |
+++ b/src/par_iter/find.rs | |
@@ -0,0 +1,134 @@ | |
+//! Short-circuiting search over a parallel iterator. | |
+//! | |
+//! Every consumer and folder created for one `find` call shares a single | |
+//! `AtomicBool`. The first folder to see a matching item sets it; after | |
+//! that, `full()` reports `true`, so `bridge_producer_consumer` stops | |
+//! splitting and folders stop consuming as soon as they next check. | |
+//! Whichever task matches first wins, so the result is *a* matching item, | |
+//! not necessarily the first one in iteration order. | |
+ | |
+use std::sync::atomic::{AtomicBool, Ordering}; | |
+use super::*; | |
+use super::len::*; | |
+use super::internal::*; | |
+ | |
+/// Searches `pi` for an item satisfying `find_op`; returns `None` if no item | |
+/// matches. | |
+/// | |
+/// Unlike `Iterator::find`, any matching item may be returned, not | |
+/// necessarily the first, and which one can vary from run to run. | |
+pub fn find<PAR_ITER, FIND_OP>(pi: PAR_ITER, find_op: FIND_OP) -> Option<PAR_ITER::Item> | |
+    where PAR_ITER: ParallelIterator, | |
+          FIND_OP: Fn(&PAR_ITER::Item) -> bool + Sync, | |
+{ | |
+    let found = AtomicBool::new(false); | |
+    let consumer = FindConsumer::new(&find_op, &found); | |
+    pi.drive_unindexed(consumer) | |
+} | |
+ | |
+/// Consumer half of `find`; all splits share the same predicate and flag. | |
+struct FindConsumer<'f, FIND_OP: 'f> { | |
+    find_op: &'f FIND_OP, | |
+    found: &'f AtomicBool, | |
+} | |
+ | |
+impl<'f, FIND_OP> FindConsumer<'f, FIND_OP> { | |
+    fn new(find_op: &'f FIND_OP, found: &'f AtomicBool) -> Self { | |
+        FindConsumer { | |
+            find_op: find_op, | |
+            found: found, | |
+        } | |
+    } | |
+} | |
+ | |
+impl<'f, ITEM, FIND_OP: 'f> Consumer<ITEM> for FindConsumer<'f, FIND_OP> | |
+    where ITEM: Send, FIND_OP: Fn(&ITEM) -> bool + Sync, | |
+{ | |
+    type Folder = FindFolder<'f, ITEM, FIND_OP>; | |
+    type Reducer = FindReducer; | |
+    type Result = Option<ITEM>; | |
+ | |
+    fn cost(&mut self, cost: f64) -> f64 { | |
+        // This isn't quite right, as we will do more than O(n) reductions, but whatever. | |
+        cost * FUNC_ADJUSTMENT | |
+    } | |
+ | |
+    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { | |
+        // Where the split falls doesn't matter when any match will do, so | |
+        // both halves are identical copies sharing the same flag. | |
+        (self.split_off(), self, FindReducer) | |
+    } | |
+ | |
+    fn into_folder(self) -> Self::Folder { | |
+        FindFolder { | |
+            find_op: self.find_op, | |
+            found: self.found, | |
+            item: None, | |
+        } | |
+    } | |
+ | |
+    fn full(&self) -> bool { | |
+        // Relaxed is enough: the flag only lets other tasks skip work. The | |
+        // matching item itself is returned through the folder's result and | |
+        // the reducer, not through this flag. | |
+        self.found.load(Ordering::Relaxed) | |
+    } | |
+} | |
+ | |
+ | |
+impl<'f, ITEM, FIND_OP: 'f> UnindexedConsumer<ITEM> for FindConsumer<'f, FIND_OP> | |
+    where ITEM: Send, FIND_OP: Fn(&ITEM) -> bool + Sync, | |
+{ | |
+    fn split_off(&self) -> Self { | |
+        FindConsumer::new(self.find_op, self.found) | |
+    } | |
+ | |
+    fn to_reducer(&self) -> Self::Reducer { | |
+        FindReducer | |
+    } | |
+} | |
+ | |
+ | |
+/// Sequential half of `find`: tests items in order and keeps the first match | |
+/// it sees. | |
+struct FindFolder<'f, ITEM, FIND_OP: 'f> { | |
+    find_op: &'f FIND_OP, | |
+    found: &'f AtomicBool, | |
+    item: Option<ITEM>, | |
+} | |
+ | |
+impl<'f, ITEM, FIND_OP> Folder<ITEM> for FindFolder<'f, ITEM, FIND_OP> | |
+    where FIND_OP: Fn(&ITEM) -> bool + 'f | |
+{ | |
+    type Result = Option<ITEM>; | |
+ | |
+    fn consume(mut self, item: ITEM) -> Self { | |
+        // Once this folder holds a match, neither rerun the predicate nor | |
+        // replace the match, even if the caller keeps feeding items without | |
+        // checking `full()`. | |
+        if self.item.is_none() && (self.find_op)(&item) { | |
+            self.found.store(true, Ordering::Relaxed); | |
+            self.item = Some(item); | |
+        } | |
+        self | |
+    } | |
+ | |
+    fn complete(self) -> Self::Result { | |
+        self.item | |
+    } | |
+ | |
+    fn full(&self) -> bool { | |
+        self.found.load(Ordering::Relaxed) | |
+    } | |
+} | |
+ | |
+ | |
+/// Merges the results of two split halves, keeping the left one's match if | |
+/// both found something. | |
+struct FindReducer; | |
+ | |
+impl<ITEM> Reducer<Option<ITEM>> for FindReducer { | |
+    fn reduce(self, left: Option<ITEM>, right: Option<ITEM>) -> Option<ITEM> { | |
+        left.or(right) | |
+    } | |
+} | |
diff --git a/src/par_iter/flat_map.rs b/src/par_iter/flat_map.rs | |
index 175e2652687b..4b2902c88109 100644 | |
--- a/src/par_iter/flat_map.rs | |
+++ b/src/par_iter/flat_map.rs | |
@@ -78,6 +78,10 @@ impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> Consumer<ITEM> | |
previous: None, | |
} | |
} | |
+ | |
+ /// Delegates to the base consumer, so a full downstream consumer stops | |
+ /// further splitting and feeding of this flat_map stage. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> UnindexedConsumer<ITEM> | |
@@ -136,4 +140,8 @@ impl<'m, ITEM, MAPPED_ITEM, C, MAP_OP> Folder<ITEM> | |
None => self.base.into_folder().complete(), | |
} | |
} | |
+ | |
+ /// Note that this folder's `base` is still a consumer (see the | |
+ /// `self.base.into_folder()` call above), so this asks the downstream | |
+ /// consumer, not a folder, whether it needs more items. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
diff --git a/src/par_iter/internal.rs b/src/par_iter/internal.rs | |
index 92fb2f1bd547..145a4e3a0bf8 100644 | |
--- a/src/par_iter/internal.rs | |
+++ b/src/par_iter/internal.rs | |
@@ -52,6 +52,7 @@ pub trait Consumer<Item>: Send + Sized { | |
/// sequentially, eventually producing a final result. | |
fn into_folder(self) -> Self::Folder; | |
+ /// Hint that this consumer needs no more items (e.g. a search has already | |
+ /// succeeded), so the driver may stop splitting and producing work for it. | |
+ /// The default never reports full; wrapping consumers (filter, map, ...) | |
+ /// forward to their base so the hint reaches the driver. | |
+ fn full(&self) -> bool { false } | |
} | |
pub trait Folder<Item> { | |
@@ -62,6 +63,8 @@ pub trait Folder<Item> { | |
/// Finish consuming items, produce final result. | |
fn complete(self) -> Self::Result; | |
+ | |
+ /// Hint that this folder needs no more items; `bridge_producer_consumer` | |
+ /// checks it after every `consume` and stops early. Defaults to `false`. | |
+ fn full(&self) -> bool { false } | |
} | |
pub trait Reducer<Result> { | |
@@ -167,7 +170,9 @@ fn bridge_producer_consumer<P,C>(len: usize, | |
-> C::Result | |
where P: Producer, C: Consumer<P::Item> | |
{ | |
- if len > 1 && splitter.try() { | |
+ // Nothing more is needed: skip splitting and producing entirely and | |
+ // return the result of a folder that consumed no items. | |
+ if consumer.full() { | |
+ consumer.into_folder().complete() | |
+ } else if len > 1 && splitter.try() { | |
let mid = len / 2; | |
let (left_producer, right_producer) = producer.split_at(mid); | |
let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); | |
@@ -181,6 +186,7 @@ fn bridge_producer_consumer<P,C>(len: usize, | |
let mut folder = consumer.into_folder(); | |
for item in producer { | |
folder = folder.consume(item); | |
+ // Stop feeding items as soon as the folder reports it is satisfied. | |
+ if folder.full() { break } | |
} | |
folder.complete() | |
} | |
diff --git a/src/par_iter/map.rs b/src/par_iter/map.rs | |
index 40ae2c88d589..981359ef3175 100644 | |
--- a/src/par_iter/map.rs | |
+++ b/src/par_iter/map.rs | |
@@ -228,6 +228,10 @@ impl<'m, ITEM, C, MAP_OP> Consumer<ITEM> for MapConsumer<'m, C, MAP_OP> | |
map_op: self.map_op, | |
} | |
} | |
+ | |
+ /// Delegates to the base consumer. `any`, `all` and `position` are built | |
+ /// as `map(..).find(..)`, so this forwarding is what lets them stop early. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
impl<'m, ITEM, C, MAP_OP> UnindexedConsumer<ITEM> | |
@@ -266,5 +270,9 @@ impl<'m, ITEM, C, MAP_OP> Folder<ITEM> for MapFolder<'m, C, MAP_OP> | |
fn complete(self) -> C::Result { | |
self.base.complete() | |
} | |
+ | |
+ /// Delegates to the base folder, so the sequential loop driving this | |
+ /// folder can stop as soon as the downstream folder is full. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
diff --git a/src/par_iter/mod.rs b/src/par_iter/mod.rs | |
index d0180958ea1e..6d7809bf0438 100644 | |
--- a/src/par_iter/mod.rs | |
+++ b/src/par_iter/mod.rs | |
@@ -26,6 +26,7 @@ use self::internal::*; | |
use self::weight::Weight; | |
use self::zip::ZipIter; | |
+pub mod find; | |
pub mod chain; | |
pub mod collect; | |
pub mod enumerate; | |
@@ -372,6 +373,24 @@ pub trait ParallelIterator: Sized { | |
ChainIter::new(self, chain.into_par_iter()) | |
} | |
+ /// Searches for an item matching `predicate`; returns `None` if there is | |
+ /// no match. Work stops early once a match is found. | |
+ /// | |
+ /// Unlike `Iterator::find`, the item returned is *any* match, not | |
+ /// necessarily the first in iteration order. | |
+ fn find<FIND_OP>(self, predicate: FIND_OP) -> Option<Self::Item> | |
+ where FIND_OP: Fn(&Self::Item) -> bool + Sync | |
+ { | |
+ find::find(self, predicate) | |
+ } | |
+ | |
+ /// Returns `true` if `predicate` holds for at least one item, stopping | |
+ /// early once one is found. Returns `false` for an empty iterator. | |
+ fn any<ANY_OP>(self, predicate: ANY_OP) -> bool | |
+ where ANY_OP: Fn(Self::Item) -> bool + Sync | |
+ { | |
+ self.map(predicate).find(|&p| p).is_some() | |
+ } | |
+ | |
+ /// Returns `true` if `predicate` holds for every item, stopping early | |
+ /// once a counterexample is found. Returns `true` for an empty iterator. | |
+ fn all<ALL_OP>(self, predicate: ALL_OP) -> bool | |
+ where ALL_OP: Fn(Self::Item) -> bool + Sync | |
+ { | |
+ self.map(predicate).find(|&p| !p).is_none() | |
+ } | |
+ | |
/// Internal method used to define the behavior of this parallel | |
/// iterator. You should not need to call this directly. | |
#[doc(hidden)] | |
@@ -450,5 +469,13 @@ pub trait IndexedParallelIterator: ExactParallelIterator { | |
fn enumerate(self) -> Enumerate<Self> { | |
Enumerate::new(self) | |
} | |
+ | |
+ /// Returns the index of an item matching `predicate`, or `None` if no | |
+ /// item matches. | |
+ /// | |
+ /// Unlike `Iterator::position`, this is not necessarily the *first* | |
+ /// matching index: it is built on `find`, which returns any match. | |
+ fn position<POSITION_OP>(self, predicate: POSITION_OP) -> Option<usize> | |
+ where POSITION_OP: Fn(Self::Item) -> bool + Sync | |
+ { | |
+ self.map(predicate).enumerate() | |
+ .find(|&(_, p)| p) | |
+ .map(|(i, _)| i) | |
+ } | |
} | |
diff --git a/src/par_iter/test.rs b/src/par_iter/test.rs | |
index e5be447897c2..04d77ca4000f 100644 | |
--- a/src/par_iter/test.rs | |
+++ b/src/par_iter/test.rs | |
@@ -576,3 +576,35 @@ pub fn check_chain() { | |
.sum(); | |
assert_eq!(sum, 2500); | |
} | |
+ | |
+#[test] | |
+pub fn check_find() { | |
+    let a: Vec<i32> = (0..1024).collect(); | |
+ | |
+    // `find` and `position` may return any match, so where several items | |
+    // match, check that the result really matches instead of expecting a | |
+    // specific value. 742 is the only x < 1024 with x % 19 == 1 and | |
+    // x % 53 == 0, so those searches have exactly one valid answer. | |
+    let found = a.par_iter().find(|&&x| x % 42 == 41).expect("some x has x % 42 == 41"); | |
+    assert_eq!(*found % 42, 41); | |
+    assert_eq!(a.par_iter().find(|&&x| x % 19 == 1 && x % 53 == 0), Some(&742_i32)); | |
+    assert_eq!(a.par_iter().find(|&&x| x < 0), None); | |
+ | |
+    let pos = a.par_iter().position(|&x| x % 42 == 41).expect("some x has x % 42 == 41"); | |
+    assert_eq!(a[pos] % 42, 41); | |
+    assert_eq!(a.par_iter().position(|&x| x % 19 == 1 && x % 53 == 0), Some(742_usize)); | |
+    assert_eq!(a.par_iter().position(|&x| x < 0), None); | |
+ | |
+    assert!(a.par_iter().any(|&x| x > 1000)); | |
+    assert!(!a.par_iter().any(|&x| x < 0)); | |
+ | |
+    assert!(!a.par_iter().all(|&x| x > 1000)); | |
+    assert!(a.par_iter().all(|&x| x >= 0)); | |
+ | |
+    // Empty input: nothing is found, `any` is false, `all` is vacuously true. | |
+    let empty: Vec<i32> = Vec::new(); | |
+    assert_eq!(empty.par_iter().find(|_| true), None); | |
+    assert_eq!(empty.par_iter().position(|_| true), None); | |
+    assert!(!empty.par_iter().any(|_| true)); | |
+    assert!(empty.par_iter().all(|_| false)); | |
+} | |
diff --git a/src/par_iter/weight.rs b/src/par_iter/weight.rs | |
index c30d0bdfa603..d01c45a1ee29 100644 | |
--- a/src/par_iter/weight.rs | |
+++ b/src/par_iter/weight.rs | |
@@ -141,6 +141,10 @@ impl<C, ITEM> Consumer<ITEM> for WeightConsumer<C> | |
fn into_folder(self) -> C::Folder { | |
self.base.into_folder() | |
} | |
+ | |
+ /// Delegates to the base consumer. Only the consumer side needs this: | |
+ /// `into_folder` above hands out the base's own folder, which already | |
+ /// answers `full()` for itself. | |
+ fn full(&self) -> bool { | |
+ self.base.full() | |
+ } | |
} | |
impl<C, ITEM> UnindexedConsumer<ITEM> for WeightConsumer<C> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment