Created
August 29, 2012 09:43
-
-
Save keitheis/3509276 to your computer and use it in GitHub Desktop.
RabbitMQ Topic for Pub/Sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
% ruby sub_update.rb
# output after ruby publisher.rb
Update form: UPDATE 89, routing key is data.update
% ruby sub_logger.rb
# output after ruby publisher.rb
Log: UPDATE 89, routing key is data.update
Log: CREATE 90, routing key is data.create
% ruby publisher.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# encoding: utf-8

# Publisher: sends one update event and one create event to the "data"
# topic exchange, then closes the connection and stops the event loop.

require "rubygems"
require "amqp"

EXCHANGE = "data"

AMQP.start("amqp://127.0.0.1") do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic(EXCHANGE)

  # One update and one create, published in this order as
  # [payload, routing key] pairs.
  [
    ["UPDATE 89", "data.update"],
    ["CREATE 90", "data.create"]
  ].each do |payload, routing_key|
    exchange.publish(payload, :routing_key => routing_key)
  end

  # Shut down two seconds after publishing (presumably to give the
  # messages time to go out before the connection is closed).
  EventMachine.add_timer(2) do
    connection.close { EventMachine.stop }
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# encoding: utf-8

# Logger subscriber: prints every message routed through the "data" topic
# exchange whose routing key matches "data.#".

require "rubygems"
require "amqp"

EXCHANGE = "data"
QUEUE    = "data.log"

EventMachine.run do
  # No connection settings are passed, so the library defaults are used.
  AMQP.connect do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.topic(EXCHANGE)

    # Exclusive queue that receives all data events so they can be logged.
    log_queue = channel.queue(QUEUE, :exclusive => true)

    log_queue.bind(exchange, :routing_key => "data.#").subscribe do |metadata, payload|
      puts "Log: #{payload}, routing key is #{metadata.routing_key}"
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# encoding: utf-8

# Update subscriber: prints only the messages published to the "data"
# topic exchange with the routing key "data.update".

require "rubygems"
require "amqp"

EXCHANGE = "data"
QUEUE    = "data.update"

EventMachine.run do
  # No connection settings are passed, so the library defaults are used.
  AMQP.connect do |connection|
    channel  = AMQP::Channel.new(connection)
    exchange = channel.topic(EXCHANGE)

    # Bound with the exact key, so only data.update events are delivered.
    update_queue = channel.queue(QUEUE)

    update_queue.bind(exchange, :routing_key => "data.update").subscribe do |metadata, payload|
      puts "Update form: #{payload}, routing key is #{metadata.routing_key}"
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment