Skip to content

Instantly share code, notes, and snippets.

@nioe
Last active August 29, 2015 13:58
Show Gist options
  • Save nioe/10263221 to your computer and use it in GitHub Desktop.
Save nioe/10263221 to your computer and use it in GitHub Desktop.
Generic implementation to wait for Java EE Futures and run callback after one is done
import java.util.concurrent.ExecutionException;
public interface FutureCallback<CallbackType, FutureType> {
public CallbackType work(FutureProvider<FutureType> futureContainer) throws InterruptedException,
ExecutionException;
}
import java.util.concurrent.Future;
public interface FutureProvider<T> {
public Future<T> getFuture();
}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class FutureWaiter<FutureType> {
private final List<FutureProvider<FutureType>> futures;
private FutureWaiter(List<FutureProvider<FutureType>> futures) {
this.futures = futures;
}
public static <FutureType> FutureWaiter<FutureType> waitFor(List<FutureProvider<FutureType>> futures) {
return new FutureWaiter<FutureType>(futures);
}
/**
* Iterates over the list of FutureProviders as long as there are futures which are not yet done. If the method
* finds a done future it calls the given callback method. All callback results are added to a list which is
* returned at the end.
*
* @param callback
* @return A list of callback results
* @throws InterruptedException
* @throws ExecutionException
*/
public <CallbackType> List<CallbackType> andThen(FutureCallback<CallbackType, FutureType> callback)
throws InterruptedException, ExecutionException {
List<CallbackType> result = new ArrayList<CallbackType>();
Iterator<FutureProvider<FutureType>> it = futures.iterator();
while (it.hasNext()) {
FutureProvider<FutureType> futureContainer = it.next();
if (futureContainer.getFuture().isDone()) {
result.add(callback.work(futureContainer));
it.remove();
}
if (!it.hasNext()) {
it = futures.iterator();
}
}
return result;
}
}
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class FutureWaiterTest {
@Test
public void testWaitForAndDo() throws InterruptedException, ExecutionException {
List<String> expectedResults = Arrays.asList("test1", "test2", "test3");
List<FutureProvider<String>> futures = new ArrayList<FutureProvider<String>>();
for (String expectedResult : expectedResults) {
futures.add(new TestFutureProvider<String>(expectedResult));
}
FutureCallback<String, String> returnString = new TestFutureCallback<String>();
List<String> result = FutureWaiter.waitFor(futures).andThen(returnString);
assertTrue(result.containsAll(expectedResults));
}
private static class TestFutureProvider<T> implements FutureProvider<T> {
private final T futureResult;
private boolean alreadyCalled = false;
public TestFutureProvider(T futureResult) {
this.futureResult = futureResult;
}
@Override
public Future<T> getFuture() {
@SuppressWarnings("unchecked")
Future<T> future = mock(Future.class);
when(future.isDone()).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
final boolean ret = alreadyCalled;
alreadyCalled = true;
return ret;
}
});
try {
when(future.get()).thenReturn(futureResult);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return future;
}
}
private static class TestFutureCallback<T> implements FutureCallback<T, T> {
@Override
public T work(FutureProvider<T> futureContainer) throws InterruptedException,
ExecutionException {
return futureContainer.getFuture().get();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment