Last active
August 3, 2025 05:14
-
-
Save luavixen/1d1577850cb51eb3333db1d54750dd0e to your computer and use it in GitHub Desktop.
My Promise and Async classes for Kotlin that enable pleasant async/await-style control flow in a small package.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.foxgirl.async | |
import java.util.concurrent.CompletableFuture | |
import kotlin.coroutines.* | |
/** | |
* Provides async/await functionality in Kotlin using coroutines without a | |
* coroutine runtime, powered by the [Promise] class. | |
* | |
* @author Lua MacDougall <[email protected]> | |
*/ | |
object Async {

    /**
     * A [CompletableFuture] that doubles as the completion [Continuation] of a coroutine:
     * when the coroutine finishes, its outcome is copied onto the future.
     */
    private class CompletableFutureContinuation<T>(
        override val context: CoroutineContext,
    ) : CompletableFuture<T>(), Continuation<T> {

        /** Starts [coroutine] with this object as its completion and returns this future. */
        fun startCoroutine(coroutine: suspend () -> T): CompletableFutureContinuation<T> {
            coroutine.startCoroutine(this)
            return this
        }

        /** Completes the future with the coroutine's value, or exceptionally with its failure. */
        override fun resumeWith(result: Result<T>) {
            result.fold(
                onSuccess = { value -> complete(value) },
                onFailure = { cause -> completeExceptionally(cause) },
            )
        }
    }

    /**
     * Launches a coroutine and returns a [Promise] that will resolve to the coroutine's result.
     *
     * @param context The coroutine context to be used. Defaults to [EmptyCoroutineContext] if not specified.
     * @param coroutine The suspending function to be run as a coroutine.
     * @return A promise representing the result of the asynchronous operation.
     */
    fun <T> go(context: CoroutineContext = EmptyCoroutineContext, coroutine: suspend () -> T): Promise<T> {
        val completion = CompletableFutureContinuation<T>(context)
        return Promise(completion.startCoroutine(coroutine))
    }

    /**
     * Suspends the coroutine until the given [Promise] has completed and returns the value.
     *
     * A promise derived from another stage (for example through `then` or `Promise.all`)
     * reports its failure wrapped in a [java.util.concurrent.CompletionException]; that
     * wrapper is removed here so the caller can catch the original error type directly.
     *
     * @param promise The promise to await.
     * @return The resolved value from the promise if successful.
     * @throws Throwable If the promise rejects, the error will be propagated as a coroutine exception.
     */
    suspend fun <T> await(promise: Promise<T>): T {
        return suspendCoroutine { continuation ->
            promise.finally { result ->
                continuation.resumeWith(unwrapCompletionFailure(result))
            }
        }
    }

    /**
     * Suspends the coroutine until all the given [Promise]s have completed.
     * Returns a list of all the values in the same order.
     *
     * @param promises The promises to await as a variable number of arguments.
     * @return A list of the resolved values from the promises if successful.
     * @throws Throwable If one or more promises reject, the error(s) will be propagated as a coroutine exception.
     */
    suspend fun <T> await(vararg promises: Promise<T>): List<T> = await(promises.toList())

    /**
     * Suspends the coroutine until all the given [Promise]s have completed.
     * Returns a list of all the values in the same order.
     *
     * An empty collection yields an empty list immediately, without suspending.
     *
     * @param promises The promises to await as a collection.
     * @return A list of the resolved values from the promises if successful.
     * @throws Throwable If one or more promises reject, the error(s) will be propagated as a coroutine exception.
     */
    suspend fun <T> await(promises: Collection<Promise<T>>): List<T> {
        // Nothing to wait for: answer directly rather than awaiting an aggregate promise.
        if (promises.isEmpty()) return emptyList()
        return await(Promise.all(promises))
    }

    /**
     * Replaces a failure that is a [java.util.concurrent.CompletionException] carrying a cause
     * with a failure of that cause; every other result is returned unchanged.
     */
    private fun <T> unwrapCompletionFailure(result: Result<T>): Result<T> {
        val wrapper = result.exceptionOrNull() as? java.util.concurrent.CompletionException
        val cause = wrapper?.cause
        return if (cause != null) Result.failure(cause) else result
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.foxgirl.async | |
import java.util.concurrent.CompletableFuture | |
import java.util.concurrent.Executor | |
import java.util.concurrent.Future | |
import java.util.function.BiConsumer | |
import java.util.function.BiFunction | |
/** | |
* Represents an abstraction over a [CompletableFuture] that simplifies the interface | |
* while providing additional utilities for asynchronous composition and result handling. | |
* | |
* @param T The type of result that this promise will eventually provide. | |
* @author Lua MacDougall <[email protected]> | |
*/ | |
class Promise<T>(private val future: CompletableFuture<T>) : Future<T> by future {

    /**
     * Constructs a [Promise] that has already resolved with the provided value.
     *
     * @param value The value to wrap as a promise, marking it as already completed.
     */
    constructor(value: T) : this(CompletableFuture.completedFuture(value))

    /**
     * Constructs a [Promise] that has already rejected with the provided cause.
     *
     * @param cause The [Throwable] to wrap as a promise, marking it as already completed.
     */
    constructor(cause: Throwable) : this(CompletableFuture.failedFuture(cause))

    /**
     * Constructs a new [Promise] with the provided supplier function.
     *
     * @param executor The executor to run the supplier function.
     * @param supplier The function that provides the value to resolve the promise.
     */
    constructor(executor: Executor, supplier: () -> T) : this(evaluatePromiseSupplier(executor, supplier))

    /**
     * Constructs a new [Promise], initializing it with a consumer function that
     * accepts a [CompletableFuture] which should be used to resolve the promise.
     *
     * @param consumer
     *   A function that takes a [CompletableFuture] and defines how the promise should be resolved or rejected.
     *   If the consumer throws an exception, the promise will be rejected with that exception.
     */
    constructor(consumer: (CompletableFuture<T>) -> Unit) : this(evaluatePromiseConsumer(consumer))

    /**
     * Indicates whether the promise has completed, either resolved or rejected.
     */
    val completed: Boolean get() = future.isDone

    /**
     * If this promise has completed, this is a [Result] containing the resolved value or rejected error.
     * Otherwise, this variable will be `null`.
     *
     * The [java.util.concurrent.ExecutionException] that [get] uses to report a rejection is
     * stripped, so a failed result carries the underlying cause rather than the wrapper.
     */
    val result: Result<T>?
        get() {
            if (!completed) return null
            return try {
                Result.success(get())
            } catch (wrapper: java.util.concurrent.ExecutionException) {
                // get() reports rejections wrapped; expose the real cause when there is one.
                Result.failure(wrapper.cause ?: wrapper)
            } catch (cause: Throwable) {
                // Anything else thrown by get() (e.g. a cancellation) is reported as-is.
                Result.failure(cause)
            }
        }

    /**
     * Executes a function with the resolved value of the promise.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function The function to apply to the current promise's value.
     * @return A new promise that resolves to the result of the applied function.
     */
    fun <U> then(executor: Executor? = null, function: (T) -> U): Promise<U> {
        return if (executor != null) {
            Promise(future.thenApplyAsync(function, executor))
        } else {
            Promise(future.thenApply(function))
        }
    }

    /**
     * Executes a function that returns a new promise, composing it with the current promise.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function The function to apply to the current promise's value. This function returns a new promise.
     * @return A new promise that resolves to the resolved value of the promise returned by the applied function.
     */
    fun <U> thenCompose(executor: Executor? = null, function: (T) -> Promise<U>): Promise<U> {
        return if (executor != null) {
            Promise(future.thenComposeAsync({ value -> function(value).future }, executor))
        } else {
            Promise(future.thenCompose { value -> function(value).future })
        }
    }

    /**
     * Combines the result of this promise with another promise using the supplied function.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param other The other promise to combine with.
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function The function that takes the value of this promise and the value of the other promise and produces a combined result.
     * @return A new promise that resolves to the combined result of this promise and the other promise.
     */
    fun <U, V> thenCombine(other: Promise<U>, executor: Executor? = null, function: (T, U) -> V): Promise<V> {
        return if (executor != null) {
            Promise(future.thenCombineAsync(other.future, function, executor))
        } else {
            Promise(future.thenCombine(other.future, function))
        }
    }

    /**
     * Handles exceptions that occur during promise execution by providing a recovery function.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function
     *   The function to apply when an exception occurs.
     *   This function receives the exception and should return a value of type [T] to recover from the error.
     * @return
     *   A new promise that resolves to either the original result (if no exception occurred)
     *   or the result of the recovery function (if an exception was handled).
     */
    fun exceptionally(executor: Executor? = null, function: (Throwable) -> T): Promise<T> {
        return if (executor != null) {
            Promise(future.exceptionallyAsync(function, executor))
        } else {
            Promise(future.exceptionally(function))
        }
    }

    /**
     * Executes a function when the promise completes, regardless of whether it resolved or rejected.
     * This is useful for cleanup operations or logging.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function
     *   The function to execute when the promise completes.
     *   This function receives a [Result] containing either the resolved value or rejected error.
     * @return A new promise that completes with the same result as the original promise.
     */
    fun finally(executor: Executor? = null, function: (Result<T>) -> Unit): Promise<T> {
        val handler = BiConsumer<T?, Throwable?> { value, cause -> function(createResult(value, cause)) }
        return if (executor != null) {
            Promise(future.whenCompleteAsync(handler, executor))
        } else {
            Promise(future.whenComplete(handler))
        }
    }

    /**
     * Transforms the result of the promise (whether resolved or rejected) into a new value.
     *
     * If an executor is provided, the function is run through it using the async
     * [CompletableFuture] variant. Otherwise the non-async variant is used.
     *
     * @param executor The executor to use for asynchronous execution. Defaults to `null`.
     * @param function
     *   The function to apply to the promise result.
     *   This function receives a [Result] containing either the resulting value or error, and returns a new value of type [U].
     * @return A new promise that completes with the result of the transformation function.
     */
    fun <U> handle(executor: Executor? = null, function: (Result<T>) -> U): Promise<U> {
        val handler = BiFunction<T?, Throwable?, U> { value, cause -> function(createResult(value, cause)) }
        return if (executor != null) {
            Promise(future.handleAsync(handler, executor))
        } else {
            Promise(future.handle(handler))
        }
    }

    companion object {

        /** Runs [supplier] on [executor] and returns the future tracking its outcome. */
        private fun <T> evaluatePromiseSupplier(executor: Executor, supplier: () -> T): CompletableFuture<T> {
            return CompletableFuture.supplyAsync(supplier, executor)
        }

        /**
         * Creates a fresh future and hands it to [consumer]; if the consumer throws,
         * the future is completed exceptionally with that error.
         */
        private fun <T> evaluatePromiseConsumer(consumer: (CompletableFuture<T>) -> Unit): CompletableFuture<T> {
            val pending = CompletableFuture<T>()
            try {
                consumer(pending)
            } catch (cause: Throwable) {
                pending.completeExceptionally(cause)
            }
            return pending
        }

        /**
         * Converts the `(value, cause)` pair delivered by a completion callback into a [Result]:
         * a failure when [cause] is present, otherwise a success holding [value].
         */
        @Suppress("UNCHECKED_CAST")
        private fun <T> createResult(value: T?, cause: Throwable?): Result<T> {
            // The cast is unchecked on purpose: `value` may legitimately be null when T is nullable.
            return if (cause != null) Result.failure<T>(cause) else Result.success(value as T)
        }

        /**
         * A future that completes with the [Result] of every given promise, in input order,
         * once all of them have completed. It completes immediately when there are no promises.
         */
        @Suppress("UNCHECKED_CAST")
        private class AwaitingCompletableFuture<T>(
            promises: Array<out Promise<T>>,
        ) : CompletableFuture<List<Result<T>>>() {

            // One slot per input promise; `count` tracks how many slots are filled.
            // Both are only touched while holding this object's monitor.
            private val results = arrayOfNulls<Result<T>>(promises.size)
            private var count = 0

            init {
                if (promises.isEmpty()) {
                    // No callback would ever fire, so complete right away instead of hanging.
                    complete(emptyList())
                }
                for ((index, promise) in promises.withIndex()) {
                    promise.finally { result ->
                        val allDone = synchronized(this@AwaitingCompletableFuture) {
                            results[index] = result
                            count++
                            count >= results.size
                        }
                        if (allDone) {
                            // Every slot has been filled at this point, so none is null.
                            complete(results.asList() as List<Result<T>>)
                        }
                    }
                }
            }
        }

        /** Wraps an [AwaitingCompletableFuture] over [promises] in a promise. */
        private fun <T> waitForResults(promises: Array<out Promise<T>>): Promise<List<Result<T>>> {
            return Promise(AwaitingCompletableFuture(promises))
        }

        /**
         * Turns a promise of results into a promise of plain values.
         *
         * If exactly one result failed, its error is rethrown unchanged. If several failed,
         * a [RuntimeException] whose cause is the first failure is thrown, with the remaining
         * failures attached as suppressed exceptions.
         */
        private fun <T> flattenResults(promise: Promise<List<Result<T>>>): Promise<List<T>> {
            return promise.then { results ->
                val failures = results.mapNotNull { it.exceptionOrNull() }
                if (failures.isNotEmpty()) {
                    val first = failures.first()
                    throw if (failures.size == 1) {
                        first
                    } else {
                        RuntimeException(first).also { combined ->
                            for (extra in failures.drop(1)) {
                                combined.addSuppressed(extra)
                            }
                        }
                    }
                }
                results.map { it.getOrThrow() }
            }
        }

        /**
         * Waits for all provided promises to complete and returns their results, regardless of success or failure.
         * This function will not reject even if some promises fail - instead, it collects all results
         * (both successful values and exceptions) and returns them as a list of [Result] objects.
         * An empty collection produces a promise that is already resolved with an empty list.
         *
         * @param promises The collection of promises to wait for completion.
         * @return A new promise that resolves to a list of [Result] objects, one for each input promise.
         */
        fun <T> settled(promises: Collection<Promise<T>>): Promise<List<Result<T>>> {
            return waitForResults(promises.toTypedArray())
        }

        /**
         * Waits for all provided promises to complete successfully and returns their values.
         * If a single promise fails, the returned promise rejects with that failure. If multiple promises fail,
         * it rejects with a [RuntimeException] caused by the first failure, with the additional failures
         * added as suppressed exceptions.
         * An empty collection produces a promise that is already resolved with an empty list.
         *
         * @param promises The collection of promises that must all succeed.
         * @return A new promise that resolves to a list of values from all successful promises, or rejects if any promise fails.
         */
        fun <T> all(promises: Collection<Promise<T>>): Promise<List<T>> {
            return flattenResults(waitForResults(promises.toTypedArray()))
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment