Skip to content

Instantly share code, notes, and snippets.

@ajurasz
Last active December 19, 2016 22:15
Show Gist options
  • Save ajurasz/48c018b2ed6e4b53553008f00f72a323 to your computer and use it in GitHub Desktop.
Save ajurasz/48c018b2ed6e4b53553008f00f72a323 to your computer and use it in GitHub Desktop.
def executor = new ThreadPoolExecutor(
cores, cores,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1000))
DefaultFiberNode addFiber(OperationDefinition fiberDefinition) {
RxHelper.toObservable(vertx.eventBus().consumer(fiberDefinition.address()))
.flatMap({ data ->
Observable.just(data)
.subscribeOn(Schedulers.from(executor))
.doOnNext({
def msg = it as Message
try {
fiberDefinition.handler().handle(new VertxOperationContext(msg, this))
} catch (Exception e) {
msg.fail(100, e.message)
}
})
}, cores).subscribe()
this
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment