Skip to content

Instantly share code, notes, and snippets.

@adamw
Created May 10, 2024 17:01
Show Gist options
  • Save adamw/8eba6ae90dd497bcddccc55e7083c50d to your computer and use it in GitHub Desktop.
Save adamw/8eba6ae90dd497bcddccc55e7083c50d to your computer and use it in GitHub Desktop.
//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.10.7
import ox.channels.{Actor, ActorRef, Channel, ChannelClosed, Default, DefaultResult, selectOrClosed}
import ox.{fork, releaseAfterScope, supervised}
import sttp.tapir.*
import sttp.tapir.CodecFormat.*
import sttp.tapir.server.netty.sync.{Id, NettySyncServer, OxStreams}
import java.util.UUID
/** Identifier under which a connected chat member is tracked. */
type ChatMemberId = UUID

/** A connected chat participant.
  *
  * @param id
  *   identifier of this member
  * @param channel
  *   channel onto which messages destined for this member are sent
  */
case class ChatMember(id: ChatMemberId, channel: Channel[Message])

object ChatMember:
  /** Builds a new member with a random UUID and a channel obtained from `Channel.bufferedDefault`. */
  def create: ChatMember =
    val freshId = UUID.randomUUID()
    val outgoing = Channel.bufferedDefault[Message]
    ChatMember(freshId, outgoing)
/** Registry of connected chat members plus the broadcast logic.
  *
  * State is kept in an unsynchronised `var`; in this file the room is only reached through an actor
  * (`Actor.create(new ChatRoom)`), which presumably serialises the calls - confirm before using it
  * directly from several threads.
  */
class ChatRoom:
  private var members: Map[ChatMemberId, ChatMember] = Map()

  /** Registers `m` under its id and logs the new member count. */
  def connected(m: ChatMember): Unit =
    members = members.updated(m.id, m)
    println(s"Connected: ${m.id}, number of members: ${members.size}")

  /** Removes `m` (by id) and logs the new member count. */
  def disconnected(m: ChatMember): Unit =
    members = members.removed(m.id)
    println(s"Disconnected: ${m.id}, number of members: ${members.size}")

  /** Offers `message` to every registered member and forgets the members whose channel is closed. */
  def incoming(message: Message): Unit =
    println(s"Broadcasting: ${message.v}")
    members = members.filter { (id, member) => offer(id, member, message) }

  /** Tries to hand `message` to a single member.
    *
    * The send clause is selected together with a `Default` clause; when the default is chosen the
    * message is dropped for this member (logged as a full buffer) and the member is kept.
    *
    * @return
    *   `false` only when the member's channel is closed, i.e. the member should be removed
    */
  private def offer(id: ChatMemberId, member: ChatMember, message: Message): Boolean =
    selectOrClosed(member.channel.sendClause(message), Default(())) match
      case member.channel.Sent() =>
        true
      case _: ChannelClosed =>
        println(s"Channel of member $id closed, removing from members")
        false
      case DefaultResult(_) =>
        println(s"Buffer for member $id full, not sending message")
        true
//
/** A single chat message carried as plain text.
  *
  * Could be more complex, e.g. JSON including nickname + message.
  */
case class Message(v: String)

/** Plain-text codec for [[Message]]: wraps the raw string on the way in, exposes `v` on the way out. */
given Codec[String, Message, TextPlain] =
  Codec.string.map(text => Message(text))(message => message.v)
/** Endpoint description: `GET` on the `chat` path, answering with a web socket that both receives
  * and sends [[Message]]s as plain text, using `OxStreams`.
  */
val chatEndpoint =
  val chatSocket = webSocketBody[Message, TextPlain, Message, TextPlain](OxStreams)
  endpoint.get.in("chat").out(chatSocket)
/** Builds the web socket pipe for one connection.
  *
  * For each connection a fresh [[ChatMember]] is created and announced to the chat room actor. A fork
  * forwards every message received from the client to the room, and the member's own channel is
  * returned as the stream of messages sent back to the client. Cleanup registered through
  * `releaseAfterScope` marks the member's channel as done and tells the room the member disconnected.
  *
  * @param a
  *   reference to the actor wrapping the shared [[ChatRoom]]
  * @return
  *   a pipe turning the client's incoming messages into the messages delivered to that client
  */
def chatProcessor(a: ActorRef[ChatRoom]): OxStreams.Pipe[Message, Message] =
  fromClient => {
    val self = ChatMember.create
    a.tell(room => room.connected(self))

    // Relay everything the client sends to the room for broadcasting.
    fork {
      for received <- fromClient do a.tell(room => room.incoming(received))
    }

    // Cleanup: finish this member's channel first, then deregister the member.
    releaseAfterScope {
      self.channel.done()
      a.tell(room => room.disconnected(self))
    }

    self.channel
  }
/** Entry point: inside a supervised scope, creates the chat room actor, attaches the chat logic to
  * [[chatEndpoint]] and runs a Netty sync server with that endpoint via `startAndWait()`.
  */
@main def chatWsServer(): Unit =
  supervised {
    val room = Actor.create(new ChatRoom)
    val wsLogic = chatEndpoint.serverLogicSuccess[Id](_ => chatProcessor(room))
    val server = NettySyncServer().addEndpoint(wsLogic)
    server.startAndWait()
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment