Skip to content

Instantly share code, notes, and snippets.

@KrishnaPG
Last active October 15, 2015 04:34
Show Gist options
  • Save KrishnaPG/3203606f42b6b7871ecf to your computer and use it in GitHub Desktop.
Save KrishnaPG/3203606f42b6b7871ecf to your computer and use it in GitHub Desktop.
/**
Copyright (c) 2015 My-Classes
http://my-classes.com/2015/10/15/singleton-websocket-publisher-with-autobahn-pubsub-crossbar-io-meta-events/
**/
var debug = autobahn.log.debug;
var globals = {};
globals.pubsubTopic = "pubsub-simulator";
globals.serverStartAttempt = 0;
// Set up WAMP connection to router
var connection = new autobahn.Connection({
url: 'wss://demo.crossbar.io/ws',
realm: 'realm1'
} // for demo crossbar server, only realm1 is supported !!
);
// Set up 'onopen' handler
connection.onopen = function (session) {
// start listening to publisher
session.subscribe(globals.pubsubTopic, onMessage);
// monitor session events and start singleton if required
monitorMetaEvents(session, attemptSingletonStart);
};
document.addEventListener('DOMContentLoaded', function () {
// start subscibers
connection.open();
});
// subscriber received a message
function onMessage(msg) {
console.log("Received from publisher:", msg);
}
// This is the singleton publisher method
function runSimulator(session) {
session.publish(globals.pubsubTopic, [{ 'timestamp': Date.now(), 'val': Math.random() }]);
setTimeout(runSimulator, 1000, session); // infinite loop
//TODO: control the publishing based on number of subscribers
}
// keeps track of meta events
function monitorMetaEvents(session, callback) {
if (!globals.singletonRID) callback(session);
var meta_topics = [
"wamp.registration.on_unregister",
// "wamp.registration.on_delete"
];
function onMetaEvent(args, kwargs) {
debug("onMetaEvent", args, kwargs);
if (args[1] === globals.singletonRID) callback(session);
}
// subscribe to "unregister" WAMP meta events
for (var i = 0; i < meta_topics.length; ++i) {
var topic = meta_topics[i];
session.subscribe(topic, onMetaEvent).then(
function (sub) { debug('Subscribed to meta topic ' + topic); },
function (err) { console.error('Failed to subscribe to meta topic ' + topic, err); }
);
}
// we can also listen to "wamp.registration.on_register" event and update
// the globals.singletonRID. But that requires name checking, which is not
// currently supported by autobahn
}
// Try starting the publisher as singleton instance.
// The way we are doing here is, a single publisher is
// sending data to all subscribers, so that it appears real traffic is
// synched across all browsers. The issue is, only one client
// should start the simulator (the publisher). To ensure that, we register
// the publisher as an RPC method and invoke it only upon successful registration.
// That way, dupicates will be avoided.
function startSingletonPublisher(session, publisherURI, publisherMethod) {
debug("Attempting to start singleton publisher: ", publisherURI);
session.register(publisherURI, publisherMethod).then(
function (registration) {
// registration successful, means this is the first instance of server.
debug("Starting singleton Publisher: ", session);
// invoke the method to start the singleton publisher
publisherMethod(session);
// reset the start attempt count
globals.serverStartAttempt = 0;
},
function (error) {
// Singleton publisher already running. Note down its ID
session.call("wamp.registration.lookup", [publisherURI]).then(
function (rid) {
debug("singleton server already running: ", rid);
globals.singletonRID = rid;
if (!rid)
attemptSingletonStart(session);
else
globals.serverStartAttempt = 0; // reset the start attempt count
}, function (err) {
// this should not happen, but happened. Retry.
attemptSingletonStart(session);
console.error("Unable to get singleton publisher ID", err);
});
}
);
}
function attemptSingletonStart(session) {
// random wait, so that all clients do not rush at the same time
var wait = (globals.serverStartAttempt++ + Math.random()) * 5000;
setTimeout(startSingletonPublisher, wait, session, "app.simulator", runSimulator);
debug("Attempting singleton restart in ", wait, "ms");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment