Last active
June 25, 2023 15:33
-
-
Save dacr/f777f85e78b3c14c0433477a997ae000 to your computer and use it in GitHub Desktop.
Dump incoming websockets content / published by https://github.com/dacr/code-examples-manager #01916f4b-1529-46c3-952b-58cd56d34f09/ee88cc22bde70faf0587eaf06e16c7a8c2a1628b
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Dump incoming websockets content | |
// keywords : scala, actors, akka, http-client, client, json, json4s, websocket | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 01916f4b-1529-46c3-952b-58cd56d34f09 | |
// created-on : 2021-02-06T13:13:52Z | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
import $ivy.`com.typesafe.akka::akka-http:10.2.4` | |
import $ivy.`com.typesafe.akka::akka-actor-typed:2.6.13` | |
import $ivy.`com.typesafe.akka::akka-stream-typed:2.6.13` | |
import $ivy.`org.slf4j:slf4j-nop:1.7.30` // to avoid missing logging impl warning at startup | |
import akka.Done | |
import akka.actor.typed.{ActorRef, Behavior} | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.http.scaladsl._ | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.model.ws.{TextMessage, WebSocketRequest} | |
import akka.stream.OverflowStrategy | |
import akka.stream.scaladsl.Keep | |
import akka.stream.typed.scaladsl.{ActorSink, ActorSource} | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
/** Message protocols and actor behaviors used to exchange text with a remote websocket.
  *
  * Three protocols are declared:
  *  - [[WebSockEcho.Get]]     : what the receiving actor is told about incoming websocket traffic,
  *  - [[WebSockEcho.Put]]     : commands accepted by the sending (guardian) actor,
  *  - [[WebSockEcho.Control]] : messages understood by the actor-backed stream source.
  */
object WebSockEcho {
  // -----------------------------------------------------------------
  /** Protocol of the actor that consumes what comes back from the websocket. */
  sealed trait Get
  final case class GetReceived(content: String) extends Get
  final case class GetFailure(ex: Throwable) extends Get
  case object GetFinished extends Get

  /** Prints each received content on stdout; stops once the stream completes or fails.
    *
    * @return the behavior to install on the receiving actor
    */
  def behaviorGet(): Behavior[Get] = Behaviors.receiveMessage {
    case GetReceived(content) =>
      println(content)
      Behaviors.same
    case GetFailure(ex) =>
      // Without this case a stream failure would raise a MatchError inside the actor.
      Console.err.println(s"Websocket stream failed: ${ex.getMessage}")
      Behaviors.stopped
    case GetFinished =>
      Behaviors.stopped
  }

  // -----------------------------------------------------------------
  /** Protocol of the actor that pushes content towards the websocket. */
  sealed trait Put
  final case class PutRegister(controlActorRef: ActorRef[Control]) extends Put
  final case class PutTransmit(content: String) extends Put
  case object PutFinished extends Put
  case object PutShutdown extends Put

  /** Initial sending behavior: waits for [[PutRegister]] to learn where content must be forwarded.
    *
    * Shutdown requests are honoured even before registration; content received before
    * registration has nowhere to go and is reported as unhandled instead of crashing the actor.
    *
    * @return the behavior to install on the sending actor
    */
  def behaviorPut(): Behavior[Put] = Behaviors.receive { (context, message) =>
    message match {
      case PutRegister(controlActorRef) =>
        behaviorPutRegistered(controlActorRef)
      case PutTransmit(_) =>
        Behaviors.unhandled
      case PutShutdown =>
        context.scheduleOnce(1.second, context.self, PutFinished)
        Behaviors.same
      case PutFinished =>
        Behaviors.stopped
    }
  }

  /** Sending behavior once the stream source actor is known. */
  private def behaviorPutRegistered(controlActorRef: ActorRef[Control]): Behavior[Put] =
    Behaviors.receive { (context, message) =>
      message match {
        case PutTransmit(content) =>
          controlActorRef ! ControlContent(content)
          Behaviors.same
        case PutShutdown =>
          // Delayed finish, as this behavior is the main one used on actor system start.
          // The context comes from Behaviors.receive, so no Behaviors.setup wrapper is needed
          // and Behaviors.same is returned directly from the message handler.
          context.scheduleOnce(1.second, context.self, PutFinished)
          Behaviors.same
        case PutFinished =>
          Behaviors.stopped // Means shutdown the actor system here !
        case PutRegister(_) =>
          // Already registered: keep forwarding to the first registered source.
          Behaviors.unhandled
      }
    }

  // -----------------------------------------------------------------
  /** Protocol of the actor-backed stream source feeding the websocket. */
  sealed trait Control
  case object ControlDone extends Control
  final case class ControlContent(content: String) extends Control
  final case class ControlFailure(ex: Exception) extends Control
}
// ================================================================================================ | |
/** Builds the websocket client stream, sends two demo messages and requests shutdown.
  *
  * Everything runs when the object is first accessed; call [[Main.andWait]] to block
  * until the actor system has terminated.
  */
object Main {
  import WebSockEcho._

  // The Put behavior is the guardian of the actor system.
  implicit val system: akka.actor.typed.ActorSystem[Put] =
    akka.actor.typed.ActorSystem[Put](behaviorPut(), "MySystem")
  implicit val executionContext: ExecutionContextExecutor = system.executionContext

  // -----------------------------------------------------------------
  // Typed actor based source, using the guardian behavior as our put behavior
  // to send data to the websocket.
  val (controlActorRef, outgoing) =
    ActorSource
      .actorRef[Control](
        completionMatcher = { case ControlDone => },
        failureMatcher = { case ControlFailure(ex) => ex },
        bufferSize = 20,
        OverflowStrategy.fail
      )
      .preMaterialize()
  system ! PutRegister(controlActorRef)

  // -----------------------------------------------------------------
  // Typed actor based sink, will receive data from the remote websocket.
  val incoming =
    ActorSink.actorRef[Get](
      ref = system.systemActorOf(behaviorGet(), "truc"),
      onCompleteMessage = GetFinished,
      onFailureMessage = ex => GetFailure(ex)
    )

  // -----------------------------------------------------------------
  // Websocket connect.
  val uri = "ws://echo.websocket.org"
  val remoteProcessingFlow = Http().webSocketClientFlow(WebSocketRequest(uri = uri))

  // -----------------------------------------------------------------
  // Build the graph outgoing ~> remoteProcessingFlow ~> incoming.
  // `closed` is whatever the sink materializes; it is kept only so the member remains available.
  val (upgradedResponse, closed) =
    outgoing
      // Only content messages are turned into frames; the source's completion/failure
      // matchers are declared for the other Control messages.
      .collect { case ControlContent(content) => TextMessage.Strict(content) }
      .viaMat(remoteProcessingFlow)(Keep.right)
      // NOTE(review): assumes every incoming message is a strict text message — confirm
      // against the remote endpoint; anything else makes this stage fail.
      .map(message => GetReceived(message.asTextMessage.getStrictText))
      .toMat(incoming)(Keep.both)
      .run()

  // -----------------------------------------------------------------
  // Completes with Done when the server accepted the websocket upgrade, fails otherwise.
  val connected: Future[Done] = upgradedResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) Future.successful(Done)
    else Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
  }
  // Report a failed handshake instead of silently dropping the failed future.
  connected.failed.foreach { ex =>
    Console.err.println(s"Websocket handshake failed: ${ex.getMessage}")
  }

  // -----------------------------------------------------------------
  system ! PutTransmit("Hello world")
  system ! PutTransmit("Cool Raoul")
  // We cannot finish immediately, otherwise we would exit before receiving anything;
  // the shutdown request is therefore honoured with a delay by the guardian.
  system ! PutShutdown

  /** Blocks the calling thread until the actor system has terminated. */
  def andWait(): Unit = Await.result(system.whenTerminated, Duration.Inf)
}
// Script entry point: initializes Main (which starts the stream) and blocks until the actor system stops.
Main.andWait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment