@stream-iori
Created August 20, 2015 10:18
rxjava eb
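import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import rx.Observable;

// Assumes `vertx` is an existing io.vertx.rxjava.core.Vertx instance.

// Get an Observable that listens for messages sent to "heat-sensor"
Observable<Double> observable = vertx.eventBus()
    .<Double>consumer("heat-sensor")
    .bodyStream()
    .toObservable();

// Collect the samples into 1-second buffers and reduce each buffer to its average
observable
    .buffer(1, TimeUnit.SECONDS)
    .map(samples -> samples
        .stream()
        .collect(Collectors.averagingDouble(d -> d)))
    .subscribe(heat -> {
        // Forward the average to another consumer over the event bus
        vertx.eventBus().send("news-feed", "Current heat is " + heat);
    });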
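
The averaging step inside map(...) can be tried on its own, without Vert.x or RxJava. The following is a minimal sketch using only the JDK; the class WindowAverage and its methods average and message are hypothetical names introduced for illustration and are not part of the gist. It also shows what happens for a window in which no samples arrived: Collectors.averagingDouble returns 0.0 for an empty input, so an empty buffer would be reported as a heat of 0.0.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class WindowAverage {
    // Hypothetical helper: averages one buffered window of samples,
    // mirroring the map(...) step of the snippet.
    static double average(List<Double> samples) {
        return samples.stream().collect(Collectors.averagingDouble(d -> d));
    }

    // Builds the text that the snippet sends to "news-feed".
    static String message(List<Double> samples) {
        return "Current heat is " + average(samples);
    }

    public static void main(String[] args) {
        // A window with three samples
        System.out.println(message(Arrays.asList(20.0, 22.0, 24.0)));
        // A window with no samples: the average is 0.0, not an error
        System.out.println(message(Collections.<Double>emptyList()));
    }
}
```

If an empty window should not produce a message, one option is to filter out empty buffers before the map step; whether that is wanted depends on how the "news-feed" consumers interpret a reading of 0.0.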