@hden
Created March 15, 2013 13:38

A Common Interface Among Streaming APIs

After reading the slides from Isaac's talk in November, I began to think about using little reactive controllers, wired together, to control the overall responses of an application.

To demonstrate the concept, let's write a chat server that accepts HTTP POST requests as input and responds to GET requests with the current chat content.

Note that connection: 'keep-alive' may be one of the prerequisites for this to work.

# Server
{Bacon} = require 'baconjs' # the npm package is named 'baconjs'
server = require('http')
  .createServer()
  .listen(80)
# a stream of incoming HTTP requests
request = Bacon.fromEventTarget server, 'request'
chatRoom = request
  .filter((req) -> req.method is 'POST')
  .flatMap (req, res) ->
    # we should probably wait until the data ends, but
    # for the sake of simplicity just use
    req.asEventStream('data') # demonstrating the .asEventStream API
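
The comment in the snippet above notes that a real server would wait for the request body to finish. A minimal sketch of that step, written in plain JavaScript so it runs on Node without a compile step, might look like the following. The helper name collectBody is hypothetical; it is not part of Bacon.js or Node.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: buffer a readable stream (for example an HTTP
// request) until it ends, then resolve with the whole body as a string.
function collectBody(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    // Keep raw bytes so a multi-byte character split across two
    // chunks is decoded correctly at the end.
    readable.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
    readable.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
    readable.on('error', reject);
  });
}

// Usage with a stand-in for a request that arrives in two pieces.
collectBody(Readable.from(['hel', 'lo'])).then((body) => {
  console.log(body);
});
```

If the result needs to go back into a Bacon.js stream, Bacon.fromPromise is probably the place to start.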

The chat room can also be written as a Bus:

chatRoom = new Bacon.Bus() 
request.filter((req) -> req.method is 'POST')
  .flatMap (req, res) ->
    req.pipe(chatRoom)

To add a listener, we simply pipe the chat room into a new writable stream, such as an HTTP response.

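# assumed helper (not defined in the original): append each chunk to the log so far
concat = (acc, chunk) -> acc + chunk
get = request
  .filter((req) -> req.method is 'GET')
  .flatMap (req, res) ->
    # demonstrating the pipe API
    chatRoom.scan('', concat).pipe(res)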

The FRP architecture gives us incredible extensibility. For example, to log anyone who uses bad language in our chat room, we can simply scan the event stream.

fs = require 'fs'
# path is the location of the log file; badWordFilter is a reducer for scan
logs = fs.createWriteStream path, {encoding: 'utf-8'}
chatRoom.scan('', badWordFilter).pipe(logs)
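
The snippet above leaves badWordFilter undefined. Since scan takes a seed and a function of (accumulator, value), one possible shape for it is sketched below in plain JavaScript. The word list and the one-line-per-message log format are assumptions made for illustration only.

```javascript
// Hypothetical word list; the original does not define one.
const BAD_WORDS = ['darn', 'heck'];

// Reducer with the (accumulator, value) shape that scan expects:
// returns the log so far, extended by the message if it is offending.
function badWordFilter(log, message) {
  const text = String(message); // chunks may arrive as Buffers
  const words = text.toLowerCase().split(/\W+/);
  const offending = words.some((word) => BAD_WORDS.includes(word));
  return offending ? log + text.trim() + '\n' : log;
}

// The same reducer can be tried out on a plain array.
const log = ['hello', 'well darn', 'ok'].reduce(badWordFilter, '');
console.log(JSON.stringify(log));
```

Note that scan emits the whole accumulated value on every event, so a real logger would probably write only the newly flagged line rather than the full log each time.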

As far as I'm aware, there are three popular technologies that have data/event streams at their core.

Each works great individually, but they suck when pipe'd together.

Note: There is also Progressive XHR, but it is not supported in all browsers.

Node Stream

  • It has the concept of a water mark (the highWaterMark option), and is thus capable of automatically pausing and resuming data flow in response to busy I/O.
  • Data agnostic. Data arrives in arbitrarily sized pieces, and the stream does not care about the minimal meaningful chunk of data. You will probably have to implement your own transform stream to achieve that.
  • Linear. It works on the basis that A -> B -> C. But what if what we need is this?

[Figure: Complicated stream]
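
The first two points in the list above can be sketched together. The following is a minimal example, in plain JavaScript, of a custom transform stream that re-chunks arbitrarily sized pieces into whole lines. It assumes that a newline-terminated line is the "minimal meaningful chunk", which the original does not specify, and the class name LineSplitter is hypothetical. Because it is wired up with pipe(), Node pauses the source whenever a downstream buffer passes its highWaterMark.

```javascript
const { Transform, Readable } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical transform: bytes in, one complete line out per chunk.
class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // emit strings, one per line
    this.decoder = new StringDecoder('utf8');
    this.pending = ''; // text seen so far that is not yet a full line
  }

  _transform(chunk, encoding, callback) {
    this.pending += this.decoder.write(chunk);
    const lines = this.pending.split('\n');
    this.pending = lines.pop(); // the last piece may be incomplete
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback) {
    // Emit whatever is left when the source ends without a newline.
    this.pending += this.decoder.end();
    if (this.pending.length > 0) this.push(this.pending);
    callback();
  }
}

// Usage: the pieces do not line up with the message boundaries.
const pieces = [Buffer.from('hel'), Buffer.from('lo\nwor'), Buffer.from('ld\n!')];
Readable.from(pieces)
  .pipe(new LineSplitter())
  .on('data', (line) => console.log(line));
```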

Bacon.js

  • High-level stream manipulation: split, merge, map, etc.
  • Works great with front-end APIs, e.g. jQuery.
  • Is not aware of the processing capacity of a subscriber, so there is no way to pause event propagation.
  • Not compatible with other stream APIs.

WebSockets

  • Connects server and client, hence event propagation between them is made possible.
  • Works in most browsers through socket.io.
  • Not compatible with other stream APIs.

There have been prior attempts to link these APIs, but they have failed to gain traction.

The Ideal Setup

What we need is:

  • A normalized API.
  • An easily manipulable (fork, merge, plug, etc.) stream.
  • A linear stream that is aware of the processing capability of its upstream and downstream.
  • Embrace the open web.
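
As a rough illustration of the fork and merge requirements, here is a minimal sketch in plain JavaScript that uses only Node's stream module. It is not a proposal for the normalized API itself; the helper names fork and merge are hypothetical, and error propagation is left out for brevity.

```javascript
const { PassThrough, Readable } = require('stream');

// Hypothetical helper: merge several readables into one stream.
function merge(...sources) {
  const out = new PassThrough({ objectMode: true });
  let open = sources.length;
  if (open === 0) out.end();
  for (const source of sources) {
    // end: false keeps `out` open until every source has finished.
    source.pipe(out, { end: false });
    source.once('end', () => {
      open -= 1;
      if (open === 0) out.end();
    });
  }
  return out;
}

// Hypothetical helper: fork one readable into `n` branches.
// pipe() waits for every branch to drain, so the slowest
// consumer sets the pace for all of them.
function fork(source, n) {
  const branches = [];
  for (let i = 0; i < n; i++) {
    const branch = new PassThrough({ objectMode: true });
    source.pipe(branch);
    branches.push(branch);
  }
  return branches;
}

// Usage: two branches of one source, merged back together.
const [left, right] = fork(Readable.from(['a', 'b']), 2);
merge(left, right).on('data', (value) => console.log(value));
```

The trade-off is visible in fork: a branch that nobody reads will eventually fill its buffer and stall the source, which is exactly the kind of capacity awareness the list above asks for.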