Skip to content

Instantly share code, notes, and snippets.

@pivovarit
Last active October 23, 2024 04:29
Show Gist options
  • Save pivovarit/f417861cc543e555145cf35a560d80f0 to your computer and use it in GitHub Desktop.
Save pivovarit/f417861cc543e555145cf35a560d80f0 to your computer and use it in GitHub Desktop.
package com.pivovarit.gatherers;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
/**
* JEP 461: Breathing New Life into Java Stream API
* (read every "461" as "485": {@code s/461/485/g})
*
* @author pivovarit
* @site 4comprehension.com
* @twitter/x @pivovarit
* @github @pivovarit
*/
class JDD {
    // Container for a series of small, independently runnable Stream/Gatherer
    // examples. Every nested record exposes its own main method.
    //
    // https://github.com/pivovarit/more-gatherers
    // https://github.com/pivovarit/parallel-collectors

    /** Does nothing; the runnable code lives in the nested example records. */
    public static void main(String[] args) {
    }

    /** Prints each element from inside a {@code map} stage and collects the stream to a list. */
    record E1() {
        public static void main(String[] args) {
            Stream.of(1, 2, 3)
              .map(e -> {
                  System.out.println(e);
                  return e;
              })
              .toList();
        }
    }

    /** Collects a stream into an {@link ArrayList} through {@link Collectors#toCollection}. */
    record E2() {
        public static void main(String[] args) {
            Stream.of(1, 2, 3)
              .collect(Collectors.toCollection(ArrayList::new));
        }
    }

    /** Prints fixed windows of three elements and hosts a hand-written list collector. */
    record E3() {
        public static void main(String[] args) {
            Stream.iterate(0, i -> i + 1).limit(20)
              .gather(Gatherers.windowFixed(3))
              .forEach(System.out::println);
        }

        /**
         * Creates a collector that accumulates elements into a list.
         *
         * @param <T> element type
         * @return a new {@link CustomToListCollector}, never {@code null}
         */
        public static <T> Collector<T, ArrayList<T>, List<T>> customToList() {
            // Previously returned null, which made the factory unusable.
            return new CustomToListCollector<>();
        }

        /**
         * Collector that gathers elements into an {@link ArrayList} and hands out
         * a copy of it as the final result.
         *
         * @param <T> element type
         */
        public static class CustomToListCollector<T> implements Collector<T, ArrayList<T>, List<T>> {

            /** Supplies a fresh, empty accumulation list. */
            @Override
            public Supplier<ArrayList<T>> supplier() {
                return ArrayList::new;
            }

            /** Appends a single element to the accumulation list. */
            @Override
            public BiConsumer<ArrayList<T>, T> accumulator() {
                return ArrayList::add;
            }

            /** Merges two partial results by appending the right one to the left one. */
            @Override
            public BinaryOperator<ArrayList<T>> combiner() {
                return (left, right) -> {
                    left.addAll(right);
                    return left;
                };
            }

            /** Copies the accumulation list so the result is detached from the accumulator. */
            @Override
            public Function<ArrayList<T>, List<T>> finisher() {
                return accumulated -> new ArrayList<>(accumulated);
            }

            /** Declares no characteristics. */
            @Override
            public Set<Characteristics> characteristics() {
                return Set.of();
            }
        }
    }

    /** Prints sliding windows of three elements. */
    record E4() {
        public static void main(String[] args) {
            Stream.iterate(0, i -> i + 1).limit(20)
              .gather(Gatherers.windowSliding(3))
              .forEach(System.out::println);
        }
    }

    /** Prints the running sum produced by {@link Gatherers#scan}. */
    record E5() {
        public static void main(String[] args) {
            Stream.iterate(0, i -> i + 1).limit(3)
              .gather(Gatherers.scan(() -> 0, Integer::sum))
              .forEach(System.out::println);
        }
    }

    /**
     * Starts a background parallel stream whose tasks sleep for
     * {@code Integer.MAX_VALUE} milliseconds, then times a second parallel
     * stream that calls {@link JDD#process(Object)} on four elements.
     */
    record E6() {
        public static void main(String[] args) throws InterruptedException {
            // Daemon thread: its tasks sleep practically forever, and a non-daemon
            // thread would keep the JVM alive long after the timed part is done.
            Thread.ofPlatform()
              .daemon()
              .start(() -> {
                  Stream.iterate(0, i -> i + 1).limit(20)
                    .parallel()
                    .map(i -> {
                        try {
                            Thread.sleep(Integer.MAX_VALUE);
                            return i;
                        } catch (InterruptedException e) {
                            // Keep the interruption visible to code further up the stack.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    })
                    .forEach(System.out::println);
              });

            // Give the background stream a head start before measuring.
            Thread.sleep(1000);

            timed(() -> {
                Stream.of(1, 2, 3, 4)
                  .parallel()
                  .map(i -> process(i))
                  .toList();
            });
        }
    }

    /** Times four {@link JDD#process(Object)} calls run through {@code mapConcurrent} with a limit of 4. */
    record E7() {
        public static void main(String[] args) {
            timed(() -> {
                Stream.of(1, 2, 3, 4)
                  .gather(Gatherers.mapConcurrent(4, i -> process(i)))
                  .toList();
            });
        }
    }

    /** Custom gatherers: zipping with an index or an iterator, plus nested follow-up examples. */
    record E8() {
        public static void main(String[] args) {
            Stream.of("a", "bb", "ddd")
              .gather(zipWithIndex())
              .forEach(System.out::println);
        }

        /**
         * Pairs every element with its zero-based position in the stream.
         *
         * @param <T> element type
         * @return a sequential gatherer emitting {@code (element, index)} entries
         */
        public static <T> Gatherer<T, ?, Map.Entry<T, Long>> zipWithIndex() {
            return Gatherer.ofSequential(() -> new AtomicLong(),
              Gatherer.Integrator.ofGreedy((state, element, downstream) ->
                // Relay the downstream's answer so that a rejection is propagated upstream.
                downstream.push(Map.entry(element, state.getAndIncrement()))));
        }

        /**
         * Pairs every element with the next value taken from {@code iterator}
         * and stops as soon as the iterator has no more values.
         * <p>
         * The same iterator instance is used as the gatherer state, so it is
         * consumed while the stream is processed.
         *
         * @param iterator source of the right-hand values
         * @param <T1>     stream element type
         * @param <T2>     iterator element type
         * @return a sequential gatherer emitting {@code (element, value)} entries
         * @throws NullPointerException if {@code iterator} is {@code null}
         */
        public static <T1, T2> Gatherer<T1, ?, Map.Entry<T1, T2>> zipWith(Iterator<T2> iterator) {
            Objects.requireNonNull(iterator, "iterator can't be null");
            return Gatherer.ofSequential(() -> iterator,
              Gatherer.Integrator.of((state, element, downstream) ->
                // Stop when the iterator runs dry or the downstream rejects more elements.
                state.hasNext() && downstream.push(Map.entry(element, state.next()))));
        }

        /**
         * Combines every element with the next value taken from {@code iterator}
         * using {@code function} and stops as soon as the iterator has no more values.
         * <p>
         * The same iterator instance is used as the gatherer state, so it is
         * consumed while the stream is processed.
         *
         * @param iterator source of the right-hand values
         * @param function combines a stream element with an iterator value
         * @param <T1>     stream element type
         * @param <T2>     iterator element type
         * @param <R>      result type
         * @return a sequential gatherer emitting the combined values
         * @throws NullPointerException if {@code iterator} or {@code function} is {@code null}
         */
        public static <T1, T2, R> Gatherer<T1, ?, R> zipWith(Iterator<T2> iterator, BiFunction<T1, T2, R> function) {
            Objects.requireNonNull(iterator, "iterator can't be null");
            Objects.requireNonNull(function, "function can't be null");
            return Gatherer.ofSequential(() -> iterator,
              Gatherer.Integrator.of((state, element, downstream) ->
                // Stop when the iterator runs dry or the downstream rejects more elements.
                state.hasNext() && downstream.push(function.apply(element, state.next()))));
        }

        /** Zips four integers with three strings; the shorter side ends the output. */
        record E9() {
            public static void main(String[] args) {
                List<String> strings = List.of("a", "b", "c");
                Stream.of(1, 2, 3, 4)
                  .gather(zipWith(strings.iterator()))
                  .forEach(System.out::println);
            }
        }

        /** Zips integers with strings and concatenates each pair. */
        record E10() {
            public static void main(String[] args) {
                List<String> strings = List.of("a", "b", "c");
                Stream.of(1, 2, 3, 4)
                  .gather(zipWith(strings.iterator(), (i, s) -> i + s))
                  .forEach(System.out::println);
            }
        }

        /** Keeps only the first string seen for every distinct length. */
        record E11() {
            public static void main(String[] args) {
                Stream.of("a", "bb", "bb", "ccc", "dd", "ee")
                  .gather(distinctBy(String::length))
                  .forEach(System.out::println);
            }

            /**
             * Lets an element through only if no earlier element produced the same key.
             *
             * @param keyExtractor computes the key used for the distinctness check
             * @param <T>          element type
             * @param <P>          key type
             * @return a sequential gatherer emitting the first element for each key
             * @throws NullPointerException if {@code keyExtractor} is {@code null}
             */
            public static <T, P> Gatherer<T, ?, T> distinctBy(Function<T, P> keyExtractor) {
                Objects.requireNonNull(keyExtractor, "keyExtractor can't be null");
                return Gatherer.ofSequential(() -> new HashSet<P>(), Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    // Set.add returns false for a key that was already seen.
                    if (state.add(keyExtractor.apply(element))) {
                        return downstream.push(element);
                    }
                    return true;
                }));
            }

            /** Prints the last two elements of a stream. Shadows the sibling {@code E8.E9} inside {@code E11}. */
            record E9() {
                public static void main(String[] args) {
                    Stream.of(1, 2, 3, 4)
                      .gather(last(2))
                      .forEach(System.out::println);
                }

                /**
                 * Emits only the final {@code size} elements of the stream, in
                 * encounter order, once the upstream is exhausted.
                 *
                 * @param size maximum number of trailing elements to keep; a value
                 *             of zero or less results in no output
                 * @param <T>  element type
                 * @return a sequential gatherer emitting the trailing elements
                 */
                public static <T> Gatherer<T, ?, T> last(int size) {
                    return Gatherer.ofSequential(() -> new ArrayList<T>(), Gatherer.Integrator.ofGreedy(
                      (state, element, _) -> {
                          // Buffer the element and evict the oldest one once over capacity.
                          state.add(element);
                          if (state.size() > size) {
                              state.removeFirst();
                          }
                          return true;
                      }), (state, downstream) -> {
                        if (downstream.isRejecting()) {
                            return;
                        }
                        for (T element : state) {
                            // Stop flushing as soon as the downstream refuses more elements.
                            if (!downstream.push(element)) {
                                break;
                            }
                        }
                    });
                }
            }
        }
    }

    /**
     * Runs {@code runnable} and prints the elapsed time in milliseconds to
     * standard output. Nothing is printed if {@code runnable} throws.
     *
     * @param runnable the action to measure
     */
    public static void timed(Runnable runnable) {
        // nanoTime is monotonic, so the measurement is not affected by wall-clock adjustments.
        long start = System.nanoTime();
        runnable.run();
        System.out.println("Time: " + ((System.nanoTime() - start) / 1_000_000) + "ms");
    }

    /**
     * Simulates slow work: prints the input together with the current thread
     * name, sleeps for one second and returns the input unchanged.
     *
     * @param input value to "process"
     * @param <T>   value type
     * @return {@code input}
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *                          thread is interrupted while sleeping; the
     *                          interrupt flag is restored before throwing
     */
    public static <T> T process(T input) {
        try {
            System.out.println("Processing " + input + " on " + Thread.currentThread().getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag that Thread.sleep cleared so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return input;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment