|
import java.util.Collection; |
|
import java.util.Objects; |
|
import java.util.concurrent.CompletableFuture; |
|
import java.util.concurrent.Executor; |
|
import java.util.function.BiFunction; |
|
import java.util.function.Function; |
|
import java.util.stream.Stream; |
|
|
|
import lombok.Builder; |
|
|
|
// ThrowableFunction -> R (T t) throws E; |
|
|
|
public class AllOfFuturesHelpers<T, R> { |
|
|
|
private final Function<T, CompletableFuture<R>> asyncFunction; |
|
|
|
private final Stream.Builder<CompletableFuture<R>> builder; |
|
|
|
private AllOfFutureHelpers(final Function<T, CompletableFuture<R>> asyncFunction) { |
|
this.asyncFunction = Objects.requireNonNull(asyncFunction); |
|
this.builder = Stream.builder(); |
|
} |
|
|
|
public static <T, R> AllOfFutureHelpers<T, R> fromAsyncFunction(final Function<T, CompletableFuture<R>> asyncFunction) { |
|
return new AllOfFutureHelpers<>(asyncFunction); |
|
} |
|
|
|
@Builder |
|
private static <T, R> AllOfFutureHelpers<T, R> fromFunction(final Function<T, R> function, final ThrowableFunction<T, R, ?> throwableFunction, final Executor executor) { |
|
final var asyncFunction = function != null ? new AsyncFunction<>(function, executor) |
|
: new AsyncFunction<>(throwableFunction, executor); |
|
return new AllOfFutureHelpers<>(asyncFunction); |
|
} |
|
|
|
public AllOfFutureHelpers<T, R> add(final T element) { |
|
this.builder.add(this.asyncFunction.apply(element)); |
|
return this; |
|
} |
|
|
|
public AllOfFutureHelpers<T, R> addAll(final Collection<T> elements) { |
|
for (final var e : elements) { |
|
this.builder.add(this.asyncFunction.apply(e)); |
|
} |
|
return this; |
|
} |
|
|
|
public AllOfFutureHelpers<T, R> addAll(@SuppressWarnings("unchecked") final T... elements) { |
|
for (final var e : elements) { |
|
this.builder.add(this.asyncFunction.apply(e)); |
|
} |
|
return this; |
|
} |
|
|
|
public CompletableFuture<Stream<R>> toCompletableFuture() { |
|
final var resultBuilder = CompletableFuture.completedFuture(Stream.<R> builder()); |
|
final BiFunction<Stream.Builder<R>, R, Stream.Builder<R>> add = Stream.Builder::add; |
|
return this.builder.build() |
|
.reduce(resultBuilder, (b, f) -> b.thenCombine(f, add), (a, b) -> a) |
|
.thenApply(Stream.Builder::build); |
|
|
|
} |
|
|
|
public Stream<R> join() { |
|
return this.toCompletableFuture().join(); |
|
} |
|
} |