Skip to content

Instantly share code, notes, and snippets.

@kusor
Created May 16, 2010 08:20
Show Gist options
  • Save kusor/402761 to your computer and use it in GitHub Desktop.
Save kusor/402761 to your computer and use it in GitHub Desktop.
NodeJS AMQP Exchanges
[submodule "amqp"]
path = amqp
url = http://github.com/ry/node-amqp.git
#!/usr/bin/env node
// Simply publish a message to a direct exchange.
//
// Flow: open a connection, declare a 'direct' exchange, bind one queue to it
// with the routing key "direct-key", subscribe to that queue, and only then
// publish a single message with the same routing key. The connection is
// closed one second after publishing.
//
// All names are declared with `var` so that they stay local to this script
// instead of leaking into the global object.
var sys = require('sys');
var amqp = require('./amqp');
var inspect = sys.inspect;
var puts = sys.puts;

// createConnection() is called without options, so whatever defaults the
// amqp module applies are used.
var connection = amqp.createConnection();

// Re-throw connection errors so a failure aborts the script visibly.
connection.addListener('error', function (e) {
  throw e;
});

connection.addListener('close', function () {
  puts('connection closed.');
});

connection.addListener('ready', function () {
  puts("connected to " + connection.serverProperties.product);

  var exchange = connection.exchange('node-direct-exchange', {type: 'direct'});

  // Consumer:
  var queue = connection.queue('node-direct-queue');
  queue.bind(exchange, "direct-key");
  queue.subscribe(function (message) {
    // Dump the whole message object as delivered by the library:
    puts("Got a message: \n" + inspect(message));
    // Message Properties:
    puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---");
    puts("--- contentType: " + message.contentType);
    // Get original message string. message.data is the raw payload; the
    // text published below contains non-ASCII characters, which an 'ascii'
    // decode cannot represent, so decode it as UTF-8.
    // NOTE(review): assumes the library encodes string bodies as UTF-8 — confirm.
    puts('--- toString(): ' + message.data.toString('utf8', 0, message.data.length));
  })
  .addCallback(function () {
    // Publisher: runs once the subscription above has been set up, so the
    // message is not published before a consumer is listening.
    // [without a queue, can use with: exchange.addListener('open', function () {/*...*/});]
    puts("publishing message");
    exchange.publish("direct-key", "¡España!", {contentType: 'text/plain'});
    setTimeout(function () {
      // wait one second to receive the message, then quit
      connection.end();
    }, 1000);
  });
});
#!/usr/bin/env node
// Fanout exchange with two consumers.
//
// Flow: open a connection, declare a 'fanout' exchange, bind two queues to
// it, subscribe to both, and publish one message once BOTH subscriptions
// have been set up. The connection is closed one second after publishing.
//
// All names are declared with `var` so that they stay local to this script
// instead of leaking into the global object.
var sys = require('sys');
var amqp = require('./amqp');
var inspect = sys.inspect;
var puts = sys.puts;

// createConnection() is called without options, so whatever defaults the
// amqp module applies are used.
var connection = amqp.createConnection();

// Re-throw connection errors so a failure aborts the script visibly.
connection.addListener('error', function (e) {
  throw e;
});

connection.addListener('close', function () {
  puts('connection closed.');
});

connection.addListener('ready', function () {
  puts("connected to " + connection.serverProperties.product);

  var exchange = connection.exchange('node-fanout-exchange', {type: 'fanout'});

  // Builds a subscribe handler that prints `heading` followed by the
  // message properties and its decoded payload.
  var makeConsumer = function (heading) {
    return function (message) {
      puts(heading);
      // Message Properties:
      puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---");
      puts("--- contentType: " + message.contentType);
      // Get original message string. The published text contains non-ASCII
      // characters, which an 'ascii' decode cannot represent, so decode as UTF-8.
      // NOTE(review): assumes the library encodes string bodies as UTF-8 — confirm.
      puts('--- toString(): ' + message.data.toString('utf8', 0, message.data.length));
    };
  };

  // Publisher: invoked once per completed subscription; only the last
  // completion actually publishes. Publishing after just one of the two
  // subscriptions could let the other queue miss the message.
  var pendingSubscriptions = 2;
  var publishWhenAllSubscribed = function () {
    pendingSubscriptions -= 1;
    if (pendingSubscriptions > 0) {
      return;
    }
    puts("publishing message");
    exchange.publish("whatever-key", "¡España!", {contentType: 'text/plain'});
    setTimeout(function () {
      // wait one second to receive the message, then quit
      connection.end();
    }, 1000);
  };

  // Consumer 1:
  var firstQueue = connection.queue('node-fanout-queue-one');
  firstQueue.bind(exchange, "key-one");
  firstQueue.subscribe(makeConsumer("Got a message (Consumer 1): "))
    .addCallback(publishWhenAllSubscribed);

  // Consumer 2:
  var secondQueue = connection.queue('node-fanout-queue-two');
  secondQueue.bind(exchange, "key-two");
  secondQueue.subscribe(makeConsumer("Got a message (Consumer 2):"))
    .addCallback(publishWhenAllSubscribed);
});
#!/usr/bin/env node
// Topic exchange (default) with three consumers:
//
// Flow: open a connection, declare an exchange without an explicit type,
// bind three queues with the patterns "topic_a.*", "*.subtopic_b" and "#",
// subscribe to all of them, and publish three messages once ALL three
// subscriptions have been set up. The connection is closed one second
// after publishing.
//
// All names are declared with `var` so that they stay local to this script
// instead of leaking into the global object.
var sys = require('sys');
var amqp = require('./amqp');
var inspect = sys.inspect;
var puts = sys.puts;

// createConnection() is called without options, so whatever defaults the
// amqp module applies are used.
var connection = amqp.createConnection();

// Re-throw connection errors so a failure aborts the script visibly.
connection.addListener('error', function (e) {
  throw e;
});

connection.addListener('close', function () {
  puts('connection closed.');
});

connection.addListener('ready', function () {
  puts("connected to " + connection.serverProperties.product);

  // There is no need to declare type, 'topic' is the default:
  var exchange = connection.exchange('node-topic-exchange');

  // Builds a subscribe handler that prints `heading` followed by the
  // message properties and its decoded payload.
  var makeConsumer = function (heading) {
    return function (message) {
      puts(heading);
      // Message Properties:
      puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---");
      puts("--- contentType: " + message.contentType);
      // Get original message string. Decoded as UTF-8 so that non-ASCII
      // payloads would survive; the messages published here are plain ASCII,
      // for which the output is unchanged.
      // NOTE(review): assumes the library encodes string bodies as UTF-8 — confirm.
      puts('--- toString(): ' + message.data.toString('utf8', 0, message.data.length));
    };
  };

  // Publisher: invoked once per completed subscription; only the last
  // completion actually publishes. Publishing after just one of the three
  // subscriptions could let the other queues miss messages.
  var pendingSubscriptions = 3;
  var publishWhenAllSubscribed = function () {
    pendingSubscriptions -= 1;
    if (pendingSubscriptions > 0) {
      return;
    }
    puts("publishing messages");
    exchange.publish("topic_a.subtopic_a", "Message One", {contentType: 'text/plain'});
    exchange.publish("topic_a.subtopic_b", "Message Two", {contentType: 'text/plain'});
    exchange.publish("topic_b.subtopic_b", "Message Three", {contentType: 'text/plain'});
    setTimeout(function () {
      // wait one second to receive the message, then quit
      connection.end();
    }, 1000);
  };

  // Consumer 1: bound with the pattern "topic_a.*"
  var firstQueue = connection.queue('node-topic-queue-one');
  firstQueue.bind(exchange, "topic_a.*");
  firstQueue.subscribe(makeConsumer("Got a message (Consumer 1 - \"topic_a.*\"):"))
    .addCallback(publishWhenAllSubscribed);

  // Consumer 2: bound with the pattern "*.subtopic_b"
  var secondQueue = connection.queue('node-topic-queue-two');
  secondQueue.bind(exchange, "*.subtopic_b");
  secondQueue.subscribe(makeConsumer("Got a message (Consumer 2 - \"*.subtopic_b\"):"))
    .addCallback(publishWhenAllSubscribed);

  // Consumer 3: bound with the pattern "#"
  var thirdQueue = connection.queue('node-topic-queue-three');
  thirdQueue.bind(exchange, "#");
  thirdQueue.subscribe(makeConsumer("Got a message (Consumer 3 - \"#\"):"))
    .addCallback(publishWhenAllSubscribed);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment