Created
March 24, 2017 12:17
-
-
Save Minikloon/16332e91b1fc854f4bf40e3aa94becab to your computer and use it in GitHub Desktop.
Vertx async wrappers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.vertx.core.AbstractVerticle | |
import kotlinx.coroutines.experimental.delay | |
import kotlinx.coroutines.experimental.launch | |
class VerticleWithAsync : AbstractVerticle() { | |
override fun start() { | |
val eb = vertx.eventBus() | |
launch(CurrentVertx) { // requires to be inside a vertx-supplied thread | |
println("ayy") | |
delay(1000) | |
println("lmao") // this continuation will be executed on the verticle's context | |
val rainbowMessage = eb.sendAsync<String>("somewhere over the rainbow", "uh anyone here??") // suspend point | |
val body = rainbowMessage.body() // also executed on verticle's context | |
println(body) | |
delay(500) | |
rainbowMessage.reply("yes i'm here") | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.vertx.core.AsyncResult | |
import io.vertx.core.Context | |
import io.vertx.core.Handler | |
import io.vertx.core.eventbus.EventBus | |
import io.vertx.core.eventbus.Message | |
import io.vertx.core.impl.VertxImpl | |
import kotlinx.coroutines.experimental.CancellableContinuation | |
import kotlinx.coroutines.experimental.Delay | |
import java.util.concurrent.TimeUnit | |
import kotlin.coroutines.experimental.* | |
class VertxContinuation<in T>(val vertxContext: Context, val cont: Continuation<T>) : Continuation<T> by cont { | |
override fun resume(value: T) { | |
vertxContext.runOnContext { cont.resume(value) } | |
} | |
override fun resumeWithException(exception: Throwable) { | |
vertxContext.runOnContext { cont.resumeWithException(exception) } | |
} | |
} | |
object CurrentVertx : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor, Delay { | |
val vertxContext: Context | |
get() = VertxImpl.context() ?: throw IllegalStateException("Can't use CurrentVertx if not in a vertx-supplied thread") | |
override fun <T> interceptContinuation(continuation: Continuation<T>) = VertxContinuation(vertxContext, continuation) | |
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) { | |
vertxContext.owner().setTimer(unit.toMillis(time)) { continuation.resume(Unit) } | |
} | |
} | |
inline suspend fun <T> vx(crossinline callback: (Handler<AsyncResult<T>>) -> Unit) = suspendCoroutine<T> { cont -> | |
callback(Handler { result: AsyncResult<T> -> | |
if (result.succeeded()) { | |
cont.resume(result.result()) | |
} else { | |
cont.resumeWithException(result.cause()) | |
} | |
}) | |
} | |
// wrapper around message in case it's not needed to avoid calling .body() all the time | |
inline suspend fun <T> vxm(crossinline callback: (Handler<AsyncResult<Message<T>>>) -> Unit) = suspendCoroutine<T> { cont -> | |
callback(Handler { result: AsyncResult<Message<T>> -> | |
if (result.succeeded()) { | |
cont.resume(result.result().body()) | |
} else { | |
cont.resumeWithException(result.cause()) | |
} | |
}) | |
} | |
// example of a more specific vert.x method wrapper | |
suspend fun <TReply> EventBus.sendAsync(address: String, obj: Any) = vx<Message<TReply>> { | |
send(address, obj, it) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Based on https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md#wrapping-callbacks