Last active
October 15, 2019 16:24
-
-
Save adamw/215aacc452e49304f3c182d99ef49f9f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sttp.client.akkahttp | |
import akka.Done | |
import akka.actor.{ActorSystem, Cancellable} | |
import akka.http.scaladsl.model.ws.{Message, TextMessage} | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} | |
import akka.util.ByteString | |
import sttp.client._ | |
import sttp.client.ws.WebSocketResponse | |
import scala.concurrent.duration._ | |
import scala.concurrent.Future | |
/** Example of opening a websocket through the akka-http sttp backend.
  *
  * A text message ("Hello!") is sent every second to a test websocket endpoint, and every
  * incoming text message is printed to the console. Failures (establishing the connection,
  * reading a message, abnormal close) are reported instead of being silently dropped, and
  * the actor system is terminated once the websocket is finished so that the JVM can exit.
  */
object AkkaWebsocketExample extends App {
  import scala.util.{Failure, Success}

  // setting up akka, the threadpool, etc.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val backend: SttpBackend[Future, Source[ByteString, Any], Flow[Message, Message, *]] =
    AkkaHttpBackend.usingActorSystem(system)
  import system.dispatcher

  // creating a sink, which prints all incoming text messages to the console
  val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
    case m: TextMessage =>
      // the message is collected into a strict one (waiting up to 1 second) before printing;
      // a failure to do so is reported rather than ignored
      m.toStrict(1.second).onComplete {
        case Success(s) => println(s"RECEIVED: $s")
        case Failure(e) => println(s"Failed to read incoming message: $e")
      }
    case _ => // messages other than text ones are ignored
  }

  // creating a source, which produces a new text message each second
  val source: Source[Message, Cancellable] = Source
    .tick(1.second, 1.second, ())
    .map(_ => TextMessage("Hello!"))

  // combining the sink & source into a flow; the sink and source are
  // disconnected and operate independently
  val flow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(sink, source)(Keep.left)

  // using a test websocket endpoint
  val response: Future[WebSocketResponse[Future[Done]]] =
    basicRequest.get(uri"wss://echo.websocket.org").openWebsocket(flow)

  // Terminates the actor system; without this its threads keep the JVM alive
  // after the websocket is done.
  private def shutdown(): Unit = {
    system.terminate()
    ()
  }

  // the "response" will be completed once the websocket is established (or fails to be)
  response.onComplete {
    case Success(r) =>
      println("Websocket established!")
      // The Future[Done] comes from the sink, and in this case will be completed
      // once the server closes the connection.
      r.result.onComplete { outcome =>
        outcome match {
          case Success(_) => println("Websocket closed!")
          case Failure(e) => println(s"Websocket terminated with an error: $e")
        }
        shutdown()
      }
    case Failure(e) =>
      println(s"Failed to establish the websocket: $e")
      shutdown()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment