import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class RNG {

    // Supplier that returns 1, 2, 3, ... by incrementing the shared AtomicInteger.
    static Supplier<Integer> countGen(AtomicInteger i) {
        return () -> i.getAndIncrement();
    }

    public static void main(String[] args) {
        final Supplier<Integer> count = countGen(new AtomicInteger(1));
        // Expectation: 1 + 2 + ... + 10 = 55. Actual result varies between runs.
        final Optional<Integer> sum = Stream.generate(count).limit(10).parallel().reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);
    }
}
Wow, that's interesting.
I added a second AtomicInteger that is used in a map function. By doing so, the result is always 55. So the AtomicInteger works fine here, but generate(...).limit(...) doesn't seem to work well in parallel. Based on this I had a deeper look at the JavaDoc of the generate(...) method:
Returns an infinite sequential unordered stream
So in the end I think it does what the JavaDoc says.
But: at first glance you would normally expect the Supplier to be called 10 times and those 10 results to be used. In reality it is called 512 times, and any 10 of those 512 results may be used. The results are 1 - 512, so the AtomicInteger works fine, but limit(10) returns (effectively at random) 10 values from this stream, since it is unordered. Because the stream is handled in parallel, the generator has no idea how many values it should produce, and 512 appears to be the default batch size (which looks very high to me).
Conclusion: I couldn't believe it when I saw the code, but in the end it does what the JavaDoc defines. Still, I think most developers would expect a different result. Maybe generateUnordered(...) would be a better name for the method.
Question: Is there a way to create an ordered stream? generate(...).sorted().limit(...) never terminates, because sorted() tries to consume the infinite stream first.
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ParallelStreamTest {
    public static void main(String[] args) {
        final AtomicInteger counter = new AtomicInteger(1);
        final AtomicInteger mapper = new AtomicInteger(1);
        final Optional<Integer> sum = Stream.generate(() -> {
            int val = counter.getAndIncrement();
            System.out.println("Generated: " + val);
            return val;
        }).limit(10).map(i -> {
            int val = mapper.getAndIncrement();
            System.out.println("Mapped: " + val);
            return val;
        }).parallel().reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);
    }
}
in which case limit should be named pickRandomFrom - and BTW it also works fine when not using parallel. Why so?
In that case the supplier (generator) is only called 10 times. The generator has no idea when to stop; it is the limit() operation that stops it once it has handled 10 values. Since the generator is called on several threads (parallel), it directly produces a whole batch of values (the 512) that can then be processed in parallel by the following operations - producing one value after the other wouldn't make sense in parallel mode. So by the time limit() has handled its 10 values and stops the production, the generator has already created more than 500 values.
If you don't use parallel, it's mostly the same, but all values are created in one thread, so limit() really does stop the generation of new values after 10 supplier calls.
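A minimal sketch of that sequential case (the class name SequentialLimitTest is my own): without parallel(), limit(10) short-circuits the generator, so "Generated:" is printed exactly ten times and the sum is always Optional[55].

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class SequentialLimitTest {
    public static void main(String[] args) {
        final AtomicInteger counter = new AtomicInteger(1);
        // Sequential pipeline: limit(10) short-circuits the generator,
        // so the supplier runs exactly ten times and the sum is 1 + ... + 10 = 55.
        final Optional<Integer> sum = Stream.generate(() -> {
            int val = counter.getAndIncrement();
            System.out.println("Generated: " + val);
            return val;
        }).limit(10).reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);  // sum = Optional[55]
    }
}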
Well, but we go parallel only after the limit. So it seems to be the laziness of the stream operations that doesn't play well with concurrency and mutable state.
Oh, the position of the parallel() call isn't important. Only the last parallel() / sequential() call matters: it determines whether the whole pipeline, including all intermediate operations, is executed in parallel or not. In the following example the filter is called in parallel even though parallel() comes after it:
Arrays.asList(1, 2, 3, 4).stream().sequential().filter(i -> {
    System.out.println(Thread.currentThread().getName());
    return true;
}).parallel().findFirst();
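For symmetry, a small sketch of the opposite case (this example is mine, not from the thread): because the last call wins for the whole pipeline, ending with sequential() makes the filter run on the calling thread, typically "main", even though parallel() appears earlier in the chain:

Arrays.asList(1, 2, 3, 4).stream().parallel().filter(i -> {
    // With sequential() as the last call, this prints the calling thread (usually "main").
    System.out.println(Thread.currentThread().getName());
    return true;
}).sequential().findFirst();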
The randomness does not come from limit() but from generate() - and I very much agree: just from the method name, this is quite unexpected. iterate(T seed, UnaryOperator<T> f) would probably be the right choice - at least then I always get the same result (55).
With iterate() there is also no need for AtomicInteger anymore.
sum = Optional[145]
sum = Optional[55]
sum = Optional[158]
sum = Optional[292]
sum = Optional[539]
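For completeness, a minimal sketch of the iterate() variant suggested above (the class name IterateTest is my own, assuming JDK 8+): the stream is ordered and needs no AtomicInteger, so unlike the varying sums above, the result is always Optional[55].

import java.util.Optional;
import java.util.stream.Stream;

public class IterateTest {
    public static void main(String[] args) {
        // iterate() yields an ordered stream 1, 2, 3, ..., so limit(10) takes the
        // first ten elements even in parallel mode and the sum is always 55.
        final Optional<Integer> sum = Stream.iterate(1, i -> i + 1)
                .limit(10)
                .parallel()
                .reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);  // sum = Optional[55]
    }
}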