Skip to content

Instantly share code, notes, and snippets.

@Lasering
Last active May 29, 2017 23:16
Show Gist options
  • Save Lasering/ce9e137e96c93386ef125ae924b2f6db to your computer and use it in GitHub Desktop.
Save Lasering/ce9e137e96c93386ef125ae924b2f6db to your computer and use it in GitHub Desktop.
Http4s Websocket handleWithActor
package org.http4s.akka
import scalaz.concurrent.Task
import scalaz.stream.async.mutable.Queue
import scalaz.stream.{Exchange, Process, Sink, async}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated}
import org.http4s.server.websocket.{WS => Http4sWS}
import org.http4s.websocket.WebsocketBits.{Close, Text, WebSocketFrame}
import org.http4s.{Response, Status}
package object websocket {
/**
  * Actor sitting between the websocket and the user-supplied actor.
  *
  * It creates the user's actor from `props` as its child and watches it. Messages are routed by sender:
  *  - sent by the child: rendered with `toString` and enqueued on `outQueue` as a `Text` frame;
  *  - sent with this actor itself as the sender: forwarded to the child.
  *
  * When the child terminates, a `Close` frame is enqueued, the queue is closed and this actor stops.
  *
  * @param props    recipe used to create the child (server) actor
  * @param outQueue queue of frames going out through the websocket
  */
private class ClientActor(props: Props, outQueue: Queue[WebSocketFrame]) extends Actor {
  val serverActor = context.actorOf(props)
  context.watch(serverActor)

  /** Enqueues a `Close` frame, closes the outgoing queue and then stops this actor, in that order. */
  private def shutdownSocket(): Unit = {
    outQueue.enqueueOne(Close()).unsafePerformSync
    outQueue.close.unsafePerformSync
    context.stop(self)
  }

  def receive: Receive = {
    // The child stopping is treated as the user code asking for the websocket to be terminated.
    case Terminated(ref) if ref == serverActor =>
      shutdownSocket()
    // Anything coming from the child is sent out through the websocket as text.
    case fromServer if sender() == serverActor =>
      val frame = Text(fromServer.toString)
      outQueue.enqueueOne(frame).unsafePerformSync
    // Messages carrying this actor as their own sender are relayed to the child.
    case fromSocket if sender() == self =>
      serverActor.tell(fromSocket, self)
  }

  /** Any failure of the child results in the child being stopped (never restarted or resumed). */
  override def supervisorStrategy: OneForOneStrategy = {
    val alwaysStop: SupervisorStrategy.Decider = {
      case _ => SupervisorStrategy.Stop
    }
    OneForOneStrategy()(alwaysStop)
  }
}
object WS {
  /**
    * Builds a websocket response whose traffic is bridged to an actor created from `props`.
    *
    * A `ClientActor` is started on `actorSystem`; it creates the actor described by `props` as its child.
    * Incoming `Text` frames are delivered to that child (via the `ClientActor`), and whatever the
    * `ClientActor` enqueues on the outgoing queue is dequeued into the websocket. Frames other than
    * `Text` are ignored. When the incoming side completes the `ClientActor` receives a `PoisonPill`.
    *
    * @param props       recipe for the actor that will handle the websocket messages
    * @param status      passed unchanged to `Http4sWS` together with the exchange
    *                    (presumably the response served to non-websocket requests; confirm against http4s)
    * @param actorSystem actor system on which the `ClientActor` is created
    * @tparam In  not used by this implementation
    * @tparam Out not used by this implementation
    * @return the response produced by `Http4sWS` for the constructed exchange
    */
  def handleWithActor[In, Out](props: Props, status: Task[Response] = Response(Status.NotImplemented).withBody("This is a WebSocket route."))
                              (implicit actorSystem: ActorSystem): Task[Response] = {
    //TODO: get queue size from settings
    val outQueue = async.boundedQueue[WebSocketFrame](100)
    val clientActor = actorSystem.actorOf(Props(new ClientActor(props, outQueue)))

    val sink: Sink[Task, WebSocketFrame] = Process.constant[WebSocketFrame => Task[Unit]] {
      case Text(t, _) => Task {
        // By setting the clientActor as the sender we inform the clientActor that
        // the message was from the browser and it should be sent to the serverActor.
        // This ruse allows us to send messages to the serverActor without
        // having its ActorRef, which saves us from having to ask clientActor for it.
        // Or having to resort to more complex tools.
        clientActor.tell(t, clientActor)
      }
      // Only text frames are supported. Without this case any other frame would throw a
      // scala.MatchError and fail the sink, so unsupported frames are ignored instead.
      case _ => Task.now(())
    } onComplete Process.eval_(Task {
      // The incoming side is finished: stop the ClientActor.
      clientActor ! PoisonPill
    })

    Http4sWS(Exchange(outQueue.dequeue, sink), status)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment