Created
June 11, 2015 15:52
-
-
Save danveloper/c096d1cb502ccc8b9d2d to your computer and use it in GitHub Desktop.
Ratpack 0.9.17 WebSocket Streaming
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.inject.Inject | |
import com.google.inject.Scopes | |
import org.reactivestreams.Publisher | |
import ratpack.exec.ExecController | |
import ratpack.form.Form | |
import ratpack.func.Function | |
import ratpack.groovy.Groovy | |
import ratpack.guice.Guice | |
import ratpack.server.RatpackServer | |
import ratpack.server.ServerConfig | |
import ratpack.server.Service | |
import ratpack.server.StartEvent | |
import ratpack.stream.Streams | |
import ratpack.websocket.WebSockets | |
import java.time.Duration | |
import static com.google.common.collect.Queues.newArrayDeque | |
import static groovy.json.JsonOutput.toJson | |
class MainGroovy { | |
static class StreamContainer implements Service { | |
Publisher<String> stream | |
private final Deque<String> queue = newArrayDeque() | |
private final ExecController execController | |
@Inject | |
StreamContainer(ExecController execController) { | |
this.execController = execController | |
} | |
void onStart(StartEvent event) { | |
this.stream = poll { | |
queue.pollLast() ?: [] | |
} | |
} | |
void publish(String msg) { | |
queue.push(msg) | |
} | |
private Publisher<String> poll(Closure callback) { | |
Streams.periodically(execController.executor, Duration.ofMillis(100), callback as Function) map { | |
it ? toJson([message: it]) : "" | |
} multicast() | |
} | |
} | |
public static void main(String[] args) { | |
RatpackServer.start { spec -> spec | |
.serverConfig(ServerConfig.noBaseDir()) | |
.registry(Guice.registry { b -> | |
b.binder { binder -> | |
binder.bind(StreamContainer).in(Scopes.SINGLETON) | |
} | |
}) | |
.handlers(Groovy.chain { | |
get { | |
response.send("text/html", """ | |
<!DOCTYPE HTML> | |
<html> | |
<body> | |
<script> | |
if (!window.WebSocket) { | |
alert("This won't work in your browser. Try Chrome or a gooder version of Safari."); | |
} else { | |
function connectWs() { | |
if (!window.ws || window.ws.readyState != WebSocket.OPEN) { | |
window.ws = new WebSocket("ws://"+location.host+"/stream"); | |
window.ws.onopen = function(event) { | |
console.log("WebSocket opened!"); | |
}; | |
window.ws.onmessage = function(event) { | |
if (event.data) { | |
var div = document.createElement('div'); | |
div.innerHTML = JSON.parse(event.data).message; | |
document.body.appendChild(div); | |
} | |
}; | |
window.ws.onclose = function(event) { | |
var timer = setTimeout(function() { | |
console.log("Retrying connection..."); | |
connectWs(); | |
if (window.ws.readyState == WebSocket.OPEN) { | |
clearTimeout(timer); | |
} | |
}, 1000); | |
}; | |
} | |
} | |
connectWs(); | |
} | |
</script> | |
</body> | |
</html> | |
""") | |
} | |
handler("stream") { | |
byMethod { | |
get { ctx -> | |
def streamContainer = ctx.get(StreamContainer) | |
def stream = streamContainer.stream | |
WebSockets.websocketBroadcast(ctx, stream) | |
} | |
post { ctx -> | |
def form = ctx.parse(Form) | |
def msg = form.msg as String | |
def streamContainer = ctx.get(StreamContainer) | |
streamContainer.publish(msg) | |
ctx.response.status(202) | |
ctx.response.send() | |
} | |
} | |
} | |
}) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example curl call to post a message ==>
curl -d "msg=bar" localhost:5050/stream