Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Last active June 28, 2025 14:00
Show Gist options
  • Save calvinlfer/46c603d35cd5d78c3065077d46bf0e53 to your computer and use it in GitHub Desktop.
Save calvinlfer/46c603d35cd5d78c3065077d46bf0e53 to your computer and use it in GitHub Desktop.
ZIO HTTP Websocket example: simplified chat room
//> using dep dev.zio::zio:2.1.19
//> using dep dev.zio::zio-interop-cats::23.1.0.5
//> using dep dev.zio::zio-http:3.3.3
import zio.http.*
import zio.*
import zio.stream.*
import zio.http.WebSocketFrame.*
/** A simplified chat room exposed over a single websocket endpoint.
  *
  * Every text frame received from a client is offered to `in`; every event
  * published on `out` is written to each connected client's channel.
  *
  * @param in  queue that collects the text frames sent by clients
  * @param out hub whose events are forwarded to every connected client
  */
final class ChatRoom(in: Queue[WebSocketFrame], out: Hub[WebSocketChannelEvent]):
  private val socket = Handler.webSocket: channel =>
    // Inbound side: react to every event delivered on this client's channel.
    val recv: Task[Unit] =
      channel.receiveAll:
        case ChannelEvent.Read(t: WebSocketFrame.Text)         => in.offer(t)
        case ChannelEvent.ExceptionCaught(e)                   => ZIO.logErrorCause("Exception caught", Cause.fail(e))
        case ChannelEvent.Read(_: WebSocketFrame.Binary)       => ZIO.logDebug("I don't understand binary")
        case ChannelEvent.Read(_: WebSocketFrame.Close)        => channel.shutdown
        case ChannelEvent.Read(_: WebSocketFrame.Continuation) => ZIO.unit
        case ChannelEvent.Read(Ping)                           => channel.send(ChannelEvent.read(Pong))
        case ChannelEvent.Read(Pong)                           => ZIO.unit
        case ChannelEvent.UserEventTriggered(event)            => ZIO.logInfo(s"User event: $event")
        case ChannelEvent.Registered                           => ZIO.logInfo("Hello")
        case ChannelEvent.Unregistered                         => ZIO.logInfo("Bye bye") *> channel.shutdown

    // Outbound side: subscribe to the hub and push each chunk of events to this client.
    val snd: Task[Unit] =
      ZStream.fromHub(out).tapChunks(channel.sendAll(_)).runDrain

    // Race instead of zipPar: zipPar waits for BOTH sides to finish, but the
    // hub-backed stream does not end by itself, so after the receive loop
    // finished (client gone) the handler would keep waiting on the sender and
    // its hub subscription would stay alive. With raceFirst, whichever side
    // finishes first (success or failure) interrupts the other one.
    // NOTE(review): relies on receiveAll returning once the channel is
    // unregistered — confirm against the zio-http version in use.
    recv.raceFirst(snd)

  /** Route that upgrades `GET /subscriptions` to the chat websocket. */
  val http: Route[Any, Nothing] = Method.GET / "subscriptions" -> handler(socket.toResponse)
object ChatRoom:
  /** Pumps frames from the inbound queue into the outbound hub.
    *
    * Only text frames are forwarded (re-wrapped as read events); any other
    * frame taken from the queue is dropped by the `collect`.
    */
  private def bridge(in: Queue[WebSocketFrame], out: Hub[WebSocketChannelEvent]) =
    val textEvents =
      ZStream.fromQueue(in).collect:
        case WebSocketFrame.Text(text) => ChannelEvent.read(WebSocketFrame.text(text))
    textEvents.tapChunks(chunk => out.publishAll(chunk)).runDrain

  /** Builds a [[ChatRoom]] backed by an unbounded queue and an unbounded hub,
    * with the queue-to-hub bridge forked into the layer's scope.
    */
  val layer: ULayer[ChatRoom] = ZLayer.scoped:
    for
      inbound   <- Queue.unbounded[WebSocketFrame]
      broadcast <- Hub.unbounded[WebSocketChannelEvent]
      _         <- bridge(inbound, broadcast).forkScoped
    yield new ChatRoom(inbound, broadcast)
/** Entry point: serves the chat room's websocket route on port 8080. */
object ZioHttpPlayground extends ZIOAppDefault:
  val run =
    // Effect that still requires a ChatRoom and a Server from the environment.
    val serveChatRoom = ZIO.serviceWithZIO[ChatRoom]: chatRoom =>
      val routes = Routes(chatRoom.http)
      Server.serve(routes)

    serveChatRoom.provide(ChatRoom.layer, Server.defaultWithPort(8080))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment