Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created April 5, 2018 16:26
Show Gist options
  • Save johanandren/e8505dd074bcf05d7d218394af181d8c to your computer and use it in GitHub Desktop.
Save johanandren/e8505dd074bcf05d7d218394af181d8c to your computer and use it in GitHub Desktop.
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
import static akka.http.javadsl.server.Directives.*;
public class MinimalWebSocketServer {
public static void main(String[] args) throws Exception {
ActorSystem actorSystem = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(actorSystem);
Http http = Http.get(actorSystem);
final Flow<Message, Message, NotUsed> webSocketFlow =
Flow.of(Message.class)
.mapAsync(1, (message) -> {
if (message instanceof TextMessage) {
// Note: it is always safe to call getStreamedText - both on strict and on streamed - strict is just
// a single element stream in this case
final Source<String, ?> textStream = ((TextMessage) message).getStreamedText();
// note that you would likely want to make sure you don't collect any number of elements
// and fill up memory, but fail if you reach some limit to protect against DoS
final CompletionStage<String> entireTextInMemory =
textStream.runFold("", (textSoFar, textChunk) -> textSoFar + textChunk, materializer);
return entireTextInMemory;
} else throw new RuntimeException("I don't do binary, pal");
}).map(entireText -> {
System.out.println("Got a text: " + entireText);
return TextMessage.create("Why, thank you for that text message, client!");
});
final Route route = route(
path("test", () ->
get(() ->
handleWebSocketMessages(webSocketFlow)
)
)
);
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = route.flow(actorSystem, materializer);
http.bindAndHandle(routeFlow, ConnectHttp.toHost("127.0.0.1", 8082), materializer);
System.out.println("Server started at http://127.0.0.1:8082, press the famous any key to kill server");
System.in.read();
actorSystem.terminate();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment