Skip to content

Instantly share code, notes, and snippets.

@schmitch
Last active September 11, 2016 19:27
Show Gist options
  • Save schmitch/8474d9fe4b0bd39849a026f437a52617 to your computer and use it in GitHub Desktop.
Save schmitch/8474d9fe4b0bd39849a026f437a52617 to your computer and use it in GitHub Desktop.
Play Framework WebSockets with a Queue
import javax.inject.{ Inject, Provider, Singleton }
import akka.NotUsed
import akka.stream.scaladsl.{ Keep, Sink, Source, SourceQueueWithComplete }
import akka.stream.{ Materializer, OverflowStrategy }
import play.api.http.websocket.Message
import play.api.inject.{ Binding, Module }
import play.api.{ Configuration, Environment }
/**
 * Pairs the write side and the read side of a shared message stream.
 *
 * @param channel queue used to offer messages into the stream, or to
 *                complete / fail it
 * @param source  source from which the messages can be consumed
 */
case class ConcurrentChannel(
  channel: SourceQueueWithComplete[Message],
  source: Source[Message, NotUsed]
)
/**
 * Singleton provider that materializes one queue-backed stream at construction
 * time and hands out [[ConcurrentChannel]] values that all share it.
 *
 * @param mat materializer used to run the underlying streams
 */
@Singleton
class ConcurrentChannelProvider @Inject()()(implicit mat: Materializer) extends Provider[ConcurrentChannel] {

  // A queue source (buffer size 100, backpressure overflow strategy) wired into
  // a publisher sink created with `true` (fanout). Both materialized values are
  // kept: the queue to push into and the publisher to subscribe to.
  private val (channel, publisher) = {
    val queueSource = Source.queue[Message](100, OverflowStrategy.backpressure)
    val publisherSink = Sink.asPublisher[Message](true)
    queueSource.toMat(publisherSink)(Keep.both).run()
  }

  // Subscriber started at construction time that prints every message.
  // NOTE(review): presumably this also keeps the publisher permanently
  // subscribed/drained even when no WebSocket is connected — confirm intent.
  private val _ =
    Source.fromPublisher(publisher).runWith(Sink.foreach[Message](message => println(message)))

  /**
   * @return a new [[ConcurrentChannel]] wrapping the shared queue and a fresh
   *         `Source` built from the shared publisher
   */
  def get(): ConcurrentChannel = {
    val sharedSource = Source.fromPublisher(publisher)
    ConcurrentChannel(channel, sharedSource)
  }
}
/**
 * Play module that binds [[ConcurrentChannel]] to [[ConcurrentChannelProvider]],
 * marking the binding as eager.
 *
 * NOTE(review): the class name looks like a typo for "ConcurrentChannelModule";
 * it is left unchanged because external configuration may refer to it by name.
 */
class ConcurrentChanneModule extends Module {

  /**
   * @param environment   the Play environment (not used here)
   * @param configuration the application configuration (not used here)
   * @return a single eager binding of [[ConcurrentChannel]]
   */
  override def bindings(environment: Environment, configuration: Configuration): Seq[Binding[_]] = {
    val channelBinding = bind[ConcurrentChannel].toProvider[ConcurrentChannelProvider].eagerly()
    Seq(channelBinding)
  }
}
// Usage
/**
 * Demo controller showing how to push to, fail, complete and consume the
 * shared queue-backed stream.
 *
 * @param concurrentChannel provider of the shared [[ConcurrentChannel]]
 */
class DemoController @Inject() (concurrentChannel: Provider[ConcurrentChannel]) extends Controller {

  /**
   * Offers a fixed text message to the queue and responds with `Ok`.
   *
   * The result of `offer` is discarded, so the response does not reflect
   * whether the element was actually accepted.
   */
  def offer = Action { implicit request =>
    // The side effect and the result must be separate statements; written on
    // one line, `Ok` is parsed as a postfix member selection on the call result.
    concurrentChannel.get().channel.offer(TextMessage("Hello!"))
    Ok
  }

  /** Fails the queue with an exception and responds with `Ok`. */
  def fail = Action { implicit request =>
    concurrentChannel.get().channel.fail(new Exception("WS ERROR"))
    Ok
  }

  /**
   * Completes the queue and responds with `Ok`.
   *
   * Closes all connections that use the queue as their backing store.
   */
  def close = Action { implicit request =>
    concurrentChannel.get().channel.complete()
    Ok
  }

  /**
   * WebSocket endpoint: incoming messages are ignored, outgoing messages are
   * taken from the shared source.
   */
  def socket = WebSocket.accept[Message, Message] { r =>
    Flow.fromSinkAndSource(Sink.ignore, concurrentChannel.get().source)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment