Skip to content

Instantly share code, notes, and snippets.

@hamnis
Created August 13, 2015 06:23
Show Gist options
  • Save hamnis/19b3eef14d0b79744fdc to your computer and use it in GitHub Desktop.
Save hamnis/19b3eef14d0b79744fdc to your computer and use it in GitHub Desktop.
package net.hamnaberg.future;
import java.util.concurrent.*;
public class FutureConverter {
public static <A> CompletableFuture<A> convert(Future<A> future, ExecutorService executor, boolean shutdown) {
if (future instanceof CompletableFuture)
return (CompletableFuture<A>) future;
final BlockingQueue<Future<A>> queue = new LinkedBlockingQueue<>(1);
queue.offer(future);
final CompletableFuture<A> f = new CompletableFuture<>();
executor.execute(() -> {
Future<A> other;
while((other = queue.poll()) != null) {
if (other.isCancelled()) {
f.cancel(true);
}
else if (!other.isDone()) {
queue.offer(other);
}
else {
try {
f.complete(other.get());
} catch(ExecutionException e) {
f.completeExceptionally(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
});
if (shutdown) {
executor.shutdown();
}
return f;
}
public static <A> CompletableFuture<A> convert(Future<A> f, ExecutorService executor) {
return convert(f, executor, false);
}
public static <A> CompletableFuture<A> convert(Future<A> f) {
return convert(f, Executors.newSingleThreadExecutor(), true);
}
}
package net.hamnaberg.future;
import org.junit.Test;
import java.util.concurrent.*;
import static org.junit.Assert.*;
public class FutureConverterTest {
@Test
public void minimalFutureTest() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> future = exec.submit(sleepAnd(1000, () -> 1000));
ExecutorService single = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = FutureConverter.convert(future, single);
try {
int value = f.get();
assertEquals("meh", 1000, value);
} catch (ExecutionException e) {
fail(e.getCause().getMessage());
} finally {
exec.shutdown();
single.shutdown();
}
}
@Test
public void failWithException() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> future = exec.submit(sleepAnd(1000, () -> {
throw new RuntimeException("Hello");
}));
ExecutorService single = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = FutureConverter.convert(future, single);
try {
f.get();
fail("Some value was here");
} catch (ExecutionException e) {
assertEquals("Hello", e.getCause().getMessage());
} finally {
exec.shutdown();
single.shutdown();
}
}
@Test
public void cancel() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> future = exec.submit(sleepAnd(1000, () -> {
throw new RuntimeException("Hello");
}));
ExecutorService single = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = FutureConverter.convert(future, single);
future.cancel(true);
try {
f.get();
fail("Some value was here");
} catch (ExecutionException e) {
fail("Unexpected exception");
} catch (CancellationException e) {
assertTrue(true);
} finally {
exec.shutdown();
single.shutdown();
}
}
private <A> Callable<A> sleepAnd(long amount, Callable<A> andThen) {
return () -> {
try {
Thread.sleep(amount);
return andThen.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment