Last active
August 29, 2015 14:21
-
-
Save purijatin/d5e05d84941c3c408282 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) { | |
if(com.isEmpty()){ | |
throw new IllegalArgumentException(); | |
} | |
Stream<? extends CompletableFuture<T>> stream = com.stream(); | |
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>()); | |
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> { | |
x.add(y); | |
return x; | |
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> { | |
ls1.addAll(ls2); | |
return ls1; | |
},exec)); | |
} | |
ExecutorService executorService = Executors.newCachedThreadPool(); | |
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> { | |
try { | |
Thread.sleep((long) (Math.random() * 10)); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
return x; | |
}, executorService)); | |
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService); | |
/** | |
* Transforms a {@code List<CompletableFuture<T>>} into a {@code CompletableFuture<List<T>>}. | |
* Useful for reducing many Futures into a single Future. | |
* <p> | |
* If any of the future fails, it returns a failed future with the exception as that of the failed one. | |
* </p> | |
* <p> | |
* The order of the returned list (when returned future is completed) is undisturbed. | |
* i.e. the same as its future counterpart in the argument list. | |
* </p> | |
* @param com a list of completable future's | |
* @param <T> The type of list | |
* @return future containing the list of results. Else failed future with the exception | |
*/ | |
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { | |
return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()])) | |
.thenApply(v -> com.stream() | |
.map(CompletableFuture::join) | |
.collect(toList()) | |
); | |
} | |
/** | |
* See {@link Util#sequence(List)} for more info | |
* @see Util#sequence(List) | |
*/ | |
@SafeVarargs | |
public static <T> CompletableFuture<List<T>> sequence(CompletableFuture<T>... com) { | |
return sequence(Arrays.asList(com)); | |
} | |
public class UtilTest { | |
private ExecutorService exec = Executors.newCachedThreadPool(); | |
private ScheduledExecutorService sched = Executors.newScheduledThreadPool(200); | |
@Test | |
public void testSequence1() throws Exception { | |
//test normal usage | |
//single task quickly finish | |
List<CompletableFuture<Integer>> in1 = rangeFuture(1, 2, 0); | |
CompletableFuture<List<Integer>> ls1 = Util.sequence(in1); | |
Assert.assertEquals(range(1, 2), ls1.get(2, SECONDS)); | |
//single task async finish | |
List<CompletableFuture<Integer>> in2 = rangeFuture(1, 2, 500); | |
CompletableFuture<List<Integer>> ls2 = Util.sequence(in2); | |
Assert.assertEquals(range(1, 2), ls2.get(2, SECONDS)); | |
//all over quickly and order is maintained | |
List<CompletableFuture<Integer>> in3 = rangeFuture(1, 70000, 0); | |
CompletableFuture<List<Integer>> ls3 = Util.sequence(in3); | |
Assert.assertEquals(range(1, 70000), ls3.get(2, SECONDS)); | |
//all over async and order is maintained | |
List<CompletableFuture<Integer>> in4 = rangeFuture(1, 70000, 500); | |
CompletableFuture<List<Integer>> ls4 = Util.sequence(in4); | |
Assert.assertEquals(range(1, 70000), ls4.get(20, SECONDS)); | |
} | |
@Test | |
public void testSequence2() throws Exception { | |
//test if any fail | |
//a test fails | |
List<CompletableFuture<Integer>> in = asList(failure(1000)); | |
CompletableFuture<List<Integer>> ls = Util.sequence(in); | |
while(!ls.isDone()); | |
Assert.assertTrue(ls.isCompletedExceptionally()); | |
//no test fails. | |
List<CompletableFuture<Integer>> in1 = mixed(100, 1000, 0); | |
CompletableFuture<List<Integer>> ls1 = Util.sequence(in1); | |
while(!ls1.isDone()); | |
Assert.assertFalse("Should not end exceptionally", ls1.isCompletedExceptionally()); | |
List<CompletableFuture<Integer>> in2 = mixed(50000, 500, 0.25); | |
CompletableFuture<List<Integer>> ls2 = Util.sequence(in2); | |
while(!ls2.isDone()); | |
Assert.assertTrue("Should end exceptionally", ls2.isCompletedExceptionally()); | |
} | |
// @Test(expected = RuntimeException.class, timeout = 10000) | |
public void testSequence3() throws Exception{ | |
//as soon as any test fails, it shouldnt wait for others to terminate and give result asap | |
CompletableFuture<Integer> success = new CompletableFuture<>(); | |
CompletableFuture<Integer> fail = new CompletableFuture<>(); | |
CompletableFuture<List<Integer>> out = Util.sequence(asList(success(1, 0), fail, success)); | |
fail.completeExceptionally(new RuntimeException()); | |
out.get(); | |
} | |
private final Random random = new Random(); | |
private <T> CompletableFuture<T> success(T value, int maxSleep) { | |
if (maxSleep == 0) { | |
return CompletableFuture.completedFuture(value); | |
} | |
CompletableFuture<T> c = new CompletableFuture<T>(); | |
sched.schedule(() -> c.complete(value), random.nextInt(maxSleep), MILLISECONDS); | |
return c; | |
} | |
private <T> CompletableFuture<T> failure(Exception ex, int maxSleep) { | |
CompletableFuture<T> com = new CompletableFuture<>(); | |
if (maxSleep == 0) { | |
com.completeExceptionally(ex); | |
} else { | |
CompletableFuture<T> c = new CompletableFuture<T>(); | |
sched.schedule(() -> com.completeExceptionally(ex), random.nextInt(maxSleep), MILLISECONDS); | |
} | |
return com; | |
} | |
private <T> CompletableFuture<T> failure(int maxSleep) { | |
return failure(new RuntimeException("Auto-Failure. " + System.nanoTime()), maxSleep); | |
} | |
private List<CompletableFuture<Integer>> rangeFuture(int from, int to, int maxSleep) { | |
return IntStream.range(from, to) | |
.mapToObj(x -> success(x, maxSleep)) | |
.collect(toList()); | |
} | |
/** | |
* returns a list of futures. that would fail or succeed | |
* | |
* @param count number of futures | |
* @param maxSleep max time taken for any task to be completed. Time taken will be less than this | |
* @param failfrequency frequency on how many futures should fail. if 0 then all success. if 1 then all fail | |
* @return ls | |
*/ | |
private List<CompletableFuture<Integer>> mixed(int count, int maxSleep, double failfrequency) { | |
if (failfrequency < 0 | failfrequency > 1) | |
throw new IllegalArgumentException("Frequence can only be between 0 and 1"); | |
return IntStream.range(0, count) | |
.mapToObj(x -> { | |
if (random.nextDouble() < failfrequency) | |
return this.<Integer>failure(maxSleep); | |
else return success(1, maxSleep); | |
}) | |
.collect(toList()); | |
} | |
/** | |
* returns a list of integers | |
* | |
* @param from inclusive | |
* @param to exclusive | |
* @return list of integers | |
*/ | |
private List<Integer> range(int from, int to) { | |
return IntStream.range(from, to).boxed().collect(toList()); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment