Last active
October 23, 2024 04:29
-
-
Save pivovarit/f417861cc543e555145cf35a560d80f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pivovarit.gatherers; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.Set; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.BiConsumer; | |
import java.util.function.BiFunction; | |
import java.util.function.BinaryOperator; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
import java.util.stream.Collector; | |
import java.util.stream.Collectors; | |
import java.util.stream.Gatherer; | |
import java.util.stream.Gatherers; | |
import java.util.stream.Stream; | |
/** | |
* JEP 461: Breathing New Life into Java Stream API | |
* {@code s/461/485/g}
* <p> | |
* | |
* @author pivovarit | |
* @site 4comprehension.com | |
* @twitter/x @pivovarit | |
* @github @pivovarit | |
*/ | |
class JDD { | |
// https://github.com/pivovarit/more-gatherers
// https://github.com/pivovarit/parallel-collectors
public static void main(String[] args) { | |
} | |
/**
 * Example 1: a side effect performed inside {@code Stream.map}.
 */
record E1() {
    /**
     * Maps the values 1, 2 and 3 through a function that prints each value
     * and passes it on unchanged; the resulting list is discarded.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        Stream<Integer> numbers = Stream.of(1, 2, 3);
        numbers.map(E1::printAndPassThrough).toList();
    }

    /** Prints {@code value} to standard output and returns it as-is. */
    private static Integer printAndPassThrough(Integer value) {
        System.out.println(value);
        return value;
    }
}
/**
 * Example 2: collecting a stream into a freshly created {@link ArrayList}.
 */
record E2() {
    /**
     * Collects the values 1, 2 and 3 into a new {@code ArrayList}; the result is discarded.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        Collector<Integer, ?, ArrayList<Integer>> intoArrayList = Collectors.toCollection(ArrayList::new);
        Stream.of(1, 2, 3).collect(intoArrayList);
    }
}
record E3() { | |
public static void main(String[] args) { | |
Stream.iterate(0, i -> i + 1).limit(20) | |
.gather(Gatherers.windowFixed(3)) | |
.forEach(System.out::println); | |
} | |
public static <T> Collector<T, ArrayList<T>, List<T>> customToList() { | |
return null; | |
} | |
public static class CustomToListCollector<T> implements Collector<T, ArrayList<T>, List<T>> { | |
@Override | |
public Supplier<ArrayList<T>> supplier() { | |
return () -> new ArrayList<>(); | |
} | |
@Override | |
public BiConsumer<ArrayList<T>, T> accumulator() { | |
return (acc, t) -> acc.add(t); | |
} | |
@Override | |
public BinaryOperator<ArrayList<T>> combiner() { | |
return (a1, a2) -> { | |
a1.addAll(a2); | |
return a1; | |
}; | |
} | |
@Override | |
public Function<ArrayList<T>, List<T>> finisher() { | |
return i -> new ArrayList<>(i); | |
} | |
@Override | |
public Set<Characteristics> characteristics() { | |
return Set.of(); | |
} | |
} | |
} | |
record E4() { | |
public static void main(String[] args) { | |
Stream.iterate(0, i -> i + 1).limit(20) | |
.gather(Gatherers.windowSliding(3)) | |
.forEach(System.out::println); | |
} | |
} | |
record E5() { | |
public static void main(String[] args) { | |
Stream.iterate(0, i -> i + 1).limit(3) | |
.gather(Gatherers.scan(() -> 0, Integer::sum)) | |
.forEach(System.out::println); | |
} | |
} | |
record E6() { | |
public static void main(String[] args) throws InterruptedException { | |
Thread.ofPlatform() | |
.start(() -> { | |
Stream.iterate(0, i -> i + 1).limit(20) | |
.parallel() | |
.map(i -> { | |
try { | |
Thread.sleep(Integer.MAX_VALUE); | |
return i; | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
}) | |
.forEach(System.out::println); | |
}); | |
Thread.sleep(1000); | |
timed(() -> { | |
List<Integer> result = Stream.of(1, 2, 3, 4) | |
.parallel() | |
.map(i -> process(i)) | |
.toList(); | |
}); | |
} | |
} | |
record E7() { | |
public static void main(String[] args) { | |
timed(() -> { | |
List<Integer> result = Stream.of(1, 2, 3, 4) | |
.gather(Gatherers.mapConcurrent(4, i -> process(i))) | |
.toList(); | |
}); | |
} | |
} | |
record E8() { | |
public static void main(String[] args) { | |
Stream.of("a", "bb", "ddd") | |
.gather(zipWithIndex()) | |
.forEach(System.out::println); | |
} | |
public static <T> Gatherer<T, ?, Map.Entry<T, Long>> zipWithIndex() { | |
return Gatherer.ofSequential(() -> new AtomicLong(), | |
Gatherer.Integrator.ofGreedy((state, element, downstream) -> { | |
downstream.push(Map.entry(element, state.getAndIncrement())); | |
return true; | |
})); | |
} | |
public static <T1, T2> Gatherer<T1, ?, Map.Entry<T1, T2>> zipWith(Iterator<T2> iterator) { | |
return Gatherer.ofSequential(() -> iterator, | |
Gatherer.Integrator.of((state, element, downstream) -> { | |
if (state.hasNext()) { | |
downstream.push(Map.entry(element, state.next())); | |
return true; | |
} else { | |
return false; | |
} | |
})); | |
} | |
public static <T1, T2, R> Gatherer<T1, ?, R> zipWith(Iterator<T2> iterator, BiFunction<T1, T2, R> function) { | |
return Gatherer.ofSequential(() -> iterator, | |
Gatherer.Integrator.of((state, element, downstream) -> { | |
if (state.hasNext()) { | |
downstream.push(function.apply(element, state.next())); | |
return true; | |
} else { | |
return false; | |
} | |
})); | |
} | |
record E9() { | |
public static void main(String[] args) { | |
List<String> strings = List.of("a", "b", "c"); | |
Stream.of(1, 2, 3, 4) | |
.gather(zipWith(strings.iterator())) | |
.forEach(System.out::println); | |
} | |
} | |
record E10() { | |
public static void main(String[] args) { | |
List<String> strings = List.of("a", "b", "c"); | |
Stream.of(1, 2, 3, 4) | |
.gather(zipWith(strings.iterator(), (i, s) -> i + s)) | |
.forEach(System.out::println); | |
} | |
} | |
record E11() { | |
public static void main(String[] args) { | |
Stream.of("a", "bb", "bb", "ccc", "dd", "ee") | |
.gather(distinctBy(String::length)) | |
.forEach(System.out::println); | |
} | |
public static <T, P> Gatherer<T, ?, T> distinctBy(Function<T, P> keyExtractor) { | |
Objects.requireNonNull(keyExtractor, "keyExtractor can't be null"); | |
return Gatherer.ofSequential(() -> new HashSet<P>(), Gatherer.Integrator.ofGreedy((state, element, downstream) -> { | |
if (state.add(keyExtractor.apply(element))) { | |
return downstream.push(element); | |
} | |
return true; | |
})); | |
} | |
record E9() { | |
public static void main(String[] args) { | |
Stream.of(1, 2, 3, 4) | |
.gather(last(2)) | |
.forEach(System.out::println); | |
} | |
public static <T> Gatherer<T, ?, T> last(int size) { | |
return Gatherer.ofSequential(() -> new ArrayList<T>(), Gatherer.Integrator.ofGreedy( | |
(state, element, _) -> { | |
state.add(element); | |
if (state.size() > size) { | |
state.removeFirst(); | |
} | |
return true; | |
}), (state, downstream) -> { | |
if (!downstream.isRejecting()) { | |
state.forEach(downstream::push); | |
} | |
}); | |
} | |
} | |
} | |
} | |
public static void timed(Runnable runnable) { | |
long start = System.currentTimeMillis(); | |
runnable.run(); | |
System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms"); | |
} | |
public static <T> T process(T input) { | |
try { | |
System.out.println("Processing " + input + " on " + Thread.currentThread().getName()); | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return input; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment