Skip to content

Instantly share code, notes, and snippets.

@Yoplitein
Last active August 19, 2021 23:59
Show Gist options
  • Save Yoplitein/8f6a3ff87d73cb436ad679c7e9e17150 to your computer and use it in GitHub Desktop.
Save Yoplitein/8f6a3ff87d73cb436ad679c7e9e17150 to your computer and use it in GitHub Desktop.
/**
Sequentially schedules a series of CompletableFutures, i.e. earlier futures
must complete before subsequent futures are scheduled.
Assumes tasks is a stream which generates futures on demand.
A stream over an existing set of futures will not work as expected,
as they all will have already been scheduled.
*/
static CompletableFuture<Void> chainAsync(Stream<CompletableFuture<?>> tasks, Executor pool)
{
chainAsync(tasks, pool, 1);
}
// ditto, but up to `parallel` futures are scheduled concurrently.
static CompletableFuture<Void> chainAsync(Stream<CompletableFuture<?>> tasks, Executor pool, int parallel)
{
final var done = new CompletableFuture<Void>();
final var empty = CompletableFuture.completedFuture(null);
final var iter = tasks.iterator();
final var nextFutures = new CompletableFuture[parallel];
final var scheduleNext = new Runnable[1]; // work around inability of lambdas to self-reference
scheduleNext[0] = () -> {
try
{
if(!iter.hasNext())
done.complete(null);
else
{
// taking subarray is inefficient, so fill remainder with bogus task
Arrays.fill(nextFutures, empty);
for(int i = 0; i < parallel; i++)
{
if(!iter.hasNext()) break;
nextFutures[i] = iter.next();
}
final var next = CompletableFuture.allOf(nextFutures);
next.thenRunAsync(scheduleNext[0], pool);
next.exceptionallyAsync(err -> { done.completeExceptionally(err); return null; }, pool);
}
}
// handle cases where stream logic throws
catch(Throwable err)
{
done.completeExceptionally(err);
for(var task: nextFutures) task.completeExceptionally(err);
}
};
scheduleNext[0].run();
return done;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment