Last active
May 25, 2024 08:38
-
-
Save dacr/382c52248d714cef572ac5f0acbd9ec2 to your computer and use it in GitHub Desktop.
Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout / published by https://github.com/dacr/code-examples-manager #57415e71-093a-4470-8c09-e80426ddd438/799bca23696cbdaf2e19c4c0e863fe435453adbe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout | |
// keywords : scala, actors, akka, http-client, client, websocket, cat | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 57415e71-093a-4470-8c09-e80426ddd438 | |
// created-on : 2021-02-05T18:06:03Z | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// run-with : scala-cli $file | |
// usage-example : scala-cli akka-wscat.sc -- ws://127.0.0.1:8080 | |
// --------------------- | |
//> using scala "2.13.14" | |
//> using objectWrapper | |
//> using dep "com.typesafe.akka::akka-http:10.2.10" | |
//> using dep "com.typesafe.akka::akka-stream-typed:2.6.21" | |
//> using dep "com.typesafe.akka::akka-actor-typed:2.6.21" | |
////> using dep "org.slf4j:slf4j-nop:2.0.7" | |
//> using dep "org.slf4j:slf4j-simple:2.0.7" | |
// --------------------- | |
import akka.actor.typed.{ActorRef, Behavior} | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.Done | |
import akka.stream.scaladsl._ | |
import akka.stream._ | |
import akka.stream.typed.scaladsl.ActorSource | |
import akka.http.scaladsl._ | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketRequest} | |
import scala.annotation.tailrec | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.io.StdIn | |
object Protocol { | |
// ------------------------------------------------------------------------------------------- | |
// Actor behavior to manage input data | |
sealed trait Send | |
case class SendRegister(controlActorRef: ActorRef[Control]) extends Send | |
case class SendTransmit(content: String) extends Send | |
object SendFinished extends Send | |
def behaviorSend(): Behavior[Send] = Behaviors.receiveMessage { | |
case SendRegister(controlActorRef) => | |
Behaviors.receiveMessage { | |
case SendTransmit(content) => | |
controlActorRef ! ControlContent(content) | |
Behaviors.same | |
case SendFinished => | |
controlActorRef ! ControlDone | |
Behaviors.stopped // Means shutdown the actor system here ! | |
} | |
} | |
// ------------------------------------------------------------------------------------------- | |
// To talk with the created Source actor | |
sealed trait Control | |
object ControlDone extends Control | |
case class ControlContent(content:String) extends Control | |
case class ControlFailure(ex:Exception) extends Control | |
} | |
val onlineWebSocketEchoServices = List( | |
"wss://demo.piesocket.com", | |
//"wss://socketsbay.com/wss/v2/1/demo/", // no echo response :( | |
//"ws://echo.websocket.org" // decommissioned on 2021 ! | |
) | |
import Protocol._ | |
implicit val system = akka.actor.typed.ActorSystem[Send](behaviorSend(), "MySystem") | |
implicit val executionContext = system.executionContext | |
val uri = args.headOption.getOrElse(onlineWebSocketEchoServices.head) | |
// bufferSize & overflowStrategy are important to deal with fast input rate | |
// With stdIn no backpressure is possible I guess | |
val (controlActorRef, outgoing) = | |
ActorSource.actorRef[Control]( | |
completionMatcher = {case ControlDone => }, | |
failureMatcher = {case ControlFailure(ex) => ex}, | |
bufferSize=10000, | |
overflowStrategy = OverflowStrategy.dropNew | |
).preMaterialize() | |
system ! SendRegister(controlActorRef) | |
val incoming: Sink[Message, Future[Done]] = | |
Sink.foreach[Message] { | |
case TextMessage.Strict(text) => | |
println(text) | |
case TextMessage.Streamed(stream) => | |
val text = stream.runReduce(_ + _) // Force consume and concat all responses fragments | |
text.map(System.out.println).andThen(_ => System.out.flush) // TODO better handling of this future ? | |
case BinaryMessage.Strict(bin) => | |
println("Strict binary message not supported ") | |
case BinaryMessage.Streamed(bin) => | |
println("Streamed binary message not supported ") | |
bin.runWith(Sink.ignore) // Force consume (to free input stream) | |
case x => | |
println(s"Not understood entry $x") | |
} | |
println(s"Request websocket from $uri") | |
val flow = Http().webSocketClientFlow(request = WebSocketRequest(uri = uri)) | |
val (upgradedResponse, closed) = | |
outgoing | |
.map{case ControlContent(content) => TextMessage(content) } | |
.viaMat(flow)(Keep.right) | |
.toMat(incoming)(Keep.both) | |
.run() | |
val connected = upgradedResponse.flatMap { upgrade => | |
if (upgrade.response.status == StatusCodes.SwitchingProtocols) { | |
Future.successful(Done) | |
} else { | |
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") | |
} | |
} | |
@tailrec | |
def loop(): Unit = { | |
val input = Option(StdIn.readLine()) // returns null for EOF so NONE in our case | |
input match { | |
case Some(content) => | |
system ! SendTransmit(content+"\n") | |
loop() | |
case None => system ! SendFinished | |
} | |
} | |
new Thread() { | |
override def run(): Unit = { | |
loop() | |
} | |
}.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment