Skip to content

Instantly share code, notes, and snippets.

@dhet
Last active January 15, 2024 17:34
Show Gist options
  • Save dhet/d5f24af7a586dbf7a6f630fc44bb9265 to your computer and use it in GitHub Desktop.
Save dhet/d5f24af7a586dbf7a6f630fc44bb9265 to your computer and use it in GitHub Desktop.
Wait for the completion of multiple Java Futures or time out

A Stream Collector in plain Java that blocks until all Futures of a Stream are completed or reach a timeout.

Usage

Simply create a LenientFutureCollector with a timeout and pass it to Stream#collect(Collector):

Stream.of(future1, future2, future3)
  .collect(new LenientFutureCollector(Duration.ofSeconds(10)));

Example

ExecutorService executor = Executors.newCachedThreadPool();

List<String> results = Stream.of("1", "2", "3", "4", "5")
  .map(in -> executor.submit(expensiveOperation(in)))
  .collect(new LenientFutureCollector(Duration.ofSeconds(1)));

// Thread is blocked until all futures are completed or the timeout is reached.

results.stream().forEach(System.out::println);

Output:

null
2
3
null
5

Note that the Collector only collects successful results (hence, "lenient"). Errors, e.g. timeouts and exceptions during execution, are ignored and mapped to null by default. See Future 1 and 4 in the example above. To change this behavior, an optional error mapper can be specified with the second constructor argument:

Function<Throwable, String> transformAndLog = t -> {
  log.error("Something went wrong.", t);
  return "ERROR";
};
new LenientFutureCollector(timeout, transformAndLog);

Be aware that:

  • The order of the Stream is preserved.
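  • The Collector does not propagate InterruptedExceptions: it catches them, restores the thread's interrupt flag and maps the exception through the error mapper, so you might want to check the interrupt flag after the Stream has been collected.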
  • Due to the lazy nature of Streams, the waiting happens when the Stream is completed (i.e. the last element of the Stream is reached). If the Stream is unbounded the collector will never trigger.
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.*;
import java.util.stream.Collector;
/**
 * A {@link Collector} that blocks until every {@link Future} of a stream has completed or a
 * shared timeout has elapsed, and returns the results in encounter order.
 *
 * <p>The collector is "lenient": a future that fails, is cancelled, times out or whose wait is
 * interrupted does not abort the collection. Instead the corresponding {@link Throwable} is passed
 * to the error mapper and the mapper's return value takes the place of the result.
 *
 * <p>The timeout is a single budget shared by all futures and starts counting when the collector
 * is constructed. It is measured with {@link System#nanoTime()}, so adjustments of the system
 * clock do not lengthen or shorten it.
 *
 * @param <T> the result type of the collected futures
 */
public class LenientFutureCollector<T> implements Collector<Future<T>, List<Future<T>>, List<T>> {

    /** {@link System#nanoTime()} reading taken at construction; the timeout is measured from it. */
    private final long startNanos;

    /** Total time budget in nanoseconds, clamped to the range {@code [0, Long.MAX_VALUE]}. */
    private final long timeoutNanos;

    /** Converts the failure of a single future into a replacement result. */
    private final Function<Throwable, T> errorMapper;

    /**
     * Creates a collector that maps every failure to {@code null}.
     *
     * @param timeout the overall time budget, counted from now; negative values are treated as zero
     * @throws NullPointerException if {@code timeout} is {@code null}
     */
    public LenientFutureCollector(Duration timeout) {
        this(timeout, t -> null);
    }

    /**
     * Creates a collector with a custom error mapper.
     *
     * @param timeout the overall time budget, counted from now; negative values are treated as zero
     * @param errorMapper receives the cause of an {@link ExecutionException}, or the
     *     {@link TimeoutException}, {@link InterruptedException} or other exception that prevented
     *     a result, and returns the value to collect instead
     * @throws NullPointerException if {@code timeout} or {@code errorMapper} is {@code null}
     */
    public LenientFutureCollector(Duration timeout, Function<Throwable, T> errorMapper) {
        this.timeoutNanos = clampToNanos(Objects.requireNonNull(timeout, "timeout"));
        // Fail fast here rather than with an NPE in the middle of the finisher.
        this.errorMapper = Objects.requireNonNull(errorMapper, "errorMapper");
        this.startNanos = System.nanoTime();
    }

    @Override
    public Supplier<List<Future<T>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<Future<T>>, Future<T>> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<Future<T>>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Function<List<Future<T>>, List<T>> finisher() {
        return this::awaitAll;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /**
     * Waits for the given futures one after another and collects one entry per future, in order.
     *
     * <p>Futures that are already done are always read, even after the deadline has passed or the
     * thread has been interrupted. Every future that did not yield a result is cancelled (a no-op
     * for futures that are already done) and replaced by the error mapper's value. If the waiting
     * thread is interrupted, its interrupt flag is restored and no further blocking takes place.
     */
    private List<T> awaitAll(List<Future<T>> futures) {
        List<T> results = new ArrayList<>(futures.size());
        boolean interrupted = false;
        for (Future<T> future : futures) {
            T value = null;
            Throwable failure = null;
            try {
                if (future.isDone()) {
                    value = future.get();
                } else if (interrupted) {
                    failure = new InterruptedException();
                } else {
                    long remaining = remainingNanos();
                    if (remaining > 0) {
                        value = future.get(remaining, TimeUnit.NANOSECONDS);
                    } else {
                        failure = new TimeoutException();
                    }
                }
            } catch (InterruptedException e) {
                interrupted = true;
                // Restore the flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                failure = e;
            } catch (ExecutionException e) {
                // Report the task's own exception; fall back to the wrapper if there is no cause.
                failure = e.getCause() != null ? e.getCause() : e;
            } catch (Exception e) {
                // TimeoutException, CancellationException and anything a custom Future throws.
                failure = e;
            }

            // The mapper runs outside the try block so that an exception thrown by the mapper
            // itself propagates to the caller instead of being fed back into the mapper.
            if (failure == null) {
                results.add(value);
            } else {
                future.cancel(true);
                results.add(errorMapper.apply(failure));
            }
        }
        return results;
    }

    /** Returns the nanoseconds left until the deadline; zero or negative once it has passed. */
    private long remainingNanos() {
        // Both operands are non-negative, so the subtraction cannot overflow.
        return timeoutNanos - (System.nanoTime() - startNanos);
    }

    /** Converts a duration to nanoseconds, mapping negative values to 0 and overflow to the maximum. */
    private static long clampToNanos(Duration timeout) {
        if (timeout.isNegative()) {
            return 0L;
        }
        try {
            return timeout.toNanos();
        } catch (ArithmeticException overflow) {
            // More than ~292 years: effectively "wait forever".
            return Long.MAX_VALUE;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment