Created
December 14, 2016 14:13
-
-
Save qknow-w/01264441dbacac219a9427a41bca7a2f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* 初始化rabbitmq | |
* author: colin.xu | |
* 2016.11.8 | |
*/ | |
const amqp = require('amqplib'); | |
const config = require('config'); | |
const log4js = require('log4js'); | |
const logger = log4js.getLogger('amqp'); | |
logger.setLevel('INFO'); | |
class AmqpConnection { | |
constructor(connStr) { | |
this.connStr = connStr; //连接字符串 | |
} | |
connect() { | |
let self = this; | |
return new Promise((resolve, reject) => { | |
if (self.conn) return resolve(self.conn); | |
amqp.connect(self.connStr).then((conn) => { | |
logger.info('连接rabbitmq成功'); | |
//If your program runs until interrupted, you can hook into the process signal handling to close the connection: | |
process.once('SIGINT', conn.close.bind(conn)); | |
self.conn = conn; | |
//监听连接关闭事件 | |
conn.on('close', () => { | |
logger.info('rabbimq连接关闭'); | |
}) | |
//监听连接错误事件 | |
conn.on('error', (err) => { | |
logger.error(`rabbimq连接出错:${err.stack}`); | |
}) | |
//监听连接阻塞事件 | |
conn.on('blocked', (reason) => { | |
logger.error(`连接阻塞,原因:${reason}`); | |
}) | |
//监听阻塞连接恢复正常事件 | |
conn.on('unblocked', () => { | |
logger.info('阻塞连接恢复正常'); | |
}) | |
resolve(conn); | |
}).catch((err) => { | |
logger.error(`连接rabbitmq失败,${err.stack}`); | |
reject(err); | |
}) | |
}) | |
} | |
createChannel() { | |
let self = this; | |
return new Promise((resolve, reject) => { | |
//if (self.ch) return resolve(self.ch); | |
self.connect() | |
.then(() => { | |
return self.conn.createChannel(); | |
}) | |
.then((ch) => { | |
self.ch = ch; | |
ch.on('close', () => { | |
//logger.info('channel关闭'); | |
}) | |
ch.on('error', (err) => { | |
logger.error(`channel出错:${err.stack}`); | |
}) | |
ch.on('return', (msg) => { | |
logger.info(`channel return:${msg}`); | |
}) | |
ch.on('drain', () => { | |
logger.error('drain event fired.'); | |
}) | |
resolve(ch); | |
}) | |
.catch((err) => { | |
logger.error(`创建channel失败,${err.stack}`); | |
reject(err); | |
}) | |
}) | |
} | |
/** | |
* @param exchange | |
* @param routingKey | |
* @param msg 发送消息 | |
* @param type exchange类别: | |
* fanout:把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 | |
* direct:把消息路由到那些binding key与routing key完全匹配的Queue中。 | |
* topic: topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定: | |
* routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” | |
* binding key与routing key一样也是句点号“. ”分隔的字符串 | |
* binding key中可以存在两种特殊字符“*”和"#",用于做模糊匹配,其中前面一个用于匹配一个单词,“#”用于匹配多个单词(可以是零个) | |
* headers:headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 | |
* @param option | |
*/ | |
publish(exchange, routingKey, msg, type, option) { | |
if (!option) option = { durable: false }; | |
let ch; | |
this.createChannel() | |
.then((channel) => { | |
ch = channel; | |
return ch.assertExchange(exchange, type, option); | |
}) | |
.then(() => { | |
ch.publish(exchange, routingKey, new Buffer(msg)); | |
return ch.close(); | |
}) | |
.catch((err) => { | |
logger.error(`rabbitmq插入消息失败,${err.stack}`); | |
}) | |
} | |
consume(exchange, queue, bindingkey, type, option) { | |
if (!option) option = { durable: false }; | |
let ch; | |
this.createChannel() | |
.then((channel) => { | |
ch = channel; | |
return ch.assertExchange(exchange, type, option); | |
}) | |
.then(() => { | |
//定义queue | |
return ch.assertQueue(queue); | |
}) | |
.then((ok) => { | |
let q = ok.queue; | |
ch.bindQueue(q, exchange, bindingkey); | |
ch.consume(q, (msg) => { | |
doWork(msg.content.toString()); | |
}, { noAck: true }) | |
}) | |
} | |
} | |
function doWork(msg) { | |
//todo something... | |
console.log(msg) | |
} | |
let amqpConnection = new AmqpConnection(config.get('rabbitmq')); | |
amqpConnection.connect(); | |
module.exports = amqpConnection; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment