Skip to content

Instantly share code, notes, and snippets.

@whiteinge
Last active September 19, 2017 03:50
Show Gist options
  • Save whiteinge/997d2be18f51637340dc to your computer and use it in GitHub Desktop.
Save whiteinge/997d2be18f51637340dc to your computer and use it in GitHub Desktop.
POC Node HTTP server for server-sent-events using RxJS
/**
Proof-of-concept server-sent events HTTP server using Node.js and RxJS
Open http://localhost:8000 in a browser and view the console.
**/
var http = require('http'),
https = require('https');
var Rx = require('rx');
// -- Main Application Lifecycle ----------------------------------------------
var requests$ = createServer(8000).share();
var stream = new Rx.Subject();
var clients = trackActiveClients(stream);
var server = requests$
.filter(function(x) { return x.req.url === '/events'; })
.do(function(data) {
stream.isStopped ? endSSEHeaders(data) : setSSEHeaders(data);
})
.subscribe(clients);
process.on('SIGINT', function() {
// End the stream; notify all active connections.
stream.onCompleted();
// Stop the server.
server.dispose();
// Exit.
process.exit();
});
// -- Example of writing things to the stream ---------------------------------
// Push an arbitrary message through.
Rx.Observable.interval(2500)
.take(1)
.subscribe(function() {
stream.onNext('Got here.');
});
// Push a counter through the stream every second.
Rx.Observable.interval(1000)
.subscribe(stream);
// Make three Ajax request every ten seconds, push the response through the
// stream, then end the stream.
Rx.Observable.interval(10000)
.take(3)
.flatMap(function() {
return getJSON({
method: 'GET',
hostname: 'api.github.com',
path: '/users',
headers: {'User-Agent' : 'NodeJS'},
});
})
.subscribe(stream);
// -- Output HTML for demoing -------------------------------------------------
requests$
.filter(function(x) { return x.req.url === '/'; })
.subscribe(function(data) {
var req = data.req,
resp = data.resp;
var body = [
'<!doctype html>',
'<html>',
'<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>',
'<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-dom/7.0.3/rx.dom.js"></script>',
'<script>',
'var source = Rx.DOM.fromEventSource("/events");',
'source.map(JSON.parse).subscribe(',
'function(x) { console.log("x", x); },',
'function() { console.log("Stream complete."); });',
'</script>',
'</html>',
];
resp.writeHead(200, {'Content-Type': 'text/html'});
resp.write(body.join('\n'));
resp.end();
});
// -- Utility Functions -------------------------------------------------------
/**
Write the initial SSE headers
**/
function setSSEHeaders(data) {
var resp = data.resp;
resp.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
resp.write('\n');
}
/**
Cancel the stream by sending a 404
SSE doesn't seem to have a concept of ending the stream.
**/
function endSSEHeaders(data) {
var resp = data.resp;
resp.writeHead(204);
resp.end();
}
/**
Set up a subscription to an SSE stream for each new HTTP request
Collects active HTTP clients into a CompositeDisposable so they can be viewed
or disposed as a single entity if needed.
**/
function trackActiveClients(sseStream) {
var activeClients = new Rx.CompositeDisposable();
subscribeNewClients.clients = activeClients;
return subscribeNewClients;
function subscribeNewClients(data) {
var req = data.req,
resp = data.resp;
var sub = sseStream
.map(formatSSE)
.subscribe(
resp.write.bind(resp),
function() {}, // swallow errors in the SSE stream.
resp.end.bind(resp));
activeClients.add(sub);
// Dispose individual subscription if the client closes the connection.
req.on('close', function() {
activeClients.remove(sub);
sub.dispose();
});
}
}
/**
Format a message as an SSE stream item
Message must be JSON serializable.
**/
function formatSSE(msg) {
return 'data: '+ JSON.stringify(msg) +'\n\n';
}
/**
Create an observable of server requests
**/
function createServer(port) {
return Rx.Observable.create(function(observer) {
var server = http.createServer(function(req, resp) {
observer.onNext({req: req, resp: resp});
});
server.listen(port);
// Disposing the server observable will stop the server.
return function() {
observer.onCompleted();
server.close();
};
});
}
/**
Bring a modicum of sanity to Node's ajax support...
**/
function getJSON(options) {
return Rx.Observable.create(function(observer) {
var req = https.request(options, function(resp) {
var rawData = '';
resp.on('data', function (chunk){ rawData += chunk; });
resp.on('end',function(){
var data;
try {
data = JSON.parse(rawData);
} catch (err) {
observer.onError(err);
}
observer.onNext(data);
observer.onCompleted();
});
});
req.on('error', function(err) { observer.onError(err); });
req.end()
});
}
{
"name": "node-sse-rxjs-server",
"version": "1.0.0",
"description": "An SSE server using Node and RxJS",
"main": "index.js",
"scripts": {
"start": "node ./index.js"
},
"author": "Seth House",
"license": "BSD-2-Clause",
"dependencies": {
"rx": "^4.0.7"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment