Skip to content

Instantly share code, notes, and snippets.

@pivovarit
Created March 27, 2025 18:44
Show Gist options
  • Save pivovarit/d01258ff6c06e4aa790e54d8ca5878ec to your computer and use it in GitHub Desktop.
Save pivovarit/d01258ff6c06e4aa790e54d8ca5878ec to your computer and use it in GitHub Desktop.
merge_it_27_03_2025
package it.merge;
import com.pivovarit.collectors.ParallelCollectors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
/**
* @author Grzegorz Piwowarek
* WJUG/Vavr
* Oracle ACE
*/
class Main {
/**
 * Streams the integers 0..19 through a stateless gatherer that forwards every element it
 * receives and then flips a coin to decide whether integration should continue, printing
 * whatever reaches the end of the pipeline.
 */
public static void main(String[] args) {
    Gatherer.Integrator<Void, Integer, Integer> forwardThenMaybeStop =
        Gatherer.Integrator.of((unused, element, downstream) -> {
            downstream.push(element);
            // A false return asks the stream to stop feeding further elements.
            return ThreadLocalRandom.current().nextBoolean();
        });

    Stream.iterate(0, n -> n + 1)
        .limit(20)
        .gather(Gatherer.of(forwardThenMaybeStop))
        .forEach(System.out::println);
}
/**
 * Demo: pairing every stream element with its zero-based encounter index using a
 * hand-written {@link Gatherer}.
 */
record Example1() {

    /** Zips four letters with their indices and prints the resulting list of entries. */
    public static void main(String[] args) {
        List<Map.Entry<String, Long>> zipped = Stream.of("a", "b", "c", "d")
            .gather(zipWithIndex())
            .toList();
        System.out.println("zipped = " + zipped);
    }

    /**
     * Returns a gatherer that maps each element to a {@code Map.Entry} of the element and a
     * counter value that starts at 0 and grows by one per integrated element.
     *
     * <p>The gatherer keeps its counter in an {@link AtomicLong} and does not override
     * {@code combiner()}. Entries are built with {@link Map#entry(Object, Object)}, which
     * rejects {@code null} keys, so {@code null} elements are not supported.
     *
     * @param <T> the element type
     * @return a gatherer producing {@code (element, index)} entries
     */
    public static <T> Gatherer<T, ?, Map.Entry<T, Long>> zipWithIndex() {
        return new Gatherer<T, AtomicLong, Map.Entry<T, Long>>() {
            @Override
            public Supplier<AtomicLong> initializer() {
                return () -> new AtomicLong(0);
            }

            @Override
            public Integrator<AtomicLong, T, Map.Entry<T, Long>> integrator() {
                // Return push's verdict instead of a constant true so that a downstream
                // which stops accepting elements also stops integration upstream.
                return Integrator.of((state, e, ds) ->
                    ds.push(Map.entry(e, state.getAndIncrement())));
            }
        };
    }
}
/**
 * Demo: a gatherer that ignores every incoming element and emits a single {@code null}
 * from its finisher.
 */
record Example() {

    /** Runs {@link #nothing()} over an empty stream and prints whatever it emits. */
    public static void main(String[] args) {
        Stream.of()
            .gather(nothing())
            .forEach(System.out::println);
    }

    /**
     * Returns a gatherer whose integrator pushes nothing and always asks for more input,
     * and whose finisher pushes one {@code null} downstream.
     *
     * @param <T> the element type
     * @return the gatherer described above
     */
    public static <T> Gatherer<T, ?, T> nothing() {
        return new Gatherer<T, T, T>() {
            @Override
            public BiConsumer<T, Downstream<? super T>> finisher() {
                // State is unused; only the end-of-stream signal matters here.
                return (_, sink) -> sink.push(null);
            }

            @Override
            public Integrator<T, T, T> integrator() {
                // Swallow the element and keep integrating.
                return Integrator.of((_, _, _) -> true);
            }
        };
    }
}
/**
 * Demo: a {@code distinct}-like gatherer that de-duplicates by a derived property rather
 * than by the element itself.
 */
record Example2() {

    /** Prints the first string of each distinct length. */
    public static void main(String[] args) {
        Stream.of("a", "bb", "ccc", "dd", "cccccc")
            .gather(distinctBy(String::length))
            .forEach(System.out::println);
    }

    /**
     * Returns a gatherer that forwards an element only if the property extracted from it
     * has not been seen before; later elements with an already-seen property are dropped.
     *
     * <p>Seen properties are tracked in a {@link HashSet}, so the property type is compared
     * with {@code equals}/{@code hashCode}. The gatherer does not override {@code combiner()}.
     *
     * @param propertyExtractor computes the property that decides distinctness
     * @param <T> the element type
     * @param <P> the property type
     * @return a gatherer keeping the first element per distinct property value
     */
    public static <T, P> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends P> propertyExtractor) {
        return new Gatherer<T, HashSet<P>, T>() {
            @Override
            public Supplier<HashSet<P>> initializer() {
                return HashSet::new;
            }

            @Override
            public Integrator<HashSet<P>, T, T> integrator() {
                return Integrator.of((state, e, ds) -> {
                    if (state.add(propertyExtractor.apply(e))) {
                        // First element with this property: forward it and relay whether
                        // downstream still wants more, so short-circuiting propagates.
                        return ds.push(e);
                    }
                    // Property already seen: drop the element and keep integrating.
                    return true;
                });
            }
        };
    }
}
/**
 * Demo: two parallel streams competing for threads. A background thread first starts a
 * parallel stream whose tasks each sleep for two days; one second later the main thread
 * times a second parallel stream of ten 2-second tasks.
 */
record Example3() {

    /**
     * Starts the long-sleeping parallel stream in the background, waits a second, then runs
     * and times the second parallel stream and prints its result.
     *
     * @throws InterruptedException if the main thread is interrupted during the initial pause
     */
    public static void main(String[] args) throws InterruptedException {
        // Daemon thread: its tasks sleep for two days, and a non-daemon thread waiting on
        // them would keep the JVM alive long after main has printed its result.
        Thread.ofPlatform().daemon().start(() -> {
            timed(() -> Stream.iterate(0, i -> i + 1)
                .limit(Runtime.getRuntime().availableProcessors())
                .parallel()
                .map(i -> {
                    try {
                        Thread.sleep(Duration.ofDays(2));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    return i;
                })
                .toList());
        });

        // Give the background stream time to occupy its worker threads first.
        Thread.sleep(1000);

        List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1)
            .limit(10)
            .parallel()
            .map(i -> process(i))
            .toList());
        System.out.println("result = " + result);
    }

    /**
     * Simulates two seconds of blocking work and returns the input unchanged, logging the
     * input and the executing thread's name first.
     *
     * @param input the value to "process"
     * @param <T> the value type
     * @return {@code input}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    public static <T> T process(T input) {
        System.out.println("processing " + input + " on " + Thread.currentThread().getName());
        try {
            Thread.sleep(Duration.ofSeconds(2));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return input;
    }

    /**
     * Runs {@code action}, prints how long it took (millisecond resolution) and returns
     * its result.
     *
     * @param action the computation to time
     * @param <T> the result type
     * @return whatever {@code action} returned
     */
    public static <T> T timed(Supplier<T> action) {
        // nanoTime is monotonic; wall-clock time can jump while the action runs.
        long startNanos = System.nanoTime();
        T result = action.get();
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("duration: " + Duration.ofMillis(elapsedMillis));
        return result;
    }
}
/**
 * Demo: processing 100 elements in parallel on a dedicated cached thread pool via
 * {@code ParallelCollectors.parallel}, with the parallelism argument set to 100.
 */
record Example4() {

    /**
     * Times the parallel collection of 100 processed integers and prints the result.
     *
     * @throws InterruptedException declared for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        // try-with-resources shuts the pool down; otherwise its idle non-daemon workers
        // keep the JVM alive after main returns.
        try (ExecutorService executor = Executors.newCachedThreadPool()) {
            List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1)
                .limit(100)
                .collect(ParallelCollectors.parallel(i -> process(i), Collectors.toList(), executor, 100))
                .join());
            System.out.println("result = " + result);
        }
    }

    /**
     * Simulates two seconds of blocking work and returns the input unchanged, logging the
     * input and the executing thread's name first.
     *
     * @param input the value to "process"
     * @param <T> the value type
     * @return {@code input}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    public static <T> T process(T input) {
        System.out.println("processing " + input + " on " + Thread.currentThread().getName());
        try {
            Thread.sleep(Duration.ofSeconds(2));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return input;
    }

    /**
     * Runs {@code action}, prints how long it took (millisecond resolution) and returns
     * its result.
     *
     * @param action the computation to time
     * @param <T> the result type
     * @return whatever {@code action} returned
     */
    public static <T> T timed(Supplier<T> action) {
        // nanoTime is monotonic; wall-clock time can jump while the action runs.
        long startNanos = System.nanoTime();
        T result = action.get();
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("duration: " + Duration.ofMillis(elapsedMillis));
        return result;
    }
}
/**
 * Demo: mapping one million elements through {@code Gatherers.mapConcurrent} with the
 * concurrency limit set to {@code Integer.MAX_VALUE}. Also hosts the nested demos
 * {@link Example6}, {@link Example7} and {@link Example8}.
 */
record Example5() {

    /**
     * Times the concurrent mapping of one million integers, each taking two seconds in
     * {@link #process(Object)}. The resulting list is intentionally not printed.
     *
     * @throws InterruptedException declared for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        timed(() -> Stream.iterate(0, i -> i + 1)
            .limit(1000000)
            .gather(Gatherers.mapConcurrent(Integer.MAX_VALUE, i -> process(i)))
            .toList());
    }

    /** Demo: running sums of the first two elements with {@code Gatherers.scan}. */
    record Example6() {
        public static void main(String[] args) {
            Stream.of(1, 2, 3, 4)
                .limit(2)
                .gather(Gatherers.scan(() -> 0, (i1, i2) -> i1 + i2))
                .forEach(System.out::println);
        }
    }

    /** Demo: sliding windows of size two with {@code Gatherers.windowSliding}. */
    record Example7() {
        public static void main(String[] args) {
            Stream.of(1, 2, 3, 4)
                .gather(Gatherers.windowSliding(2))
                .forEach(System.out::println);
        }
    }

    /** Empty placeholder demo. */
    record Example8() {
        public static void main(String[] args) {
        }
    }

    /**
     * Simulates two seconds of blocking work and returns the input unchanged.
     *
     * @param input the value to "process"
     * @param <T> the value type
     * @return {@code input}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    public static <T> T process(T input) {
        try {
            Thread.sleep(Duration.ofSeconds(2));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return input;
    }

    /**
     * Runs {@code action}, prints how long it took (millisecond resolution) and returns
     * its result.
     *
     * @param action the computation to time
     * @param <T> the result type
     * @return whatever {@code action} returned
     */
    public static <T> T timed(Supplier<T> action) {
        // nanoTime is monotonic; wall-clock time can jump while the action runs.
        long startNanos = System.nanoTime();
        T result = action.get();
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("duration: " + Duration.ofMillis(elapsedMillis));
        return result;
    }
}
/**
 * A hand-written {@link Collector} that accumulates stream elements into an
 * {@link ArrayList} and returns that same list as the result.
 *
 * @param <T> the element type
 * @param <R> not referenced by the implementation
 */
public static class ListCollector<T, R> implements Collector<T, ArrayList<T>, List<T>> {

    /** Creates a fresh, empty accumulation list. */
    @Override
    public Supplier<ArrayList<T>> supplier() {
        return () -> new ArrayList<>();
    }

    /** Appends one element to the accumulation list. */
    @Override
    public BiConsumer<ArrayList<T>, T> accumulator() {
        return (target, element) -> target.add(element);
    }

    /** Merges two partial lists by appending the second to the first and returning the first. */
    @Override
    public BinaryOperator<ArrayList<T>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    /** Returns the accumulation list itself as the final result. */
    @Override
    public Function<ArrayList<T>, List<T>> finisher() {
        return accumulated -> accumulated;
    }

    /** Declares no collector characteristics. */
    @Override
    public Set<Characteristics> characteristics() {
        return Set.of();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment