Created
March 27, 2025 18:44
-
-
Save pivovarit/d01258ff6c06e4aa790e54d8ca5878ec to your computer and use it in GitHub Desktop.
merge_it_27_03_2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package it.merge; | |
import com.pivovarit.collectors.ParallelCollectors; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.BiConsumer; | |
import java.util.function.BinaryOperator; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
import java.util.stream.Collector; | |
import java.util.stream.Collectors; | |
import java.util.stream.Gatherer; | |
import java.util.stream.Gatherers; | |
import java.util.stream.Stream; | |
/** | |
* @author Grzegorz Piwowarek | |
* WJUG/Vavr | |
* Oracle ACE | |
*/ | |
class Main { | |
public static void main(String[] args) { | |
Stream.iterate(0, i -> i + 1).limit(20) | |
.gather(Gatherer.of(Gatherer.Integrator.of((state, e, ds) -> { | |
ds.push(e); | |
return ThreadLocalRandom.current().nextBoolean(); | |
}))) | |
.forEach(System.out::println); | |
} | |
record Example1() { | |
public static void main(String[] args) { | |
List<Map.Entry<String, Long>> zipped = Stream.of("a", "b", "c", "d") | |
.gather(zipWithIndex()) | |
.toList(); | |
System.out.println("zipped = " + zipped); | |
} | |
public static <T> Gatherer<T, ?, Map.Entry<T, Long>> zipWithIndex() { | |
return new Gatherer<T, AtomicLong, Map.Entry<T, Long>>() { | |
@Override | |
public Supplier<AtomicLong> initializer() { | |
return () -> new AtomicLong(0); | |
} | |
@Override | |
public Integrator<AtomicLong, T, Map.Entry<T, Long>> integrator() { | |
return Integrator.of((state, e, ds) -> { | |
if (!ds.isRejecting()) { | |
ds.push(Map.entry(e, state.getAndIncrement())); | |
} | |
return true; | |
}); | |
} | |
}; | |
} | |
} | |
record Example() { | |
public static void main(String[] args) { | |
Stream.of() | |
.gather(nothing()) | |
.forEach(System.out::println); | |
} | |
public static <T> Gatherer<T, ?, T> nothing() { | |
return new Gatherer<T, T, T>() { | |
@Override | |
public Integrator<T, T, T> integrator() { | |
return Integrator.of((_, _, ds) -> { | |
return true; | |
}); | |
} | |
@Override | |
public BiConsumer<T, Downstream<? super T>> finisher() { | |
return (_, downstream) -> { | |
downstream.push(null); | |
}; | |
} | |
}; | |
} | |
} | |
record Example2() { | |
public static void main(String[] args) { | |
Stream.of("a", "bb", "ccc", "dd", "cccccc") | |
.gather(distinctBy(String::length)) | |
.forEach(System.out::println); | |
} | |
public static <T, P> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends P> propertyExtractor) { | |
return new Gatherer<T, HashSet<P>, T>() { | |
@Override | |
public Supplier<HashSet<P>> initializer() { | |
return HashSet::new; | |
} | |
@Override | |
public Integrator<HashSet<P>, T, T> integrator() { | |
return Integrator.of((state, e, ds) -> { | |
if (state.add(propertyExtractor.apply(e))) { | |
ds.push(e); | |
} else { | |
// do nothing, element exist | |
} | |
return true; | |
}); | |
} | |
}; | |
} | |
} | |
record Example3() { | |
public static void main(String[] args) throws InterruptedException { | |
Thread.ofPlatform().start(() -> { | |
List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1) | |
.limit(Runtime.getRuntime().availableProcessors()) | |
.parallel() | |
.map(i -> { | |
try { | |
Thread.sleep(Duration.ofDays(2)); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return i; | |
}) | |
.toList()); | |
}); | |
Thread.sleep(1000); | |
List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1) | |
.limit(10) | |
.parallel() | |
.map(i -> process(i)) | |
.toList()); | |
System.out.println("result = " + result); | |
} | |
public static <T> T process(T input) { | |
System.out.println("processing " + input + " on " + Thread.currentThread().getName()); | |
try { | |
Thread.sleep(Duration.ofSeconds(2)); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return input; | |
} | |
public static <T> T timed(Supplier<T> action) { | |
long before = System.currentTimeMillis(); | |
T result = action.get(); | |
long after = System.currentTimeMillis(); | |
System.out.println("duration: " + Duration.ofMillis(after - before)); | |
return result; | |
} | |
} | |
record Example4() { | |
public static void main(String[] args) throws InterruptedException { | |
ExecutorService e = Executors.newCachedThreadPool(); | |
List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1) | |
.limit(100) | |
.collect(ParallelCollectors.parallel(i -> process(i), Collectors.toList(), e, 100)) | |
.join()); | |
System.out.println("result = " + result); | |
} | |
public static <T> T process(T input) { | |
System.out.println("processing " + input + " on " + Thread.currentThread().getName()); | |
try { | |
Thread.sleep(Duration.ofSeconds(2)); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return input; | |
} | |
public static <T> T timed(Supplier<T> action) { | |
long before = System.currentTimeMillis(); | |
T result = action.get(); | |
long after = System.currentTimeMillis(); | |
System.out.println("duration: " + Duration.ofMillis(after - before)); | |
return result; | |
} | |
} | |
record Example5() { | |
public static void main(String[] args) throws InterruptedException { | |
ExecutorService e = Executors.newCachedThreadPool(); | |
List<Integer> result = timed(() -> Stream.iterate(0, i -> i + 1) | |
.limit(1000000) | |
.gather(Gatherers.mapConcurrent(Integer.MAX_VALUE, i -> process(i))) | |
.toList()); | |
// System.out.println("result = " + result); | |
} | |
record Example6() { | |
public static void main(String[] args) { | |
Stream.of(1, 2, 3, 4) | |
.limit(2) | |
.gather(Gatherers.scan(() -> 0, (i1, i2) -> i1 + i2)) | |
.forEach(System.out::println); | |
} | |
} | |
record Example7() { | |
public static void main(String[] args) { | |
Stream.of(1, 2, 3, 4) | |
.gather(Gatherers.windowSliding(2)) | |
.forEach(System.out::println); | |
} | |
} | |
record Example8() { | |
public static void main(String[] args) { | |
} | |
} | |
public static <T> T process(T input) { | |
// System.out.println("processing " + input + " on " + Thread.currentThread().getName()); | |
try { | |
Thread.sleep(Duration.ofSeconds(2)); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return input; | |
} | |
public static <T> T timed(Supplier<T> action) { | |
long before = System.currentTimeMillis(); | |
T result = action.get(); | |
long after = System.currentTimeMillis(); | |
System.out.println("duration: " + Duration.ofMillis(after - before)); | |
return result; | |
} | |
} | |
public static class ListCollector<T, R> implements Collector<T, ArrayList<T>, List<T>> { | |
@Override | |
public Supplier<ArrayList<T>> supplier() { | |
return ArrayList::new; | |
} | |
@Override | |
public BiConsumer<ArrayList<T>, T> accumulator() { | |
return ArrayList::add; | |
} | |
@Override | |
public BinaryOperator<ArrayList<T>> combiner() { | |
return (l1, l2) -> { | |
l1.addAll(l2); | |
return l1; | |
}; | |
} | |
@Override | |
public Function<ArrayList<T>, List<T>> finisher() { | |
return l -> l; | |
} | |
@Override | |
public Set<Characteristics> characteristics() { | |
return Set.of(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment