Skip to content

Instantly share code, notes, and snippets.

@kamilkloch
Last active September 10, 2021 10:05
Show Gist options
  • Save kamilkloch/0b9023f5d9d32b9a2b8ae92a847aa5c5 to your computer and use it in GitHub Desktop.
Save kamilkloch/0b9023f5d9d32b9a2b8ae92a847aa5c5 to your computer and use it in GitHub Desktop.
import cats.effect.{IO, IOApp}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.Router
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import sttp.tapir._
import sttp.tapir.client.sttp.SttpClientInterpreter
import sttp.tapir.client.sttp.ws.fs2._
import sttp.tapir.server.http4s.Http4sServerInterpreter
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
/** Self-contained WebSocket round-trip demo.
  *
  * Starts a Blaze server on `localhost:8080` that exposes a single tapir WebSocket
  * endpoint at `GET /ping`, then connects to it from the same process with an sttp
  * client and logs every number that is sent and every number that comes back.
  */
object TapirWebSockets extends IOApp.Simple {

  private val log = Logger[this.type]

  /** `GET /ping` upgraded to a WebSocket carrying plain-text `Int` frames in both directions. */
  private val wsEndpoint =
    endpoint.get
      .in("ping")
      .out(webSocketBody[Int, CodecFormat.TextPlain, Int, CodecFormat.TextPlain](Fs2Streams[IO]))

  /** Server-side routes: the WebSocket pipe is `identity`, so each incoming value is sent back as-is. */
  private val wsRoutes =
    Http4sServerInterpreter[IO]().toRoutes(wsEndpoint)(_ => IO.pure(Right(identity)))

  /** Unbounded 1, 2, 3, ... sequence; a 10 ms sleep is evaluated for each element before it is emitted. */
  private val requestStream =
    Stream
      .iterate(1)(n => n + 1)
      .covary[IO]
      .evalTap(_ => IO.sleep(10.millis))

  /** Client program: acquires an sttp fs2 backend, opens the WebSocket and pumps `requestStream`
    * through it, logging on both sides of the pipe. The backend is released when the program ends.
    * The request stream is unbounded, so this is not expected to complete on its own.
    */
  private val wsClient = AsyncHttpClientFs2Backend.resource[IO]().use { backend =>
    // The endpoint declares no inputs, so the derived client function takes Unit.
    val openSocket =
      SttpClientInterpreter().toClientThrowErrors(wsEndpoint, Some(uri"ws://localhost:8080"), backend)

    for {
      wsPipe <- openSocket(())
      _ <- requestStream
        .evalTap(n => IO(log.info(s"Sending $n")))
        .through(wsPipe)
        .evalTap(n => IO(log.info(s"Received $n")))
        .compile
        .drain
    } yield ()
  }

  /** Entry point: runs the client for as long as the server resource is held open. */
  def run: IO[Unit] = {
    val server =
      BlazeServerBuilder[IO](ExecutionContext.global)
        .bindHttp(8080, "localhost")
        .withHttpApp(Router("/" -> wsRoutes).orNotFound)
        .resource

    // The server is shut down once the client program finishes, fails or is cancelled.
    server.use(_ => wsClient)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment