Created
August 13, 2015 06:23
-
-
Save hamnis/19b3eef14d0b79744fdc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.hamnaberg.future; | |
import java.util.concurrent.*; | |
public class FutureConverter { | |
public static <A> CompletableFuture<A> convert(Future<A> future, ExecutorService executor, boolean shutdown) { | |
if (future instanceof CompletableFuture) | |
return (CompletableFuture<A>) future; | |
final BlockingQueue<Future<A>> queue = new LinkedBlockingQueue<>(1); | |
queue.offer(future); | |
final CompletableFuture<A> f = new CompletableFuture<>(); | |
executor.execute(() -> { | |
Future<A> other; | |
while((other = queue.poll()) != null) { | |
if (other.isCancelled()) { | |
f.cancel(true); | |
} | |
else if (!other.isDone()) { | |
queue.offer(other); | |
} | |
else { | |
try { | |
f.complete(other.get()); | |
} catch(ExecutionException e) { | |
f.completeExceptionally(e.getCause()); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
} | |
}); | |
if (shutdown) { | |
executor.shutdown(); | |
} | |
return f; | |
} | |
public static <A> CompletableFuture<A> convert(Future<A> f, ExecutorService executor) { | |
return convert(f, executor, false); | |
} | |
public static <A> CompletableFuture<A> convert(Future<A> f) { | |
return convert(f, Executors.newSingleThreadExecutor(), true); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.hamnaberg.future; | |
import org.junit.Test; | |
import java.util.concurrent.*; | |
import static org.junit.Assert.*; | |
public class FutureConverterTest { | |
@Test | |
public void minimalFutureTest() throws InterruptedException { | |
ExecutorService exec = Executors.newSingleThreadExecutor(); | |
Future<Integer> future = exec.submit(sleepAnd(1000, () -> 1000)); | |
ExecutorService single = Executors.newSingleThreadExecutor(); | |
CompletableFuture<Integer> f = FutureConverter.convert(future, single); | |
try { | |
int value = f.get(); | |
assertEquals("meh", 1000, value); | |
} catch (ExecutionException e) { | |
fail(e.getCause().getMessage()); | |
} finally { | |
exec.shutdown(); | |
single.shutdown(); | |
} | |
} | |
@Test | |
public void failWithException() throws InterruptedException { | |
ExecutorService exec = Executors.newSingleThreadExecutor(); | |
Future<Integer> future = exec.submit(sleepAnd(1000, () -> { | |
throw new RuntimeException("Hello"); | |
})); | |
ExecutorService single = Executors.newSingleThreadExecutor(); | |
CompletableFuture<Integer> f = FutureConverter.convert(future, single); | |
try { | |
f.get(); | |
fail("Some value was here"); | |
} catch (ExecutionException e) { | |
assertEquals("Hello", e.getCause().getMessage()); | |
} finally { | |
exec.shutdown(); | |
single.shutdown(); | |
} | |
} | |
@Test | |
public void cancel() throws InterruptedException { | |
ExecutorService exec = Executors.newSingleThreadExecutor(); | |
Future<Integer> future = exec.submit(sleepAnd(1000, () -> { | |
throw new RuntimeException("Hello"); | |
})); | |
ExecutorService single = Executors.newSingleThreadExecutor(); | |
CompletableFuture<Integer> f = FutureConverter.convert(future, single); | |
future.cancel(true); | |
try { | |
f.get(); | |
fail("Some value was here"); | |
} catch (ExecutionException e) { | |
fail("Unexpected exception"); | |
} catch (CancellationException e) { | |
assertTrue(true); | |
} finally { | |
exec.shutdown(); | |
single.shutdown(); | |
} | |
} | |
private <A> Callable<A> sleepAnd(long amount, Callable<A> andThen) { | |
return () -> { | |
try { | |
Thread.sleep(amount); | |
return andThen.call(); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
throw new RuntimeException(e); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment