@Dierk
Last active March 22, 2016 22:36
Adding the numbers one to ten can easily lead to random results when done in parallel.
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class RNG {

    // Stateful supplier: each call returns the next integer from the shared counter.
    static Supplier<Integer> countGen(AtomicInteger i) {
        return () -> i.getAndIncrement();
    }

    public static void main(String[] args) {
        final Supplier<Integer> count = countGen(new AtomicInteger(1));
        // Sequentially this yields 1..10 (sum 55); in parallel the ten values kept may differ from run to run.
        final Optional<Integer> sum = Stream.generate(count).limit(10).parallel().reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);
    }
}
@hendrikebbers

In that case the supplier (generator) is not called only 10 times. I think the generator has no idea when to stop; it is limit() that stops it once it has handled 10 values. Since the generator is called on several threads (parallel), it immediately produces a batch of values (512) that the following operations can handle in parallel. Producing one value after another wouldn't make sense in parallel mode. So by the time limit() has handled its 10 values and stops the production, the generator has already created more than 500 values.
If you don't use parallel() it's mostly the same, but all values are created on one thread, so limit() really does stop the generation of new values after 10 supplier calls.
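
One way to check this is to count how often the supplier actually runs. The following is only a sketch: the class name SupplierCallCount and the helper run are made up for illustration, and the number of calls in parallel mode depends on the JDK and the machine, so it is printed rather than assumed to be 512.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SupplierCallCount {

    // Runs the gist's pipeline and returns { sum, number of supplier calls }.
    static int[] run(boolean parallel) {
        AtomicInteger next = new AtomicInteger(1);   // the value source, as in the gist
        AtomicInteger calls = new AtomicInteger();   // hypothetical extra counter for this sketch
        Supplier<Integer> counting = () -> {
            calls.incrementAndGet();
            return next.getAndIncrement();
        };
        Stream<Integer> stream = Stream.generate(counting).limit(10);
        if (parallel) {
            stream = stream.parallel();
        }
        Optional<Integer> sum = stream.reduce(Integer::sum);
        return new int[] { sum.get(), calls.get() };
    }

    public static void main(String[] args) {
        int[] seq = run(false);
        int[] par = run(true);
        // Sequential: exactly 10 calls and a sum of 55.
        System.out.println("sequential: sum = " + seq[0] + ", supplier calls = " + seq[1]);
        // Parallel: at least 10 calls; both numbers may vary between runs.
        System.out.println("parallel:   sum = " + par[0] + ", supplier calls = " + par[1]);
    }
}
```

If the parallel run reports more than 10 supplier calls, the ten values that survive limit() need not be 1 to 10, which would explain the changing sum.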

@Dierk

Dierk commented Mar 22, 2016

Well, but we go parallel after the limit. So it is somewhat the laziness of the stream operations that doesn't play well with concurrency and mutable state.

@hendrikebbers

Oh, the position of the parallel() call doesn't matter. Only the last parallel() / sequential() call counts: it determines whether all intermediate operations of the Stream are executed in parallel or not. In the following example the filter will be called in parallel:

Arrays.asList(1, 2, 3, 4).stream().sequential().filter(i -> {
    System.out.println(Thread.currentThread().getName());
    return true;
}).parallel().findFirst();
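
Thread names can be hard to read off for such a small input, so the same point can also be checked with isParallel(), which reports the mode the whole pipeline will run in. This is a small sketch; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class LastCallWins {

    // sequential() first, parallel() last: the pipeline as a whole is parallel.
    static boolean parallelLast() {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        Stream<Integer> s = numbers.stream().sequential().filter(i -> true).parallel();
        return s.isParallel();
    }

    // parallel() first, sequential() last: the pipeline as a whole is sequential.
    static boolean sequentialLast() {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        Stream<Integer> s = numbers.stream().parallel().filter(i -> true).sequential();
        return s.isParallel();
    }

    public static void main(String[] args) {
        System.out.println("sequential() then parallel(): " + parallelLast());   // prints true
        System.out.println("parallel() then sequential(): " + sequentialLast()); // prints false
    }
}
```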

@pat-li

pat-li commented Mar 22, 2016

The randomness does not come from limit() but from generate() - and I very much agree: this is, just from the method name, quite unexpected. iterate(T seed, UnaryOperator<T> f) would probably be the right choice - at least then I always get the same result (55).

With iterate() there is also no need for AtomicInteger anymore.
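
A minimal sketch of that variant, with the class name IterateSum chosen only for illustration. The stream produced by iterate() is ordered, so limit(10) keeps the first ten elements even in parallel mode, and no shared mutable state is involved.

```java
import java.util.Optional;
import java.util.stream.Stream;

class IterateSum {

    // 1, 2, 3, ... derived from the seed; each element depends only on its predecessor.
    static Optional<Integer> sum() {
        return Stream.iterate(1, i -> i + 1)
                .limit(10)
                .parallel()
                .reduce(Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println("sum = " + sum()); // prints "sum = Optional[55]"
    }
}
```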
