Skip to content

Instantly share code, notes, and snippets.

@lpenaud
Last active December 4, 2024 14:18
Show Gist options
  • Save lpenaud/71355829a7b6cd4df84afc4fcfab26ea to your computer and use it in GitHub Desktop.
Save lpenaud/71355829a7b6cd4df84afc4fcfab26ea to your computer and use it in GitHub Desktop.
AllOfFuturesHelpers

All of futures

Create futures from a list and a function, then collect all of their results.

Usage

final var list = List.of('A', 'B', 'C');
AllOfFutureHelpers.<Character, String>builder() // explicit type witness <T, R>; String stands in for someIOFunction's return type
          .function(e -> someIOFunction(e))
          // Optional: defaults to ForkJoinPool.commonPool() when omitted
          .executor(this.myExecutor)
          .build()
          .addAll(list)
          .join()
          .toList();
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.Builder;
// ThrowableFunction -> R (T t) throws E;
public class AllOfFuturesHelpers<T, R> {
private final Function<T, CompletableFuture<R>> asyncFunction;
private final Stream.Builder<CompletableFuture<R>> builder;
private AllOfFutureHelpers(final Function<T, CompletableFuture<R>> asyncFunction) {
this.asyncFunction = Objects.requireNonNull(asyncFunction);
this.builder = Stream.builder();
}
public static <T, R> AllOfFutureHelpers<T, R> fromAsyncFunction(final Function<T, CompletableFuture<R>> asyncFunction) {
return new AllOfFutureHelpers<>(asyncFunction);
}
@Builder
private static <T, R> AllOfFutureHelpers<T, R> fromFunction(final Function<T, R> function, final ThrowableFunction<T, R, ?> throwableFunction, final Executor executor) {
final var asyncFunction = function != null ? new AsyncFunction<>(function, executor)
: new AsyncFunction<>(throwableFunction, executor);
return new AllOfFutureHelpers<>(asyncFunction);
}
public AllOfFutureHelpers<T, R> add(final T element) {
this.builder.add(this.asyncFunction.apply(element));
return this;
}
public AllOfFutureHelpers<T, R> addAll(final Collection<T> elements) {
for (final var e : elements) {
this.builder.add(this.asyncFunction.apply(e));
}
return this;
}
public AllOfFutureHelpers<T, R> addAll(@SuppressWarnings("unchecked") final T... elements) {
for (final var e : elements) {
this.builder.add(this.asyncFunction.apply(e));
}
return this;
}
public CompletableFuture<Stream<R>> toCompletableFuture() {
final var resultBuilder = CompletableFuture.completedFuture(Stream.<R> builder());
final BiFunction<Stream.Builder<R>, R, Stream.Builder<R>> add = Stream.Builder::add;
return this.builder.build()
.reduce(resultBuilder, (b, f) -> b.thenCombine(f, add), (a, b) -> a)
.thenApply(Stream.Builder::build);
}
public Stream<R> join() {
return this.toCompletableFuture().join();
}
}
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
/**
 * Adapts a synchronous function so that each application runs on an executor and is exposed as a
 * {@link CompletableFuture}.
 *
 * @param <T> type of the input
 * @param <R> type of the result
 */
class AsyncFunction<T, R> implements Function<T, CompletableFuture<R>> {

  /** Synchronous function executed for every input. */
  private final Function<T, R> function;

  /** Executor the function is run on. */
  private final Executor executor;

  /**
   * Wraps a plain function.
   *
   * @param function function to run asynchronously, must not be {@code null}
   * @param executor executor to run on, or {@code null} for {@link ForkJoinPool#commonPool()}
   */
  public AsyncFunction(final Function<T, R> function, final Executor executor) {
    this.function = Objects.requireNonNull(function);
    this.executor = AsyncFunction.defaultExecutor(executor);
  }

  /**
   * Wraps a throwing function through its {@code toFunction()} conversion.
   *
   * @param throwableFunction function to run asynchronously, must not be {@code null}
   * @param executor executor to run on, or {@code null} for {@link ForkJoinPool#commonPool()}
   */
  public AsyncFunction(final ThrowableFunction<T, R, ?> throwableFunction,
      final Executor executor) {
    // Dereferencing the argument doubles as the null check.
    this.function = throwableFunction.toFunction();
    this.executor = AsyncFunction.defaultExecutor(executor);
  }

  /** Returns the given executor, falling back to the common pool when it is {@code null}. */
  private static Executor defaultExecutor(final Executor executor) {
    if (executor != null) {
      return executor;
    }
    return ForkJoinPool.commonPool();
  }

  /**
   * Schedules the wrapped function for the given input.
   *
   * @param t input handed to the wrapped function
   * @return future completing with the function's result
   */
  @Override
  public CompletableFuture<R> apply(final T t) {
    final Function<T, R> delegate = this.function;
    return CompletableFuture.supplyAsync(() -> delegate.apply(t), this.executor);
  }
}
/**
 * Unchecked exception carrying a message, a cause, or both.
 */
public class RuntimeWrapperException extends RuntimeException {

  private static final long serialVersionUID = 853_467_948L;

  /**
   * Creates an exception with a message and a cause.
   *
   * @param message detail message
   * @param cause underlying cause
   */
  public RuntimeWrapperException(final String message, final Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates an exception with a cause only; the message is derived from the cause by
   * {@link RuntimeException#RuntimeException(Throwable)}.
   *
   * @param cause underlying cause
   */
  public RuntimeWrapperException(final Throwable cause) {
    super(cause);
  }

  /**
   * Creates an exception with a message and no cause.
   *
   * @param message detail message
   */
  public RuntimeWrapperException(final String message) {
    super(message);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment