-
-
Save pimg/1f466becafccb5fe27f06caeeba8e41b to your computer and use it in GitHub Desktop.
Creating an SSE stream with Apache Camel and Ratpack, using RxGroovy as the glue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grab('com.netflix.rxjava:rxjava-groovy:0.20.7') | |
@Grab('io.reactivex:rxjava-reactive-streams:0.3.0') | |
@Grab('org.apache.camel:camel-rx:2.14.1') | |
@Grab('io.ratpack:ratpack-groovy:0.9.11') | |
@Grab('org.slf4j:slf4j-simple:1.6.6') | |
import org.apache.camel.impl.* | |
import org.apache.camel.rx.* | |
import static rx.RxReactiveStreams.toPublisher | |
import static ratpack.groovy.Groovy.ratpack | |
import static ratpack.sse.ServerSentEvents.serverSentEvents; | |
// Camel context used by the reactive adapter below.
def camelContext = new DefaultCamelContext()

// ReactiveCamel wrapper bound to that context.
def rx = new ReactiveCamel(camelContext)

// Observable over the "timer:foo?period=2s" endpoint; each message is
// replaced by "alpha" or "omega", chosen with Math.random().
def observable = rx.toObservable("timer:foo?period=2s").map {
    if (Math.random() > 0.5) {
        return "alpha"
    }
    return "omega"
}
/**
 * Wraps {@code obs} in a publisher restricted to a single message value.
 *
 * Items are taken only while {@code context.directChannelAccess.channel}
 * reports that it is open, and of those only the items equal to
 * {@code messageFilter} are passed on.
 *
 * @param obs            source observable
 * @param context        object exposing {@code directChannelAccess.channel}
 * @param messageFilter  value each item is compared against with {@code ==}
 * @return the result of {@code toPublisher} applied to the restricted observable
 */
def toStream(obs, context, messageFilter) {
    def whileOpen = obs.takeWhile { context.directChannelAccess.channel.isOpen() }
    def matching = whileOpen.filter { item -> item == messageFilter }
    return toPublisher(matching)
}
// Start the Camel context.
camelContext.start()

// Stop the Camel context from a shutdown hook.
addShutdownHook({
    camelContext.stop()
})
/**
 * After running the example, try the following pages:
 * http://localhost:5050/random/alpha
 * http://localhost:5050/random/omega
 */
ratpack {
    handlers {
        // GET random/:type renders a server-sent event stream built from
        // the shared observable, filtered by the ":type" path token.
        get('random/:type') {
            def requestedType = pathTokens.get('type')
            def publisher = toStream(observable, context, requestedType)

            // Each item is printed, then sent as the data of a "random" event.
            def events = serverSentEvents(publisher) { sse ->
                println sse.item
                sse.event("random").data(sse.item)
            }

            onClose { println "Connection End" }
            render(events)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment