Created
January 17, 2010 02:22
| require "rubygems" | |
| require "mq" | |
| # EM.run blocks here until the event loop is stopped. Since tmm1's amqp gem is callback-based, | |
| # the blocks below are invoked whenever a message arrives and user code must act on it. | |
| EM.run do | |
| # Each queue represents an end-point: a place where messages are delivered. | |
| # If multiple processes/clients subscribe to the *same queue name*, the server will | |
| # round-robin messages among them, so each message reaches only one process/client. | |
| queue = MQ.queue("integers channel") | |
| queue.bind("integers").subscribe do |n| | |
| p n | |
| end | |
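| # This is why the process's PID is part of the control queue name: each process gets its | |
| # own queue, so every process receives every message published on the fanout. With a | |
| # shared queue name, each control message would reach only one of the processes. | |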
| MQ.queue("control channel #{Process.pid}").bind("control").subscribe do |command| | |
| puts "Received control instruction: #{command.inspect}" | |
| case command | |
| when /exit/i | |
| puts "Exiting..." | |
| EM.stop_event_loop # This will cause an exit from the top-level #run block | |
| else | |
| puts "Invalid command: ignoring" | |
| end | |
| end | |
| end |
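The difference between the shared "integers channel" queue and the per-PID control queues can be illustrated without a broker. The following is a toy in-memory model only: `ToyQueue` and `ToyFanout` are hypothetical names made up for this sketch, they are not part of the amqp gem, and a real AMQP server does considerably more.

```ruby
# Toy in-memory model of the two delivery patterns used above.
# ToyQueue and ToyFanout are hypothetical; they are NOT the amqp gem's API.
class ToyQueue
  def initialize
    @consumers = []
    @next = 0
  end

  # Register a consumer block, as if a process subscribed to this queue.
  def subscribe(&block)
    @consumers << block
    self
  end

  # Hand the message to exactly one consumer, rotating through them.
  def deliver(message)
    return if @consumers.empty?
    @consumers[@next % @consumers.size].call(message)
    @next += 1
  end
end

class ToyFanout
  def initialize
    @queues = []
  end

  # Attach a queue to this exchange and return the queue for chaining.
  def bind(queue)
    @queues << queue
    queue
  end

  # Copy the message to every bound queue.
  def publish(message)
    @queues.each { |q| q.deliver(message) }
  end
end

# Case 1: two "processes" subscribe to the same queue, so messages alternate.
shared_log = { a: [], b: [] }
shared = ToyQueue.new
shared.subscribe { |m| shared_log[:a] << m }
shared.subscribe { |m| shared_log[:b] << m }
%w[1 2 3 4].each { |m| shared.deliver(m) }

# Case 2: each "process" has its own queue bound to the fanout, so both get the message.
fanout_log = { a: [], b: [] }
control = ToyFanout.new
control.bind(ToyQueue.new).subscribe { |m| fanout_log[:a] << m }
control.bind(ToyQueue.new).subscribe { |m| fanout_log[:b] << m }
control.publish("exit")

p shared_log
p fanout_log
```

In the model, the shared queue splits the integers between the two consumers, while the fanout delivers "exit" to both, which is the behavior the PID-suffixed queue names are meant to obtain.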
| require "rubygems" | |
| require "mq" | |
| n = (ARGV.shift || 0).to_i | |
| delta = (ARGV.shift || 1).to_i | |
| # Simply publish a String representation of integers, once per second. | |
| # Also provide a way to kill/stop all listening processes simultaneously. | |
| EM.run do | |
| # Create an exchange where data will be pushed. Name it "integers". | |
| integers = MQ.direct("integers") | |
| EM.add_periodic_timer(1) do | |
| n += delta | |
| p n | |
| integers.publish(n.to_s) | |
| end | |
| # A 'fanout' is an exchange that copies each message to every queue bound to it: | |
| # push 1 message to N subscribers. In this case, I'm publishing control | |
| # messages on the fanout, so all listeners will receive them. | |
| control = MQ.fanout("control") | |
| trap("USR1") do | |
| control.publish("exit") | |
| end | |
| trap("USR2") do | |
| control.publish("stats") | |
| end | |
| end |
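The sender publishes both "exit" and "stats" on the control fanout, while the receiver's `case` statement only has a branch for `/exit/i`. The sketch below mirrors that dispatch in isolation to show how each command would be treated; `classify_command` is a hypothetical helper introduced for illustration, not something either file defines.

```ruby
# Hypothetical helper mirroring the receiver's case statement.
# Returns a symbol instead of printing or stopping an event loop.
def classify_command(command)
  case command
  when /exit/i then :exit   # unanchored and case-insensitive, as in the receiver
  else :ignored             # everything else, including "stats"
  end
end

results = %w[exit EXIT stats].map { |c| [c, classify_command(c)] }
results.each { |c, action| puts "#{c.inspect} -> #{action}" }
```

Because the pattern is unanchored, any payload containing "exit" triggers the exit branch, and "stats" falls through to the "Invalid command" path until a matching `when` clause is added.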