Last active
December 19, 2023 22:34
-
-
Save spullara/8906058 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sampullara;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** | |
* Decide at runtime whether this stream should be parallel. | |
* <p> | |
* User: sam | |
* Date: 2/9/14 | |
* Time: 12:38 PM | |
*/ | |
public class MaybeParallel { | |
static class SpliteratorException extends RuntimeException { | |
private final Spliterator spliterator; | |
private final Object element; | |
SpliteratorException(Spliterator spliterator, Object element) { | |
this.spliterator = spliterator; | |
this.element = element; | |
} | |
public Spliterator getSpliterator() { | |
return spliterator; | |
} | |
Object getElement() { | |
return element; | |
} | |
} | |
public static <T>Stream<T> maybeParallel(Stream<T> stream, Decider decider) { | |
Stream<T> sequential = stream.sequential(); | |
Spliterator<T> s = sequential.spliterator(); | |
Spliterator<T> spliterator = new Spliterator<T>() { | |
private T caughtElement = null; | |
@Override | |
public boolean tryAdvance(Consumer<? super T> action) { | |
if (caughtElement != null) { | |
action.accept(caughtElement); | |
caughtElement = null; | |
return true; | |
} | |
try { | |
return s.tryAdvance(action); | |
} catch (SpliteratorException e) { | |
caughtElement = (T) e.getElement(); | |
throw e; | |
} | |
} | |
@Override | |
public Spliterator<T> trySplit() { | |
return s.trySplit(); | |
} | |
@Override | |
public long estimateSize() { | |
return s.estimateSize(); | |
} | |
@Override | |
public int characteristics() { | |
return s.characteristics(); | |
} | |
}; | |
return StreamSupport.stream(spliterator, false).peek(new Consumer<T>() { | |
volatile int elements = 0; | |
Long start; | |
@Override | |
public void accept(T t) { | |
if (start == null) { | |
start = System.nanoTime(); | |
} | |
if (elements++ > 0) { | |
if (decider.switchToParallel(elements, System.nanoTime() - start)) { | |
throw new SpliteratorException(spliterator, t); | |
} | |
} | |
} | |
}); | |
} | |
public static void main(String[] args) { | |
AtomicInteger total = new AtomicInteger(); | |
List<Integer> integers = new ArrayList<>(100); | |
for (int i = 0; i < 100; i++) { | |
integers.add(i); | |
} | |
Stream<Integer> integerStream = maybeParallel(integers.stream(), (e, t) -> { | |
double v = ((double) t) / e; | |
return v > 5_000_000; | |
}); | |
try { | |
integerStream.forEach(i -> { | |
total.getAndIncrement(); | |
System.out.println("Sequential: " + i); | |
try { | |
Thread.sleep(i); | |
} catch (InterruptedException e) { | |
} | |
}); | |
} catch (SpliteratorException e) { | |
Stream<Integer> stream = StreamSupport.stream(e.getSpliterator(), true); | |
stream.forEach(i -> { | |
total.getAndIncrement(); | |
System.out.println("Parallel: " + i); | |
}); | |
} | |
System.out.println("Total: " + (total.get() == 100)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment