Skip to content

Instantly share code, notes, and snippets.

@mathieuancelin
Created August 23, 2013 09:56
Show Gist options
  • Save mathieuancelin/6317564 to your computer and use it in GitHub Desktop.
Save mathieuancelin/6317564 to your computer and use it in GitHub Desktop.
package controllers
import play.api.mvc.{Action, Controller}
import play.api.libs.iteratee.{Concurrent, Enumeratee}
import play.api.libs.json.{Json, JsValue}
import play.api.libs.EventSource
import play.api.libs.ws.WSEnumerator
import models._
import play.api.libs.concurrent.Execution.Implicits._
object StocksApplication extends Controller {
lazy val nyEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/ny")(Json.parse(_))
lazy val londonEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/ldn")(Json.parse(_))
lazy val tokyoEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/tok")(Json.parse(_))
def index(role: String) = Action {
Ok(views.html.stocks.index(role))
}
def userEvent(role: String, lowerBound: Int, higherBound: Int) = Action.async {
val secure: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
case e@SystemStatus(_, _, _) if role == "MANAGER" => e
case e@Operation("private", _, _, _, _) if role == "MANAGER" => e
case e@Operation("public", _, _, _, _) => e
}
val inBounds = Enumeratee.collect[Event] {
case e@Operation(_, amout, _, _, _) if amout > lowerBound && amout < higherBound => e.asInstanceOf[Event]
case e@SystemStatus(_, _, _) => e.asInstanceOf[Event]
}
for {
ny <- nyEnumerator
london <- londonEnumerator
tokyo <- tokyoEnumerator
} yield Ok.feed(
Concurrent.broadcast(ny >- london >- tokyo)._1 &> EventUtils.decoder &> secure &> inBounds &> EventUtils.encoder &> EventSource()
).as("text/event-stream")
}
}
package models
import play.api.libs.json.{JsUndefined, JsValue, Json}
import play.api.libs.iteratee.Enumeratee
import play.api.libs.concurrent.Execution.Implicits._
trait Event
case class Operation(level: String, amount: Int, from: String, customer: String, timestamp: Long) extends Event
case class SystemStatus(message: String, typeOf: String, timestamp: Long) extends Event
object EventUtils {
implicit val OperationFmt = Json.format[Operation]
implicit val SystemStatusFmt = Json.format[SystemStatus]
lazy val decoder: Enumeratee[JsValue, Event] = Enumeratee.map[JsValue] { jsValue =>
(jsValue \ "level") match {
case _: JsUndefined => SystemStatusFmt.reads(jsValue).get
case _ => OperationFmt.reads(jsValue).get
}
}
lazy val encoder: Enumeratee[Event, JsValue] = Enumeratee.map[Event] {
case o: Operation => OperationFmt.writes(o)
case s: SystemStatus => SystemStatusFmt.writes(s)
}
}
package controllers
import play.api.mvc.{Controller, Action}
import play.api.libs.iteratee.{Enumerator}
import play.api.libs.concurrent.Execution.Implicits._
import models._
import play.api.libs.concurrent.Promise
import scala.util.Random
import java.util.UUID
object StocksWS extends Controller {
def createFakeFeed(from: String) = {
val operations: Enumerator[Event] = Enumerator.generateM[Event] {
Promise.timeout(
Some(
Operation(
if(Random.nextBoolean) "public" else "private",
Random.nextInt(1000),
from,
UUID.randomUUID().toString,
System.currentTimeMillis()
)
), Random.nextInt(500) + Random.nextInt(300))
}
val noise: Enumerator[Event] = Enumerator.generateM[Event] {
Promise.timeout(
Some(
SystemStatus(
s"System message from $from",
if(Random.nextBoolean) "ERROR" else "FAILURE",
System.currentTimeMillis()
)
), Random.nextInt(5000))
}
val nyEnumerator: Enumerator[Event] = operations >- noise
Ok.chunked(nyEnumerator.through(EventUtils.encoder))
}
def ny = Action {
createFakeFeed("New-York")
}
def london = Action {
createFakeFeed("London")
}
def tokyo = Action {
createFakeFeed("Tokyo")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment