Skip to content

Instantly share code, notes, and snippets.

@yunghoy
Last active June 2, 2022 00:34
Show Gist options
  • Save yunghoy/a425f91824d26461bb2e3653bc56ebbf to your computer and use it in GitHub Desktop.
Save yunghoy/a425f91824d26461bb2e3653bc56ebbf to your computer and use it in GitHub Desktop.
AMQP library (RabbitMQ) - async/await
alias babel-node='babel-node --presets stage-0'
------ RECV ------
// babel-node recv2.js "#"
// babel-node recv2.js "kern.*"
const amqp = require('amqplib');
// Receiver configuration.
// Every command-line argument is used as a binding key for the receive queue
// (e.g. "#" or "kern.*"); at least one is required.
const args = process.argv.slice(2);
if (args.length === 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}

// Exchange the receive queue is bound to.
const EXCHANGE_NAME = 'exchange_name';
// NOTE(review): 'x-recent-history' is not a built-in AMQP exchange type; it
// presumably requires the RabbitMQ recent-history exchange plugin - confirm
// that the broker has it enabled.
const EXCHANGE_TYPE = 'x-recent-history';
// Options passed to assertExchange().
const EXCHANGE_OPTION = {
  durable: true,
  autoDelete: true,
  arguments: {
    // Presumably the number of recent messages the exchange keeps - verify
    // against the plugin documentation.
    'x-recent-history-length': 10,
  },
};
/**
 * Connects to the broker on localhost, asserts the exchange, creates an
 * exclusive server-named queue, binds it once per command-line binding key
 * and prints every message delivered to that queue.
 *
 * @returns {Promise<void>} Resolves once the consumer has been registered;
 *   rejects if connecting, asserting, binding or consuming fails.
 */
async function main() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();
  await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE, EXCHANGE_OPTION);
  // await channel.bindExchange('exchange_name_another_one', EXCHANGE_NAME, '', {});
  const q = await channel.assertQueue('', { exclusive: true });
  console.log(' [*] Waiting for logs. To exit press CTRL+C');

  // Await every binding so a failed bind rejects main() instead of being left
  // as a floating promise, and so consuming only starts once all are in place.
  await Promise.all(
    args.map((key) => channel.bindQueue(q.queue, EXCHANGE_NAME, key)),
  );

  await channel.consume(q.queue, (msg) => {
    // amqplib invokes the callback with null when the server cancels the
    // consumer; there is no message to print in that case.
    if (msg === null) {
      console.warn(' [!] Consumer cancelled by server');
      return;
    }
    console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
  }, { noAck: true });
}
// Report start-up failures and exit non-zero instead of leaving the promise
// returned by main() floating (an unhandled rejection).
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
------ END OF RECV ------
------ SEND ------
// babel-node send2.js "kern.critical" "A critical kernel error"
const amqp = require('amqplib');
// Sender configuration.
// Command line: [routingKey] [message words...]
const args = process.argv.slice(2);

// The first argument is the routing key; fall back to a default when absent.
const [key = 'anonymous.info'] = args;

// The remaining arguments form the message body, joined by single spaces.
const msg = args.slice(1).join(' ') || 'Hello World!';

// Exchange the message is published to.
const EXCHANGE_NAME = 'exchange_name';
const EXCHANGE_TYPE = 'x-recent-history';

// Options passed to assertExchange().
const EXCHANGE_OPTION = {
  durable: true,
  autoDelete: true,
  arguments: { 'x-recent-history-length': 10 },
};
/**
 * Connects to the broker on localhost, asserts the exchange, publishes one
 * message with the configured routing key, then closes the channel and the
 * connection.
 *
 * @returns {Promise<void>} Resolves after the connection has been closed;
 *   rejects if connecting, asserting the exchange or closing fails.
 */
async function main() {
  const conn = await amqp.connect('amqp://localhost');
  try {
    const channel = await conn.createChannel();
    await channel.assertExchange(EXCHANGE_NAME, EXCHANGE_TYPE, EXCHANGE_OPTION);
    // Buffer.from() replaces the deprecated `new Buffer()` constructor.
    channel.publish(EXCHANGE_NAME, key, Buffer.from(msg));
    console.log(" [x] Sent %s:'%s'", key, msg);
    // Close the channel before the connection so the buffered publish is
    // written out, instead of sleeping 500 ms and force-exiting the process.
    await channel.close();
  } finally {
    // Always release the connection, including when a step above fails.
    await conn.close();
  }
}
// Report failures and exit non-zero instead of leaving the promise returned
// by main() floating (an unhandled rejection).
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
------ END OF SEND ------
@ruscon
Copy link

ruscon commented Sep 6, 2021

the same question ^

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment