Created
May 16, 2010 08:20
-
-
Save kusor/402761 to your computer and use it in GitHub Desktop.
NodeJS AMQP Exchanges
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[submodule "amqp"] | |
path = amqp | |
url = http://github.com/ry/node-amqp.git |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
// Simply publish a message to a direct exchange. | |
sys = require('sys'); | |
amqp = require('./amqp'); | |
inspect = sys.inspect; | |
puts = sys.puts; | |
connection = amqp.createConnection(); | |
connection.addListener('error', function (e) { | |
throw e; | |
}) | |
connection.addListener('close', function (e) { | |
puts('connection closed.'); | |
}); | |
connection.addListener('ready', function () { | |
puts("connected to " + connection.serverProperties.product); | |
var exchange = connection.exchange('node-direct-exchange', {type: 'direct'}); | |
// Consumer: | |
var queue = connection.queue('node-direct-queue'); | |
queue.bind(exchange, "direct-key"); | |
queue.subscribe(function (message) { | |
// raw message is a Buffer: | |
puts("Got a message: \n" + inspect(message)); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}) | |
.addCallback(function () { | |
// Publisher: | |
// [without a queue, can use with: exchange.addListener('open', function () {/*...*/});] | |
puts("publishing message"); | |
exchange.publish("direct-key", "¡España!", {contentType: 'text/plain'}); | |
setTimeout(function () { | |
// wait one second to receive the message, then quit | |
connection.end(); | |
}, 1000); | |
}); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
// Fanout exchange with two consumers. | |
sys = require('sys'); | |
amqp = require('./amqp'); | |
inspect = sys.inspect; | |
puts = sys.puts; | |
connection = amqp.createConnection(); | |
connection.addListener('error', function (e) { | |
throw e; | |
}) | |
connection.addListener('close', function (e) { | |
puts('connection closed.'); | |
}); | |
connection.addListener('ready', function () { | |
puts("connected to " + connection.serverProperties.product); | |
var exchange = connection.exchange('node-fanout-exchange', {type: 'fanout'}); | |
// Consumer 1: | |
var _1st_queue = connection.queue('node-fanout-queue-one'); | |
_1st_queue.bind(exchange, "key-one"); | |
_1st_queue.subscribe(function (message) { | |
puts("Got a message (Consumer 1): "); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}); | |
// Consumer 2: | |
var _2nd_queue = connection.queue('node-fanout-queue-two'); | |
_2nd_queue.bind(exchange, "key-two"); | |
_2nd_queue.subscribe(function (message) { | |
puts("Got a message (Consumer 2):"); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}) | |
.addCallback(function () { | |
// Publisher: | |
puts("publishing message"); | |
exchange.publish("whatever-key", "¡España!", {contentType: 'text/plain'}); | |
setTimeout(function () { | |
// wait one second to receive the message, then quit | |
connection.end(); | |
}, 1000); | |
}); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
// Topic exchange (default) with three consumers: | |
sys = require('sys'); | |
amqp = require('./amqp'); | |
inspect = sys.inspect; | |
puts = sys.puts; | |
connection = amqp.createConnection(); | |
connection.addListener('error', function (e) { | |
throw e; | |
}) | |
connection.addListener('close', function (e) { | |
puts('connection closed.'); | |
}); | |
connection.addListener('ready', function () { | |
puts("connected to " + connection.serverProperties.product); | |
// There is no need to declare type, 'topic' is the default: | |
var exchange = connection.exchange('node-topic-exchange'); | |
// Consumer: | |
var _1st_queue = connection.queue('node-topic-queue-one'); | |
_1st_queue.bind(exchange, "topic_a.*"); | |
_1st_queue.subscribe(function (message) { | |
// raw message is a Buffer: | |
puts("Got a message (Consumer 1 - \"topic_a.*\"):"); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}); | |
var _2nd_queue = connection.queue('node-topic-queue-two'); | |
_2nd_queue.bind(exchange, "*.subtopic_b"); | |
_2nd_queue.subscribe(function (message) { | |
// raw message is a Buffer: | |
puts("Got a message (Consumer 2 - \"*.subtopic_b\"):"); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}); | |
var _3rd_queue = connection.queue('node-topic-queue-three'); | |
_3rd_queue.bind(exchange, "#"); | |
_3rd_queue.subscribe(function (message) { | |
// raw message is a Buffer: | |
puts("Got a message (Consumer 3 - \"#\"):"); | |
// Message Properties: | |
puts("--- Message (" + message._deliveryTag + ", '" + message._routingKey + "') ---"); | |
puts("--- contentType: " + message.contentType); | |
// Get original message string: | |
puts('--- toString(): ' + message.data.toString('ascii', 0, message.data.length)); | |
}) | |
.addCallback(function () { | |
// Publisher: | |
puts("publishing messages"); | |
exchange.publish("topic_a.subtopic_a", "Message One", {contentType: 'text/plain'}); | |
exchange.publish("topic_a.subtopic_b", "Message Two", {contentType: 'text/plain'}); | |
exchange.publish("topic_b.subtopic_b", "Message Three", {contentType: 'text/plain'}); | |
setTimeout(function () { | |
// wait one second to receive the message, then quit | |
connection.end(); | |
}, 1000); | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment