@nlinker
Forked from sadache/AA.md
Created February 4, 2013 00:28

Is socket.push(bytes) all you need to program Realtime Web apps?

One of the goals of the Play2 architecture is to provide a programming model for what are called Realtime Web Applications.

Realtime Web Applications

Realtime Web Applications are applications that make use of Websockets, Server Sent Events, Comet or other protocols offering/simulating an open socket between the browser and the server for continuous communication. Basically, these applications let users work with information as it is published - without having to periodically ping the service.

Quite a few web frameworks target the development of this type of application, but usually the solution is simply to provide an API that lets developers push messages to, and receive messages from, an open channel. Something like:

   channel.push(message)
   //and
   channel.onMessage { ... }

Though this kind of API offers an opportunity to get started doing Realtime Web, it doesn't offer a programming model for dealing with the challenges encountered when programming with streams of data: including creating, adapting, manipulating, filtering and merging streams of data - as well as handling all of the related synchronization involved.

A Programming Model

Since development of Realtime Web Apps is mostly built around manipulation of streams of data, it is crucial to have a programming model which identifies clearly what a stream of data is, and defines composable components to deal with it.

It is obvious that the channel API above falls short when manipulating a rich set of streams of data, but even the classic InputStream/OutputStream interfaces are not sufficient. Even ignoring concerns about their inefficient blocking runtime behaviour, they simply don't carry enough signals/information to enable the building of a rich stream manipulation API.

Play2 uses Iteratees together with Futures for dealing with streams of data, providing a very rich model for programming Realtime Web Applications.

A sample Realtime Web App

The goal of this text isn't to provide a detailed description of what Iteratees and pals are. Nevertheless, I will go through a quick introduction and then move on to an example illustrating a few aspects of how powerful this approach is.

  • An Iteratee[E,A] is an immutable interface that represents a consumer: it consumes chunks of data, each of type E, and eventually produces a computed value of type A. For example, Iteratee[String,Int] is an iteratee that consumes chunks of strings and eventually produces an Int (that could be, for instance, the count of characters in the passed chunks).

    An iteratee can choose to terminate before the EOF is sent from the stream, or it can wait for EOF before it terminates, returning the computed A value.

    You can compose different Iteratees together, which provides an opportunity for partitioning the consuming logic into different parts.

  • An Enumerator[E] represents a stream that is pushing chunks of data of type E. For example, an Enumerator[String] is a stream of strings. Enumerators can be composed one after the other, or interleaved concurrently, providing a means of managing streams.

  • An Enumeratee[From,To] is an adapter from a stream of Froms to a stream of Tos. Note that an Enumeratee can rechunk the stream, and can add or remove chunks or parts of them. Enumeratees (as we will see) are instrumental for stream manipulation.

  • There are also some convenience methods for creating different kinds of Enumerators, Iteratees and Enumeratees, useful in various scenarios.
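To make these three roles concrete, here is a minimal sketch in plain Scala using only the standard library. It is not Play's implementation: the names `Step`, `Cont`, `Done`, `El`, `EOF`, `charCount`, `mapChunks` and `feedAll` are made up for this illustration, and the model is synchronous, whereas Play's Iteratees work with Futures.

```scala
object IterateeSketch {
  // Signals a consumer can receive: a chunk of data, or end of stream.
  sealed trait Input[+E]
  final case class El[E](chunk: E) extends Input[E]
  case object EOF extends Input[Nothing]

  // An immutable consumer: either finished with a result, or waiting for input.
  sealed trait Step[E, A]
  final case class Done[E, A](result: A) extends Step[E, A]
  final case class Cont[E, A](next: Input[E] => Step[E, A]) extends Step[E, A]

  // Consumer in the spirit of Iteratee[String, Int]: counts characters until EOF.
  def charCount(total: Int = 0): Step[String, Int] = Cont[String, Int] {
    case El(chunk) => charCount(total + chunk.length)
    case EOF       => Done(total)
  }

  // Adapter in the spirit of Enumeratee[From, To]: converts each chunk
  // before the wrapped consumer sees it.
  def mapChunks[From, To, A](f: From => To)(inner: Step[To, A]): Step[From, A] =
    inner match {
      case Done(result) => Done[From, A](result)
      case Cont(next) => Cont[From, A] {
        case El(chunk) => mapChunks(f)(next(El(f(chunk))))
        case EOF       => mapChunks(f)(next(EOF))
      }
    }

  // Stand-in for an Enumerator: pushes every chunk, then EOF, into the consumer.
  def feedAll[E, A](chunks: List[E], consumer: Step[E, A]): Option[A] = {
    val afterChunks = chunks.foldLeft(consumer) { (step, chunk) =>
      step match {
        case Cont(next) => next(El(chunk))
        case done       => done // already finished: remaining chunks are ignored
      }
    }
    val afterEof = afterChunks match {
      case Cont(next) => next(EOF)
      case done       => done
    }
    afterEof match {
      case Done(result) => Some(result)
      case _            => None // still waiting for input even after EOF
    }
  }
}
```

With these definitions, `IterateeSketch.feedAll(List("ab", "cde"), IterateeSketch.charCount())` yields `Some(5)`. Wrapping the consumer with `mapChunks((n: Int) => n.toString)` lets the same counter consume a list of Ints without being changed.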

Ok, so our sample application features two streams, one is a stream of financial operations:

  val operations: Enumerator[Event] = Enumerator.generateM[Event] {
    Promise.timeout(Some(Operation( if(Random.nextBoolean) "public" else "private", Random.nextInt(1000))), Random.nextInt(500))
  }

Here we are generating random values that occur at random intervals (of at most 500 ms). In the real world this stream could be coming from a datastore or an open socket from another server.

An Operation is private or public and carries an amount of type Int:

case class Operation(level: String, amount: Int) extends Event

So our operations enumerator will supply a stream of operations - private or public, each with a random amount value. These values will be streamed at random time intervals.

The other stream in our example application is a stream of system messages: messages that talk about the status of the system:

case class SystemStatus(message: String) extends Event

val noise: Enumerator[Event] = Enumerator.generateM[Event] {
    Promise.timeout(Some(SystemStatus("System message")), Random.nextInt(5000))
  }

This stream could equally be coming from another server or datastore. With these two streams at hand, we can easily produce a single stream that contains messages from both by interleaving them:

val events: Enumerator[Event] = operations >- noise

For those not comfortable using symbolic operators (>- above), you could also use the interleave method:

val events: Enumerator[Event] = operations.interleave(noise)

Now the model part of our application looks like Streams.scala, and we can move on to building the app.

Our sample Realtime Web App features:

Our application will publish this stream of Events as Server Sent Events or Comet (both are protocols for a uni-directional socket from the server to the browser) and will provide two simple features:

Authorization:

You can only see Events that are permitted for your role. Managers can see private operations and system status, whereas normal users can see only public operations. For this purpose we create an Enumeratee which collects the appropriate messages:

def feed(role: String) = Action {

  val secure: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
    case e@SystemStatus(_) if role == "MANAGER" => e
    case e@Operation("private", _) if role == "MANAGER" => e
    case e@Operation("public", _) => e
  }

The result is an adapter, secure, which yields a new stream containing only the events permitted for the given role.
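The effect of these three cases can be checked without any streaming machinery. The sketch below is an illustration in plain Scala, not the Play code: it redefines the event types locally, applies the same cases through the standard `List.collect`, and uses a made-up role name `"USER"` for a normal user.

```scala
object SecureSketch {
  sealed trait Event
  final case class Operation(level: String, amount: Int) extends Event
  final case class SystemStatus(message: String) extends Event

  // The same cases as the `secure` adapter, written as a plain partial function.
  def secure(role: String): PartialFunction[Event, Event] = {
    case e @ SystemStatus(_) if role == "MANAGER" => e
    case e @ Operation("private", _) if role == "MANAGER" => e
    case e @ Operation("public", _) => e
  }

  val sample: List[Event] = List(
    Operation("public", 10),
    Operation("private", 20),
    SystemStatus("System message")
  )

  // collect keeps only the events for which the partial function is defined.
  val forManager: List[Event] = sample.collect(secure("MANAGER"))
  val forUser: List[Event] = sample.collect(secure("USER"))
}
```

Here `forManager` contains all three sample events, while `forUser` contains only `Operation("public", 10)`. Note that any role string other than `"MANAGER"` is treated as a normal user, because events that match no case are simply dropped.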

Filtering:

For our example we'd like to filter the stream by a "range of interest" based on the amount of the operation. By providing upper and lower bounds, you get only the corresponding operations. For this we create another Enumeratee collecting the appropriate operations:

def feed(role: String, lowerBound: Int, higherBound: Int) = Action {
    
  val secure: Enumeratee[Event, Event] = ...

  val inBounds: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
    case e@Operation(_, amount) if amount > lowerBound && amount < higherBound => e
    case e@SystemStatus(_) => e
  }

}

With Enumeratees you can easily create any kind of stream adapter you need for your application requirements.

JSON:

Our App will be pushing JSON messages to the browser, which is why we need one more Enumeratee for transforming Events to JSON values, i.e. an Enumeratee[Event,JsValue]. To do this, we map each stream input to the corresponding output:

  val asJson: Enumeratee[Event, JsValue] = Enumeratee.map[Event] { 
    case Operation(visibility, amount) => toJson(Map("type" -> toJson("operation"), "amount" -> toJson(amount), "visibility" -> toJson(visibility)))
    case SystemStatus(msg) => toJson(Map("type" -> "status", "message" -> msg))
  }

Ok, now we have three separate enumeratees that will handle the application logic. For convenience, let's produce one single adapter out of the three. For that we can use the compose method or its symbolic equivalent ><>:

    val finalAdapter = secure ><> inBounds ><> asJson
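The composition can be pictured with a small stand-in written in plain Scala. This is a sketch under assumptions, not Play's Enumeratee: `Adapter` and `compose` are hypothetical names, each adapter is modelled as a function that either forwards a value (`Some`) or drops it (`None`), the role `"USER"` is made up, and `asText` produces a simple string instead of a JsValue to avoid a JSON dependency.

```scala
object ComposeSketch {
  sealed trait Event
  final case class Operation(level: String, amount: Int) extends Event
  final case class SystemStatus(message: String) extends Event

  // Hypothetical stand-in for an Enumeratee: Some(b) forwards, None drops.
  type Adapter[A, B] = A => Option[B]

  // Plays the role of compose / ><> : the output of `first` feeds `second`.
  def compose[A, B, C](first: Adapter[A, B], second: Adapter[B, C]): Adapter[A, C] =
    a => first(a).flatMap(second)

  def secure(role: String): Adapter[Event, Event] = {
    case e @ SystemStatus(_) if role == "MANAGER" => Some(e)
    case e @ Operation("private", _) if role == "MANAGER" => Some(e)
    case e @ Operation("public", _) => Some(e)
    case _ => None
  }

  // Both bounds are exclusive, as in the article's inBounds.
  def inBounds(lower: Int, higher: Int): Adapter[Event, Event] = {
    case e @ Operation(_, amount) if amount > lower && amount < higher => Some(e)
    case e @ SystemStatus(_) => Some(e)
    case _ => None
  }

  // Simplified rendering; the article's asJson produces a JsValue instead.
  val asText: Adapter[Event, String] = {
    case Operation(level, amount) => Some(s"operation:$level:$amount")
    case SystemStatus(message)    => Some(s"status:$message")
  }

  def finalAdapter(role: String, lower: Int, higher: Int): Adapter[Event, String] =
    compose(compose(secure(role), inBounds(lower, higher)), asText)

  val sample: List[Event] = List(
    Operation("public", 50),
    Operation("public", 500),
    Operation("private", 60),
    SystemStatus("up")
  )

  // Run every sample event through the composed adapter, keeping what survives.
  def view(role: String): List[String] =
    sample.flatMap(e => finalAdapter(role, 0, 100)(e))
}
```

With bounds of 0 and 100, `ComposeSketch.view("USER")` keeps only `"operation:public:50"`, while `ComposeSketch.view("MANAGER")` also keeps the private operation and the status message. The out-of-range operation is dropped for both roles.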

We're almost done. Now all we need is to respond to the browser with an Ok status, wrapping each message into the Server Sent Event (EventSource) protocol. Thankfully, there is already an Enumeratee which adapts a stream and wraps its chunks into the SSE protocol:

Ok.feed(Stream.events &> finalAdapter ><> EventSource()).as("text/event-stream")

Here we pass our stream through the finalAdapter and then through the EventSource adapter, and finally set the appropriate content type on the response.
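For readers unfamiliar with the wire format, the sketch below shows how a single message is framed as a Server Sent Event: every line of the payload becomes a `data:` field, and an empty line ends the event. The function name `frame` is made up for this illustration, and the exact output of Play's EventSource() adapter is not shown in this article, so treat this as a description of the format rather than of Play's implementation.

```scala
object SseSketch {
  // Frames one message as a Server Sent Event: each payload line becomes a
  // "data:" field, and a blank line terminates the event.
  def frame(payload: String): String =
    payload.split("\n", -1).map(line => s"data: $line\n").mkString + "\n"
}
```

A browser's EventSource hands the reassembled payload to its message handler, so the JSON produced by asJson arrives on the client as the event's data string.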

Our application now looks like Application.scala

All we need now on the client side is to connect to the stream using the following JavaScript:

 feed = new EventSource('/feed?role=@role&lower=' + min + '&higher=' + max)

Snapshots of our Realtime Web App:

Bottom Line

Realtime Web involves dealing with different streams of data from different sources. It is hard to do any non-trivial application without having a programming model that contains an appropriate representation of a stream, and having the necessary API for creating, adapting, filtering and consuming streams of data. Play2 uses Iteratees to offer this programming model and a rich API.

Note: The full working application source was created by me (@sadache) and @guillaumebort for one of our talks.

Note: Examples of Realtime Web Apps using Play2: Typesafe Console, lichess.org

Application.scala

package controllers

import play.api._
import play.api.mvc._
import play.api.libs._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import play.api.libs.json.Json._

import models._

object Application extends Controller {

  def index(role: String) = Action {
    Ok(views.html.index(role))
  }

  def feed(role: String, lowerBound: Int, higherBound: Int) = Action {

    // Authorization: keep only the events the given role may see.
    val secure: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
      case e@SystemStatus(_) if role == "MANAGER" => e
      case e@Operation("private", _) if role == "MANAGER" => e
      case e@Operation("public", _) => e
    }

    // Filtering: keep operations whose amount lies strictly between the bounds.
    val inBounds: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
      case e@Operation(_, amount) if amount > lowerBound && amount < higherBound => e
      case e@SystemStatus(_) => e
    }

    // JSON: convert each event to the value pushed to the browser.
    val asJson: Enumeratee[Event, JsValue] = Enumeratee.map[Event] {
      case Operation(visibility, amount) => toJson(Map("type" -> toJson("operation"), "amount" -> toJson(amount), "visibility" -> toJson(visibility)))
      case SystemStatus(msg) => toJson(Map("type" -> "status", "message" -> msg))
    }

    val finalAdapter = secure ><> inBounds ><> asJson

    Ok.feed(Stream.events &> finalAdapter ><> EventSource()).as("text/event-stream")
  }

}

Streams.scala

package models

trait Event

case class Operation(level: String, amount: Int) extends Event

case class SystemStatus(message: String) extends Event

object Stream {

  import scala.util.Random
  import play.api.libs.iteratee._
  import play.api.libs.concurrent._

  // Random public/private operations, emitted at random intervals of at most 500 ms.
  val operations: Enumerator[Event] = Enumerator.generateM[Event] {
    Promise.timeout(Some(Operation( if(Random.nextBoolean) "public" else "private", Random.nextInt(1000))), Random.nextInt(500))
  }

  // System status messages, emitted at random intervals of at most 5000 ms.
  val noise: Enumerator[Event] = Enumerator.generateM[Event] {
    Promise.timeout(Some(SystemStatus("System message")), Random.nextInt(5000))
  }

  // Both streams interleaved into a single stream of events.
  val events: Enumerator[Event] = operations >- noise

}