Last active
September 11, 2016 19:27
-
-
Save schmitch/8474d9fe4b0bd39849a026f437a52617 to your computer and use it in GitHub Desktop.
Play Framework WebSockets with a Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import javax.inject.{ Inject, Provider, Singleton }

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{ Keep, Sink, Source, SourceQueueWithComplete }
import akka.stream.{ Materializer, OverflowStrategy }
import play.api.http.websocket.Message
import play.api.http.websocket.TextMessage
import play.api.inject.{ Binding, Module }
import play.api.mvc.{ Action, Controller, WebSocket }
import play.api.{ Configuration, Environment }
/**
 * Pairs the write side and the read side of one shared message stream.
 *
 * @param channel queue handle used to push, fail, or complete the stream
 * @param source  source from which the pushed messages can be consumed
 */
final case class ConcurrentChannel(
  channel: SourceQueueWithComplete[Message],
  source: Source[Message, NotUsed]
)
/**
 * Provider that materializes a single queue-backed stream when it is constructed
 * and hands out [[ConcurrentChannel]] views of it.
 *
 * The stream is a `Source.queue` (buffer size 100, `OverflowStrategy.backpressure`)
 * connected to a fan-out publisher, so several subscribers can attach to the same
 * publisher.
 *
 * @param mat materializer used to run the queue stream and the printing subscriber
 */
@Singleton
class ConcurrentChannelProvider @Inject()()(implicit mat: Materializer) extends Provider[ConcurrentChannel] {

  // Run the queue once and keep both materialized values: the queue handle and the publisher.
  private val (channel, publisher) =
    Source
      .queue[Message](100, OverflowStrategy.backpressure)
      .toMat(Sink.asPublisher(fanout = true))(Keep.both)
      .run()

  // Permanent subscriber that prints every message to stdout.
  // NOTE(review): presumably this also keeps the fan-out publisher subscribed while no
  // WebSocket client is connected, so the stream is not torn down - confirm before removing.
  private val _ = Source.fromPublisher(publisher).runWith(Sink.foreach(println))

  /** Returns the shared queue together with a new `Source` wrapping the shared publisher. */
  def get(): ConcurrentChannel =
    ConcurrentChannel(channel, Source.fromPublisher(publisher))
}
/**
 * Play module that binds [[ConcurrentChannel]] to [[ConcurrentChannelProvider]].
 *
 * The binding is marked eager.
 *
 * NOTE(review): the class name is missing an "l" ("Channe"); it is left unchanged here
 * because configuration outside this file may reference the module by name.
 */
class ConcurrentChanneModule extends Module {

  override def bindings(environment: Environment, configuration: Configuration): Seq[Binding[_]] =
    Seq(
      bind[ConcurrentChannel].toProvider[ConcurrentChannelProvider].eagerly()
    )
}
// Usage: example controller that drives the shared queue and exposes it over a WebSocket.
/**
 * Demo controller showing how the shared [[ConcurrentChannel]] is used: three actions
 * that act on the queue, and a WebSocket endpoint that streams the queue's messages.
 *
 * @param concurrentChannel provider of the shared channel
 */
class DemoController @Inject() (concurrentChannel: Provider[ConcurrentChannel]) extends Controller {

  /** Offers a "Hello!" text message to the queue and responds with `Ok`. */
  def offer = Action {
    // The result of `offer` is intentionally not inspected in this demo.
    concurrentChannel.get().channel.offer(TextMessage("Hello!"))
    Ok
  }

  /** Fails the queue with an exception and responds with `Ok`. */
  def fail = Action {
    concurrentChannel.get().channel.fail(new Exception("WS ERROR"))
    Ok
  }

  /**
   * Completes the queue and responds with `Ok`.
   *
   * Closes all connections that have the queue as their backing store.
   */
  def close = Action {
    concurrentChannel.get().channel.complete()
    Ok
  }

  /** WebSocket endpoint: incoming messages are ignored, outgoing messages come from the shared source. */
  def socket = WebSocket.accept[Message, Message] { _ =>
    Flow.fromSinkAndSource(Sink.ignore, concurrentChannel.get().source)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment