Last active
September 19, 2017 03:50
-
-
Save whiteinge/997d2be18f51637340dc to your computer and use it in GitHub Desktop.
POC Node HTTP server for server-sent-events using RxJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
Proof-of-concept server-sent events HTTP server using Node.js and RxJS | |
Open http://localhost:8000 in a browser and view the console. | |
**/ | |
var http = require('http'), | |
https = require('https'); | |
var Rx = require('rx'); | |
// -- Main Application Lifecycle ---------------------------------------------- | |
var requests$ = createServer(8000).share(); | |
var stream = new Rx.Subject(); | |
var clients = trackActiveClients(stream); | |
var server = requests$ | |
.filter(function(x) { return x.req.url === '/events'; }) | |
.do(function(data) { | |
stream.isStopped ? endSSEHeaders(data) : setSSEHeaders(data); | |
}) | |
.subscribe(clients); | |
process.on('SIGINT', function() { | |
// End the stream; notify all active connections. | |
stream.onCompleted(); | |
// Stop the server. | |
server.dispose(); | |
// Exit. | |
process.exit(); | |
}); | |
// -- Example of writing things to the stream --------------------------------- | |
// Push an arbitrary message through. | |
Rx.Observable.interval(2500) | |
.take(1) | |
.subscribe(function() { | |
stream.onNext('Got here.'); | |
}); | |
// Push a counter through the stream every second. | |
Rx.Observable.interval(1000) | |
.subscribe(stream); | |
// Make three Ajax request every ten seconds, push the response through the | |
// stream, then end the stream. | |
Rx.Observable.interval(10000) | |
.take(3) | |
.flatMap(function() { | |
return getJSON({ | |
method: 'GET', | |
hostname: 'api.github.com', | |
path: '/users', | |
headers: {'User-Agent' : 'NodeJS'}, | |
}); | |
}) | |
.subscribe(stream); | |
// -- Output HTML for demoing ------------------------------------------------- | |
requests$ | |
.filter(function(x) { return x.req.url === '/'; }) | |
.subscribe(function(data) { | |
var req = data.req, | |
resp = data.resp; | |
var body = [ | |
'<!doctype html>', | |
'<html>', | |
'<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>', | |
'<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-dom/7.0.3/rx.dom.js"></script>', | |
'<script>', | |
'var source = Rx.DOM.fromEventSource("/events");', | |
'source.map(JSON.parse).subscribe(', | |
'function(x) { console.log("x", x); },', | |
'function() { console.log("Stream complete."); });', | |
'</script>', | |
'</html>', | |
]; | |
resp.writeHead(200, {'Content-Type': 'text/html'}); | |
resp.write(body.join('\n')); | |
resp.end(); | |
}); | |
// -- Utility Functions ------------------------------------------------------- | |
/** | |
Write the initial SSE headers | |
**/ | |
function setSSEHeaders(data) { | |
var resp = data.resp; | |
resp.writeHead(200, { | |
'Content-Type': 'text/event-stream', | |
'Cache-Control': 'no-cache', | |
'Connection': 'keep-alive', | |
}); | |
resp.write('\n'); | |
} | |
/** | |
Cancel the stream by sending a 404 | |
SSE doesn't seem to have a concept of ending the stream. | |
**/ | |
function endSSEHeaders(data) { | |
var resp = data.resp; | |
resp.writeHead(204); | |
resp.end(); | |
} | |
/** | |
Set up a subscription to an SSE stream for each new HTTP request | |
Collects active HTTP clients into a CompositeDisposable so they can be viewed | |
or disposed as a single entity if needed. | |
**/ | |
function trackActiveClients(sseStream) { | |
var activeClients = new Rx.CompositeDisposable(); | |
subscribeNewClients.clients = activeClients; | |
return subscribeNewClients; | |
function subscribeNewClients(data) { | |
var req = data.req, | |
resp = data.resp; | |
var sub = sseStream | |
.map(formatSSE) | |
.subscribe( | |
resp.write.bind(resp), | |
function() {}, // swallow errors in the SSE stream. | |
resp.end.bind(resp)); | |
activeClients.add(sub); | |
// Dispose individual subscription if the client closes the connection. | |
req.on('close', function() { | |
activeClients.remove(sub); | |
sub.dispose(); | |
}); | |
} | |
} | |
/** | |
Format a message as an SSE stream item | |
Message must be JSON serializable. | |
**/ | |
function formatSSE(msg) { | |
return 'data: '+ JSON.stringify(msg) +'\n\n'; | |
} | |
/** | |
Create an observable of server requests | |
**/ | |
function createServer(port) { | |
return Rx.Observable.create(function(observer) { | |
var server = http.createServer(function(req, resp) { | |
observer.onNext({req: req, resp: resp}); | |
}); | |
server.listen(port); | |
// Disposing the server observable will stop the server. | |
return function() { | |
observer.onCompleted(); | |
server.close(); | |
}; | |
}); | |
} | |
/** | |
Bring a modicum of sanity to Node's ajax support... | |
**/ | |
function getJSON(options) { | |
return Rx.Observable.create(function(observer) { | |
var req = https.request(options, function(resp) { | |
var rawData = ''; | |
resp.on('data', function (chunk){ rawData += chunk; }); | |
resp.on('end',function(){ | |
var data; | |
try { | |
data = JSON.parse(rawData); | |
} catch (err) { | |
observer.onError(err); | |
} | |
observer.onNext(data); | |
observer.onCompleted(); | |
}); | |
}); | |
req.on('error', function(err) { observer.onError(err); }); | |
req.end() | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "node-sse-rxjs-server", | |
"version": "1.0.0", | |
"description": "An SSE server using Node and RxJS", | |
"main": "index.js", | |
"scripts": { | |
"start": "node ./index.js" | |
}, | |
"author": "Seth House", | |
"license": "BSD-2-Clause", | |
"dependencies": { | |
"rx": "^4.0.7" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment