Last active
March 1, 2020 08:11
-
-
Save marioosh/0d583c74880f23a93f6c to your computer and use it in GitHub Desktop.
websocket based on akka-http
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Maps each materialized value exposed by the graph builder to a UserJoined event for `user`.
val actorAsSource = builder.materializedValue.map { sourceActor =>
  UserJoined(user, sourceActor)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sink that hands every ChatEvent to chatRoomActor; UserLeft(user) is supplied as the
// sink's on-complete message.
val chatActorSink =
  Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Handle to a single chat room, backed by one ChatRoomActor.
 *
 * @param roomId      identifier handed to the room's actor
 * @param actorSystem actor system in which the room's actor is created
 */
class ChatRoom(roomId: Int, actorSystem: ActorSystem) {

  // One actor per ChatRoom instance, created when the instance is constructed.
  private[this] val chatRoomActor = {
    val roomProps = Props(classOf[ChatRoomActor], roomId)
    actorSystem.actorOf(roomProps)
  }

  /** Websocket flow for `user`. Still a placeholder: calling it throws NotImplementedError. */
  def websocketFlow(user: String): Flow[Message, Message, _] = ???

  /** Sends `message` to the room's actor without waiting for a reply. */
  def sendMessage(message: ChatMessage): Unit = {
    chatRoomActor ! message
  }
}
/** Companion factory for ChatRoom. */
object ChatRoom {

  /**
   * Creates a new ChatRoom for `roomId`.
   *
   * @param roomId      identifier of the room
   * @param actorSystem implicit actor system passed on to the ChatRoom constructor
   * @return a freshly constructed ChatRoom
   */
  def apply(roomId: Int)(implicit actorSystem: ActorSystem): ChatRoom =
    new ChatRoom(roomId, actorSystem)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Actor that keeps track of the participants of one chat room and relays messages to them.
 *
 * @param roomId identifier of the room, used in the printed log lines and the "left" notice
 */
class ChatRoomActor(roomId: Int) extends Actor {

  // Current participants, keyed by user name.
  var participants: Map[String, ActorRef] = Map.empty[String, ActorRef]

  override def receive: Receive = {
    // Register the newcomer first, so the join announcement is delivered to them as well.
    case UserJoined(name, actorRef) =>
      participants = participants + (name -> actorRef)
      broadcast(SystemMessage(s"User $name joined channel..."))
      println(s"User $name joined channel[$roomId]")

    // Announce the departure while the leaver is still registered, then drop them.
    case UserLeft(name) =>
      println(s"User $name left channel[$roomId]")
      broadcast(SystemMessage(s"User $name left channel[$roomId]"))
      participants = participants - name

    // Ordinary chat traffic is relayed unchanged.
    case msg: IncomingMessage =>
      broadcast(msg)
  }

  /** Sends `message` to every currently registered participant. */
  def broadcast(message: ChatMessage): Unit =
    for (recipient <- participants.values) recipient ! message
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Registry of chat rooms keyed by room number.
 *
 * Lookups that may create a room go through `findOrCreate`, which is serialised on this
 * object's monitor so that two concurrent requests for the same number cannot each create
 * their own ChatRoom (and backing actor).
 */
object ChatRooms {

  // Only reassigned while holding this object's lock; @volatile lets code that reads the
  // var directly (without the lock) observe the most recently published map.
  @volatile var chatRooms: Map[Int, ChatRoom] = Map.empty[Int, ChatRoom]

  /**
   * Returns the room registered under `number`, creating and registering it if absent.
   *
   * @param number      room number
   * @param actorSystem implicit actor system used when a new room has to be created
   * @return the existing or newly created ChatRoom
   */
  def findOrCreate(number: Int)(implicit actorSystem: ActorSystem): ChatRoom = synchronized {
    chatRooms.getOrElse(number, createNewChatRoom(number))
  }

  // Creates a room and records it in the registry. Only called from findOrCreate,
  // i.e. with the lock already held.
  private def createNewChatRoom(number: Int)(implicit actorSystem: ActorSystem): ChatRoom = {
    val chatroom = ChatRoom(number)
    chatRooms += number -> chatroom
    chatroom
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Echo flow: strict text messages are answered with the same text prefixed by "ECHO: ";
// every other message gets a fixed "unsupported" reply.
val echoService: Flow[Message, Message, _] = Flow[Message].map {
  case TextMessage.Strict(payload) =>
    val reply = s"ECHO: $payload"
    TextMessage(reply)
  case _ =>
    TextMessage("Message type unsupported")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Routes: GET on the root path returns a welcome text; GET on /ws-echo is handled as a
// websocket backed by echoService.
val route = {
  val welcome = get {
    pathEndOrSingleSlash {
      complete("Welcome to websocket server")
    }
  }
  val echo = path("ws-echo") {
    get {
      handleWebsocketMessages(echoService)
    }
  }
  welcome ~ echo
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Chat endpoint: ws-chat/<room number> with a `name` query parameter. The room is looked up
// (or created) via ChatRooms and its websocket flow for that user handles the connection.
pathPrefix("ws-chat" / IntNumber) { chatId =>
  parameter('name) { userName =>
    handleWebsocketMessages(ChatRooms.findOrCreate(chatId).websocketFlow(userName))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point: binds an HTTP server on `interface`:`port`, waits for RETURN on stdin,
 * then unbinds the server and shuts the actor system down.
 */
object Server extends App {

  // Explicit types on implicit vals keep implicit resolution predictable.
  implicit val actorSystem: ActorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer: ActorMaterializer = ActorMaterializer()

  val interface = "localhost"
  val port = 8080

  import Directives._

  // Single route: GET on the root path answers with a welcome text.
  val route = get {
    pathEndOrSingleSlash {
      complete("Welcome to websocket server")
    }
  }

  val binding = Http().bindAndHandle(route, interface, port)
  println(s"Server is now online at http://$interface:$port\nPress RETURN to stop...")

  // Block the main thread until the operator presses RETURN.
  StdIn.readLine()

  import actorSystem.dispatcher

  // Unbinding is asynchronous: report "down" only once it has completed (successfully or
  // not), and only then shut the actor system down.
  binding.flatMap(_.unbind()).onComplete { _ =>
    println("Server is down...")
    actorSystem.shutdown()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Builds the websocket flow for `user`: incoming strict text messages are turned into
 * IncomingMessage events for the chat room actor, and ChatMessages emitted by the
 * actor-backed source are rendered as text messages back to the websocket.
 *
 * @param user name attached to the events produced for this connection
 * @return a flow from websocket messages to websocket messages
 */
def websocketFlow(user: String): Flow[Message, Message, _] =
  // The factory form of Flow makes the Source's materialized value available to the builder.
  Flow(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
    implicit builder =>
      roomMessages => // the actor-backed Source passed to the factory above

        // Inbound side: accepts Messages, keeps only strict text ones.
        val inbound = builder.add(
          Flow[Message].collect {
            case TextMessage.Strict(txt) => IncomingMessage(user, txt)
          }
        )

        // Outbound side: renders ChatMessages as websocket text messages.
        val outbound = builder.add(
          Flow[ChatMessage].map {
            case ChatMessage(author, text) => TextMessage(s"[$author]: $text")
          }
        )

        // Delivers events to the room actor; UserLeft(user) is the on-complete message.
        val roomSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))

        // Two-input junction feeding the room actor.
        val eventMerge = builder.add(Merge[ChatEvent](2))

        // The materialized source actor, announced to the room as a UserJoined event.
        val joinEvent = builder.materializedValue.map(actor => UserJoined(user, actor))

        // Messages from the websocket become IncomingMessage events for the room.
        inbound ~> eventMerge.in(0)

        // The freshly created source actor is registered as a participant via UserJoined.
        joinEvent ~> eventMerge.in(1)

        // Both inputs end up at the chat room actor.
        eventMerge ~> roomSink

        // Whatever the room sends to the source actor is pushed back to the websocket.
        roomMessages ~> outbound

        // Exposed ports of the resulting flow.
        (inbound.inlet, outbound.outlet)
  }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Console websocket client: prints connection events and received messages, and can send
 * a series of numbered test messages at random intervals.
 *
 * @param url         websocket URL to connect to
 * @param name        user name included in log output and in generated messages
 * @param actorSystem actor system used to run and schedule the spamming actor
 */
class WSClient(url: String, name: String, actorSystem: ActorSystem)
    extends WebSocketClient(new URI(url), new Draft_17()) {

  override def onMessage(message: String): Unit = println(message)

  override def onError(ex: Exception): Unit = println("Websocket Error: " + ex.getMessage)

  override def onClose(code: Int, reason: String, remote: Boolean): Unit =
    println("Websocket closed")

  override def onOpen(handshakedata: ServerHandshake): Unit =
    println("Websocket opened for name=" + name)

  /**
   * Sends `numberOfTimes` numbered messages of the form "[name] message #k", the first one
   * immediately and each following one after a random delay of 1 to 9 seconds.
   *
   * @param message       trigger passed to the internal actor; it is re-scheduled as the
   *                      tick but its text is not what gets sent over the websocket
   * @param numberOfTimes how many messages to send; nothing is sent when it is not positive
   */
  def spam(message: String, numberOfTimes: Int = 1000): Unit = {
    // Guard: without it a non-positive count would still send one message.
    if (numberOfTimes > 0) {
      val talkActor = actorSystem.actorOf(Props(new Actor {
        import actorSystem.dispatcher
        import scala.concurrent.duration._

        // Number of messages sent so far.
        var counter: Int = 0

        override def receive: Receive = {
          case tick: String =>
            counter += 1
            send(s"[$name] message #$counter")
            if (counter < numberOfTimes)
              actorSystem.scheduler.scheduleOnce(rand.seconds, self, tick)
            else
              context.stop(self) // last message sent: stop instead of lingering forever
        }

        // Delay before the next message: 1 to 9 seconds inclusive.
        def rand: Int = 1 + Random.nextInt(9)
      }))
      talkActor ! message
    }
  }
}
/** Companion factory that picks the ActorSystem up from implicit scope. */
object WSClient {

  /** Builds a WSClient for `url` and `name`, forwarding the implicit actor system. */
  def apply(url: String, name: String)(implicit actorSystem: ActorSystem): WSClient =
    new WSClient(url, name, actorSystem)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello,
Good work, but I have a question: if I want to save the list of messages and the list of rooms in a database, how do I do it?