Created
January 26, 2019 19:02
-
-
Save windoze/2bcdc7d10f3604c68eae0b26a5ee6feb to your computer and use it in GitHub Desktop.
Simple implementation of RPC infrastructure over Vertx EventBus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.type.TypeReference | |
import io.vertx.core.AsyncResult | |
import io.vertx.core.Handler | |
import io.vertx.core.Vertx | |
import io.vertx.core.eventbus.Message | |
import io.vertx.core.json.Json | |
import io.vertx.core.logging.Logger | |
import io.vertx.core.logging.LoggerFactory | |
import io.vertx.kotlin.coroutines.CoroutineVerticle | |
import io.vertx.kotlin.coroutines.dispatcher | |
import io.vertx.kotlin.coroutines.toChannel | |
import kotlinx.coroutines.launch | |
import java.lang.reflect.Proxy | |
import kotlin.coroutines.Continuation | |
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED | |
import kotlin.coroutines.resume | |
import kotlin.coroutines.resumeWithException | |
import kotlin.reflect.full.callSuspend | |
fun Any.stringify(): String = Json.encode(this) | |
@Suppress("ArrayInDataClass") | |
data class RpcRequest( | |
val service: String = "", | |
val method: String = "", | |
val args: Array<out Any>? = null | |
) | |
data class RpcResponse( | |
val response: Any? = null | |
) | |
/** | |
* RpcServerVerticle hosts all RPC service objects | |
*/ | |
class RpcServerVerticle(private val channel: String) : CoroutineVerticle() { | |
private interface RpcServer { | |
suspend fun processRequest(request: RpcRequest): RpcResponse | |
companion object { | |
fun <T : Any> instance(impl: T): RpcServer { | |
return object : RpcServer { | |
override suspend fun processRequest(request: RpcRequest): RpcResponse { | |
val ret = impl::class.members.first { | |
// TODO: Check signature to support overloading | |
it.name == request.method | |
}.callSuspend(impl, *(request.args ?: arrayOf())) | |
return RpcResponse(ret) | |
} | |
} | |
} | |
} | |
} | |
private val services: HashMap<String, RpcServer> = hashMapOf() | |
override suspend fun start() { | |
launch(vertx.dispatcher()) { | |
for (msg in vertx.eventBus().consumer<String>(channel).toChannel(vertx)) { | |
// Start a new coroutine to handle the incoming request to support recursive call | |
launch(vertx.dispatcher()) { | |
val req = Json.decodeValue(msg.body(), RpcRequest::class.java) | |
try { | |
msg.reply((services[req.service]?.processRequest(req) | |
?: throw NoSuchElementException("Service ${req.service} not found")).stringify()) | |
} catch (e: Throwable) { | |
msg.fail(1, e.message) | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Register the service object | |
*/ | |
fun <T : Any> register(name: String, impl: T): RpcServerVerticle { | |
services[name] = RpcServer.instance(impl) | |
return this | |
} | |
} | |
/** | |
* Dynamically create the service proxy object for the given interface | |
*/ | |
inline fun <reified T : Any> getServiceProxy(vertx: Vertx, channel: String, name: String) = | |
Proxy.newProxyInstance(T::class.java.classLoader, arrayOf(T::class.java)) { _, method, args -> | |
val lastArg = args?.lastOrNull() | |
if (lastArg is Continuation<*>) { | |
// The last argument of a suspend function is the Continuation object | |
@Suppress("UNCHECKED_CAST") val cont = lastArg as Continuation<Any?> | |
val argsButLast = args.take(args.size - 1) | |
// Send request to the given channel on the event bus | |
vertx.eventBus().send(channel, RpcRequest(name, | |
method.name, | |
argsButLast.toTypedArray()).stringify(), | |
Handler<AsyncResult<Message<String>>> { event -> | |
// Resume the suspended coroutine on reply | |
if (event?.succeeded() == true) { | |
cont.resume(Json.decodeValue(event.result().body(), | |
object : TypeReference<RpcResponse>() {}).response) | |
} else { | |
cont.resumeWithException(event?.cause() ?: Exception("Unknown error")) | |
} | |
}) | |
// Suspend the coroutine to wait for the reply | |
COROUTINE_SUSPENDED | |
} else { | |
// The function is not suspend | |
null | |
} | |
} as T |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface FooBarSvc { | |
suspend fun foo(a: Int, b: String): String | |
suspend fun bar(x: String): Int | |
} | |
interface HelloSvc { | |
suspend fun hello(world: String): String | |
} | |
class TestRpcClientVerticle : CoroutineVerticle() { | |
private val log: Logger = LoggerFactory.getLogger(this.javaClass) | |
private val channel = "test-channel" | |
override suspend fun start() { | |
val svc: FooBarSvc = getServiceProxy(vertx, channel, "foobar") | |
log.info("Received string is '${svc.foo(42, "world")}'.") | |
vertx.close() | |
} | |
} | |
fun main() { | |
val vertx = Vertx.vertx() | |
// Implementation needs not to implement the service interface | |
class HelloSvcImpl { | |
// Method needs not to be suspend | |
fun hello(name: String): String = "Hello, $name!" | |
} | |
class FBSvcImpl : FooBarSvc { | |
val svc: HelloSvc = getServiceProxy(vertx, "test-channel", "hello") | |
override suspend fun foo(a: Int, b: String): String = "$a $b, ${svc.hello("world")}" | |
override suspend fun bar(x: String): Int = x.toInt() | |
} | |
vertx.deployVerticle(RpcServerVerticle("test-channel") | |
.register("hello", HelloSvcImpl()) | |
.register("foobar", FBSvcImpl())) | |
vertx.deployVerticle(TestRpcClientVerticle()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment