Created
December 19, 2020 11:24
-
-
Save shankarshastri/e230dd07b82d2d717dce3b652bf807b2 to your computer and use it in GitHub Desktop.
AkkaTypedWebsocketServer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.akka.websocket | |
import akka.NotUsed | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.{ActorRef, ActorSystem, Behavior} | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} | |
import akka.http.scaladsl.server.Directives._ | |
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} | |
import akka.stream.typed.scaladsl.{ActorSink, ActorSource} | |
import akka.stream.{Materializer, OverflowStrategy} | |
import com.akka.websocket.BroadCastBehaviour.broadCastActorBehaviour | |
import akka.actor.typed.scaladsl.AskPattern._ | |
import akka.util.Timeout | |
import com.akka.websocket.Model.{Broadcast, BroadcastToAll, MessageWrapperTrait, RequestMessageWrapper, ResponseMessageWrapper, UserAdded, WebSocketMsg} | |
import scala.concurrent.duration.DurationInt | |
import scala.concurrent.{ExecutionContextExecutor, Future} | |
object AkkaTypedWebsocketServer extends App { | |
implicit val system: ActorSystem[NotUsed] = ActorSystem(initActorSystemBehaviour, "http-system") | |
def sinkActor: Behavior[MessageWrapperTrait] = { | |
Behaviors.receive { | |
(context, msg) => { | |
msg match { | |
case Model.RequestMessageWrapper(message, replyTo) => | |
message match { | |
case message: TextMessage => | |
replyTo ! ResponseMessageWrapper(TextMessage(Source.single(s"Hello ${ message.getStrictText }") ++ Source.single("!"))) | |
case message: BinaryMessage => | |
message.dataStream.runWith(Sink.ignore) | |
} | |
Behaviors.same | |
} | |
} | |
} | |
} | |
def webSocketConnections(l: List[ActorRef[Message]] = List.empty): Behavior[WebSocketMsg] = { | |
Behaviors.receive[WebSocketMsg] { | |
(context, message) => { | |
message match { | |
case Model.UserAdded(actorRef) => webSocketConnections(actorRef :: l) | |
case Model.BroadcastToAll(msg) => | |
context.spawnAnonymous(broadCastActorBehaviour) ! Broadcast(l, msg) | |
Behaviors.same | |
} | |
} | |
} | |
} | |
def initActorSystemBehaviour: Behavior[NotUsed] = { | |
Behaviors.setup[NotUsed](implicit e => { | |
implicit val sys: ActorSystem[Nothing] = e.system | |
implicit val mat: Materializer = Materializer(e.system) | |
implicit val ec: ExecutionContextExecutor = e.executionContext | |
val webSocketActorRef = e.spawnAnonymous(webSocketConnections()) | |
val sinkActorRef = e.spawnAnonymous(sinkActor) | |
def chat(implicit mat: Materializer): Flow[Message, Message, Any] = { | |
val (a, s) = ActorSource.actorRef[Message](completionMatcher = PartialFunction.empty, | |
failureMatcher = PartialFunction.empty, 20, OverflowStrategy.dropBuffer) | |
.toMat(Sink.asPublisher(false))(Keep.both).run()(mat) | |
webSocketActorRef ! UserAdded(a) | |
// https://stackoverflow.com/questions/41316173/akka-websocket-how-to-close-connection-by-server | |
val sink = Flow[Message].map { | |
case TextMessage.Strict(msg) => | |
// Incoming message from ws | |
TextMessage(Source.single(s"Hello ${ msg }") ++ Source.single("!")) :: Nil | |
}.to(Sink.seq) | |
Flow.fromSinkAndSource(sink, Source.fromPublisher(s)) | |
} | |
def chatWithActors(implicit mat: Materializer): Flow[Message, Message, Any] = { | |
val (a, s) = ActorSource.actorRef[Message](completionMatcher = PartialFunction.empty, | |
failureMatcher = PartialFunction.empty, 20, OverflowStrategy.dropBuffer) | |
.toMat(Sink.asPublisher(false))(Keep.both).run()(mat) | |
webSocketActorRef ! UserAdded(a) | |
implicit val timeout: Timeout = 3.seconds | |
val sinkFromActor = Flow[Message].mapAsync(10)(msg => { | |
sinkActorRef.ask[ResponseMessageWrapper](e => RequestMessageWrapper(msg, e)) | |
}).map(e => e.responseMessage).to(Sink.seq) | |
// https://stackoverflow.com/questions/41316173/akka-websocket-how-to-close-connection-by-server | |
Flow.fromSinkAndSource(sinkFromActor, Source.fromPublisher(s)) | |
} | |
val route = | |
path("greeter") { | |
handleWebSocketMessages(chatWithActors) | |
} ~ path("broadcast") { | |
webSocketActorRef ! BroadcastToAll(TextMessage("Hey Everyone")) | |
complete(StatusCodes.Accepted) | |
} | |
val bindingFuture = Http().newServerAt("localhost", 8080).bind(route) | |
println(s"Server online at http://localhost:8080/\n") | |
scala.io.StdIn.readLine() | |
bindingFuture | |
.flatMap(_.unbind()) // trigger unbinding from the port | |
.onComplete(_ => system.terminate()) // and shutdown when done | |
Behaviors.same | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment