Created
December 13, 2013 17:34
-
-
Save dmitriy-sqrt/7948013 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'amqp' | |
| require 'em-websocket' | |
| class Websockets | |
| def self.publisher(message = "Hello world!"+Time.now.to_s, routing_key = "amqp.test") #routing_key will contain current_user.id | |
| #connect to rabbitmq service, open channel, create queue | |
| AMQP.start("amqp://localhost") do |connection| # connection is open and ready to be used | |
| channel = AMQP::Channel.new(connection) | |
| exchange = channel.direct("") | |
| #publish a message | |
| puts "Send: #{message}" | |
| exchange.publish message, :routing_key => routing_key #each client should have his queue name | |
| # stop in n seconds #2do - play with timeout | |
| EventMachine.add_timer(1.0) { connection.close { EventMachine.stop } } | |
| end | |
| end | |
| def self.consumer() | |
| EventMachine.run do | |
| EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8030) do |ws| | |
| user_id = nil | |
| #AMPQ shit below | |
| connection = AMQP.connect(:host => '127.0.0.1') | |
| channel = AMQP::Channel.new(connection) #move to upper block? (from ws.onopen) | |
| exchange = channel.direct("") #same shit? | |
| puts "AMQP: connect" | |
| #WebSockets binds below | |
| ws.onopen { |handshake| #2do get and parse cookie from handshake object | |
| cookie_string = handshake.headers['Cookie'].to_s | |
| user_id = CookieMagic::user_from_cookie(cookie_string) | |
| puts "WS: connection opened (user #{user_id})" | |
| if user_id | |
| #AMQP.connect(:host => '127.0.0.1') do |connection| | |
| puts "AMQP: subscribe (user #{user_id})" | |
| #subscribe to rabbitmq queue | |
| channel.queue("amqp.test.u#{user_id}", :auto_delete => true).subscribe do |payload| | |
| puts "AMQP: Received a message: '#{payload}', resending it to WS (user #{user_id})" | |
| ws.send "#{payload}" | |
| end | |
| #end | |
| else | |
| ws.close | |
| end #if user_id | |
| } #ws.onopen | |
| ws.onclose { | |
| puts "WS: Connection closed (user #{user_id})" | |
| } | |
| #AMQP should also be closed here. But how? | |
| #to call queue(...).unsubscribe is better i suppose | |
| #mb this should help - http://siddharth-ravichandran.com/2011/07/21/rabbitmq-and-rails-why-my-chat-application-failed/ | |
| ws.onmessage { |msg| } | |
| end #EventMachine::WebSocket.start | |
| end #EventMachine.run | |
| end | |
| end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment