Skip to content

Instantly share code, notes, and snippets.

@monteslu
Last active January 16, 2018 15:52
Show Gist options
  • Save monteslu/8091d5c8ee6c9731b5ec6805f5f04644 to your computer and use it in GitHub Desktop.
Save monteslu/8091d5c8ee6c9731b5ec6805f5f04644 to your computer and use it in GitHub Desktop.
rawrer
const mqtt = require('mqtt');
const createRpc = require('./rpc');
const subTopic = 'clienta';
const pubTopic = 'clientb';
const conn = mqtt.connect();
conn.on('connect', function (a) {
console.log('connect', a);
global.rpc = createRpc({pubTopic, subTopic, conn});
});
conn.on('error', function(err){
console.log('error in mqtt connection', err);
});
const mqtt = require('mqtt');
const createRpc = require('./rpc');
const subTopic = 'clientb';
const pubTopic = 'clienta';
const conn = mqtt.connect();
conn.on('connect', function (a) {
console.log('connect', a);
global.rpc = createRpc({pubTopic, subTopic, conn, handlers: {
add: function(a, b) {
return a + b;
}
}});
});
conn.on('error', function(err){
console.log('error in mqtt connection', err);
});
module.exports = function({pubTopic, subTopic, conn, handlers, timeout = 3000}){
let pubId = 0;
conn.subscribe(subTopic);
const methodHandlers = {};
const pendingCalls = {};
function addMethod(methodName, handler) {
methodHandlers[methodName] = function(msg) {
Promise.resolve(handler.apply(this, msg.params || []))
.then(function(result) {
conn.publish(msg.reply, JSON.stringify({jsonrpc : '2.0', id: msg.id, result}));
})
.catch(function(error) {
conn.publish(msg.reply, JSON.stringify({jsonrpc : '2.0', id: msg.id, error}));
});
}
}
for ( m in handlers){
addMethod(m, handlers[m]);
}
conn.on('message', function(topic, payload) {
try {
const msg = JSON.parse(payload.toString());
if(msg.id) {
if(msg.params) {
if(methodHandlers[msg.method]){
methodHandlers[msg.method](msg);
}
}
else {
const pc = pendingCalls[msg.id];
if(pc) {
clearTimeout(pc.timeoutId);
delete pendingCalls[msg.id];
if (msg.error) {
pc.reject(msg.error);
}
else {
pc.resolve(msg.result);
}
}
}
}
}catch(exp){
console.log('err', exp);
}
});
const funcHandler = {
get: function(target, name) {
if(name in target) {
return target[name];
}
return function (...args) {
pubId++;
const thisMsgId = pubId;
const rpc = {
jsonrpc : '2.0',
method: name,
params: args,
'id': thisMsgId,
reply: subTopic
};
const timeoutId = setTimeout(function() {
if(pendingCalls[thisMsgId]) {
pendingCalls[thisMsgId].reject('Timeout');
delete pendingCalls[thisMsgId];
}
}, timeout);
const response = new Promise(function(resolve, reject) {
pendingCalls[thisMsgId] = { resolve, reject, timeoutId };
});
conn.publish(pubTopic, JSON.stringify(rpc));
return response;
}
}
};
return new Proxy({}, funcHandler);
}
const mosca = require('mosca');
const server = new mosca.Server();
server.on('clientConnected', function(client) {
console.log('client connected', client.id);
});
// fired when a message is received
server.on('published', function(packet, client) {
console.log('Published', packet.topic, packet.payload.toString());
});
server.on('ready', setup);
// fired when the mqtt server is ready
function setup() {
console.log('Mosca server is up and running');
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment