Created
July 17, 2020 00:18
-
-
Save HaloFour/a2c3d2e7b116801a08949e71af810896 to your computer and use it in GitHub Desktop.
Async/Await in Loom
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CompletionStage; | |
import org.reactivestreams.Subscription; | |
import reactor.core.CoreSubscriber; | |
import reactor.core.Exceptions; | |
import reactor.core.publisher.Mono; | |
public class Async { | |
private static final ContinuationScope continuationScope = new ContinuationScope("Async"); | |
public static <T> CompletionStage<T> async(Callable<T> action) { | |
var future = new CompletableFuture<T>(); | |
var continuation = new Continuation(continuationScope, () -> { | |
try { | |
var result = action.call(); | |
future.complete(result); | |
} catch (Exception exception) { | |
future.completeExceptionally(exception); | |
} | |
}); | |
continuation.run(); | |
return future; | |
} | |
public static <T> T await(CompletionStage<T> completionStage) { | |
var currentContinuation = Continuation.getCurrentContinuation(continuationScope); | |
var future = completionStage.toCompletableFuture(); | |
if (!future.isDone()) { | |
future.whenComplete((result, error) -> currentContinuation.run()); | |
Continuation.yield(continuationScope); | |
} | |
return future.join(); | |
} | |
public static <T> Mono<T> mono(Callable<T> action) { | |
return Mono.create(sink -> { | |
var continuation = new Continuation(continuationScope, () -> { | |
try { | |
var result = action.call(); | |
sink.success(result); | |
} catch (Exception exception) { | |
sink.error(exception); | |
} | |
}); | |
continuation.run(); | |
}); | |
} | |
public static <T> T await(Mono<T> mono) { | |
var currentContinuation = Continuation.getCurrentContinuation(continuationScope); | |
var subscriber = new CoreSubscriber<T>() { | |
public T result; | |
public Throwable error; | |
public volatile boolean done; | |
@Override | |
public void onSubscribe(Subscription s) { | |
s.request(1L); | |
} | |
@Override | |
public void onNext(T result) { | |
this.result = result; | |
} | |
@Override | |
public void onError(Throwable error) { | |
this.error = error; | |
done(); | |
} | |
@Override | |
public void onComplete() { | |
done(); | |
} | |
private void done() { | |
done = true; | |
if (Continuation.getCurrentContinuation(continuationScope) != currentContinuation) { | |
currentContinuation.run(); | |
} | |
} | |
}; | |
mono.subscribe(subscriber); | |
if (!subscriber.done) { | |
Continuation.yield(continuationScope); | |
} | |
if (subscriber.error != null) { | |
throw Exceptions.propagate(subscriber.error); | |
} else { | |
return subscriber.result; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment