Last active
January 26, 2016 08:38
-
-
Save davideicardi/240fb44f5803a960864b to your computer and use it in GitHub Desktop.
RabbitMq and Azure Service Bus Node.js class wrappers (RabbitMqChannel, AzureSubscription)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const debug = require("debug")("azureServiceBus"); | |
const events = require("events"); | |
const azure = require("azure"); | |
/* | |
Class wrapper around Azure Service Bus Topic/Subscription API. | |
Usage: | |
let bus = require("./azureServiceBus.js"); | |
let subscription = new bus.AzureSubscription(SERVICEBUS_CONNECTION, TOPICNAME, SUBSCRIPTIONNAME); | |
subscription.onMessage(YOURMSGLABEL, (msg) => { | |
// Your code to handle the message | |
}); | |
subscription.createIfNotExists({}, (error) =>{ | |
if (error) return console.log(error); | |
subscription.startReceiving(); | |
}); | |
*/ | |
/**
 * Wrapper around one Azure Service Bus topic subscription.
 *
 * Messages are polled with `receiveSubscriptionMessage` and dispatched by their
 * `brokerProperties.Label` to listeners registered through `onMessage`, and to
 * one-shot waiters registered through `waitOnce`.
 */
class AzureSubscription {
  /**
   * @param {string} azureBusUrl - Value handed to `azure.createServiceBusService`.
   * @param {string} topic - Topic name.
   * @param {string} subscription - Subscription name inside the topic.
   */
  constructor(azureBusUrl, topic, subscription) {
    const retryOperations = new azure.ExponentialRetryPolicyFilter();

    this.waitOnceListeners = new Set();
    this.receiving = false;
    this.topic = topic;
    this.subscription = subscription;
    this.eventEmitter = new events.EventEmitter();
    this.serviceBusService = azure
      .createServiceBusService(azureBusUrl)
      .withFilter(retryOperations);
  }

  /**
   * Receives a single message from the subscription.
   *
   * Any error reported by the service is treated as "no message available":
   * it is written to the debug log and the callback gets `(null, null)`.
   * When the body contains a `{...}` fragment that is valid JSON, the body is
   * replaced with the parsed object; otherwise the body is left untouched.
   *
   * @param {function(null, ?Object)} callback - Called with `(null, message)`
   *   or `(null, null)` when nothing was received.
   */
  receiveMessage(callback) {
    this.serviceBusService.receiveSubscriptionMessage(
      this.topic,
      this.subscription,
      (error, receivedMessage) => {
        if (error) {
          // Kept best-effort (callers still see "no message"), but no longer silent.
          debug(error);
          return callback(null, null);
        }
        if (!receivedMessage) return callback(null, null);

        debug(receivedMessage);

        // Try to read the body. Bodies serialized with .NET carry extra
        // characters around the JSON payload; in this case keep only the
        // `{...}` part.
        const body = receivedMessage.body;
        const matches = typeof body === "string" ? body.match(/({.*})/) : null;
        if (matches) {
          try {
            receivedMessage.body = JSON.parse(matches[0]);
          } catch (parseError) {
            // Not valid JSON after all: hand over the raw body instead of
            // throwing from inside the SDK callback.
            debug(parseError);
          }
        }

        return callback(null, receivedMessage);
      }
    );
  }

  /**
   * Creates the subscription unless it already exists.
   *
   * @param {Object} [options] - Options forwarded to `createSubscription`;
   *   defaults to `{}`.
   * @param {function(?*)} callback - Called with the error, or `null` on success.
   */
  createIfNotExists(options, callback) {
    // Logical OR: the previous bitwise `|` always turned the options into 0.
    options = options || {};

    debug(`Checking subscription ${this.topic}/${this.subscription} ...`);

    this.exists((error, exists) => {
      if (error) return callback(error);

      if (exists) {
        debug(`Subscription ${this.topic}/${this.subscription} already exists.`);
        return callback(null);
      }

      this.serviceBusService.createSubscription(
        this.topic,
        this.subscription,
        options,
        (createError) => {
          if (createError) return callback(createError);

          debug(`Subscription ${this.topic}/${this.subscription} created.`);
          return callback(null);
        }
      );
    });
  }

  /**
   * Checks whether the subscription exists.
   *
   * @param {function(?*, boolean)} callback - Called with `(null, true)` when
   *   found, `(null, false)` when the lookup fails with status code 404, and
   *   `(error, false)` for any other failure.
   */
  exists(callback) {
    this.serviceBusService.getSubscription(this.topic, this.subscription, (error) => {
      if (error) {
        if (error.statusCode === 404) return callback(null, false);
        return callback(error, false);
      }
      callback(null, true);
    });
  }

  /**
   * Registers a listener for messages carrying the given label.
   *
   * @param {string} msgLabel - Message label to listen for.
   * @param {function(*)} listener - Called with the message body.
   */
  onMessage(msgLabel, listener) {
    this.eventEmitter.on(msgLabel, listener);
  }

  /**
   * Dispatches a message to the label listeners, then resolves and removes
   * every pending `waitOnce` entry whose predicate accepts it.
   *
   * @param {string} name - Message label.
   * @param {*} body - Message body.
   */
  _emit(name, body) {
    this.eventEmitter.emit(name, body);

    const listeners = this.waitOnceListeners;
    for (const item of listeners) {
      if (item.predicate(name, body)) {
        item.resolve(body);
        listeners.delete(item);
      }
    }
  }

  /**
   * One polling iteration: receive, dispatch, then schedule the next iteration
   * after `receiveInterval` milliseconds. Stops rescheduling once `receiving`
   * is false.
   *
   * @param {number} receiveInterval - Delay between iterations, in milliseconds.
   */
  _receivingLoop(receiveInterval) {
    this.receiveMessage((error, msg) => {
      if (error) debug(error);

      if (!this.receiving) return;

      if (!error && msg) {
        this._emit(msg.brokerProperties.Label, msg.body);
      }

      setTimeout(() => this._receivingLoop(receiveInterval), receiveInterval);
    });
  }

  /**
   * Starts the polling loop; does nothing when it is already running.
   *
   * @param {number} [receiveInterval=500] - Delay between polls, in milliseconds.
   */
  startReceiving(receiveInterval) {
    receiveInterval = receiveInterval || 500;

    if (this.receiving) return;

    this.receiving = true;
    debug("Start receiving messages...");
    this._receivingLoop(receiveInterval);
  }

  /**
   * Asks the polling loop to stop; the loop ends after the receive call that
   * is currently in flight.
   */
  stopReceiving() {
    this.receiving = false;
    debug("Stop receiving messages.");
  }

  /**
   * Returns a promise resolved with the body of the first dispatched message
   * for which `predicate(label, body)` is truthy.
   *
   * @param {function(string, *): boolean} predicate - Message filter.
   * @returns {Promise<*>} Resolves with the matching message body.
   */
  waitOnce(predicate) {
    return new Promise((resolve, reject) => {
      this.waitOnceListeners.add({ predicate: predicate, resolve: resolve, reject: reject });
    });
  }
}
module.exports.AzureSubscription = AzureSubscription; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const debug = require("debug")("rabbitMqServiceBus"); | |
const amqp = require('amqplib'); | |
// Code based on: | |
// https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html | |
// https://github.com/squaremo/amqp.node/blob/master/examples/tutorials/receive_logs.js | |
/**
 * Thin promise-based wrapper around a single amqplib connection and channel.
 */
class RabbitMqChannel {
  /**
   * @param {string} url - Value handed to `amqp.connect`.
   */
  constructor(url) {
    this.URL = url;
  }

  /**
   * Opens the connection and creates a channel on it, storing both on the
   * instance as `connection` and `channel`.
   *
   * @returns {Promise<void>} Resolves once the channel is available.
   */
  connect() {
    return amqp.connect(this.URL)
      .then((conn) => {
        this.connection = conn;
        return conn.createChannel();
      })
      .then((ch) => {
        this.channel = ch;
      });
  }

  /**
   * Closes the connection, if any, and forgets the connection and channel.
   *
   * Safe to call before `connect()` or more than once.
   *
   * @returns {Promise<*>} Whatever `connection.close()` returns, or a resolved
   *   promise when there is nothing to close.
   */
  close() {
    const conn = this.connection;
    if (!conn) return Promise.resolve();

    this.connection = undefined;
    this.channel = undefined;
    // Returned so callers can wait for the shutdown and observe its failure.
    return conn.close();
  }

  /**
   * Binds a fresh exclusive queue to `exchange` and consumes it with `listener`.
   *
   * The exchange itself is not asserted here.
   * NOTE(review): presumably it must already exist on the broker; confirm.
   * Messages are consumed with `noAck: true`.
   *
   * @param {string} exchange - Exchange name to bind to.
   * @param {function(Object)} listener - Consumer callback passed to `channel.consume`.
   * @returns {Promise<void>} Resolves once the consumer is registered; rejects
   *   when the instance is not connected or any channel operation fails.
   */
  subscribeToExchange(exchange, listener) {
    const channel = this.channel;
    if (!channel) {
      return Promise.reject(
        new Error("RabbitMqChannel is not connected; call connect() first.")
      );
    }

    return channel
      .assertQueue('', { exclusive: true })
      .then((qok) => channel.bindQueue(qok.queue, exchange, '').then(() => qok.queue))
      .then((queue) => channel.consume(queue, listener, { noAck: true }))
      .then(() => {
        debug(` [*] Waiting for ${exchange}...`);
      });
  }
}
module.exports.RabbitMqChannel = RabbitMqChannel; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment