Skip to content

Instantly share code, notes, and snippets.

@Shinpeim
Created June 13, 2012 10:59
Show Gist options
  • Save Shinpeim/2923426 to your computer and use it in GitHub Desktop.
Save Shinpeim/2923426 to your computer and use it in GitHub Desktop.
AMQPを利用したメッセージングサーバの習作
require "amqp"
# Publisher script: sends one message to the "example" direct exchange.
#
# Usage: <script> [room_id] [message words...]
# The first argument is used as the routing key (defaults to "1"); the
# remaining arguments, joined by spaces, form the message (defaults to
# "Hello World!").
AMQP.start(:host => "localhost") do |session|
  exchange = AMQP::Channel.new(session).direct("example")

  room_id = ARGV.shift || "1"
  message =
    if ARGV.empty?
      "Hello World!"
    else
      ARGV.join(" ")
    end

  exchange.publish(message, :routing_key => room_id)
  puts " [x] Sent #{room_id}:#{message}"

  # Wait half a second before closing the connection, then stop the reactor
  # and exit the process.
  EM.add_timer(0.5) do
    session.close { EM.stop { exit } }
  end
end
# -*- coding: utf-8 -*-
require 'eventmachine'
require 'json'
require 'hashie'
require "amqp"
# Maps each known user id to the list of room ids that user belongs to.
# Room ids double as AMQP routing keys (after `to_s`) when a user logs in.
# Frozen (including the room lists) because the table is shared by every
# connection and is only ever read.
USERS_TO_ROOMS_TABLE = {
  'kksg'     => [1, 2, 3, 4].freeze,
  '9m'       => [1, 3].freeze,
  'mitukiii' => [2, 3].freeze,
  'ne_ko_'   => [4].freeze,
}.freeze
# EventMachine connection handler for a single client socket.
#
# Incoming bytes are buffered until at least one complete CRLF-terminated
# line is available. Every complete line is validated as UTF-8, parsed as
# JSON, wrapped in a Hashie::Mash and handed to the current state object
# (see ConnectionState); whatever the state returns becomes the next state.
# Any error is reported to the client as a JSON "error" event, after which
# the connection is closed.
class ConnectionHandler < EM::Connection
  # Terminator used for both inbound and outbound lines.
  LINE_TERMINATOR = "\r\n"

  # Sets up the initial (not yet logged in) state and an empty receive buffer.
  def post_init
    @state = ConnectionState::Connected.new(self)
    # Kept as a binary string: a multi-byte UTF-8 character may be split
    # across two chunks, so the encoding is only checked per complete line.
    @buffer = String.new.force_encoding(Encoding::BINARY)
  end

  # Called when data arrives from the client; delegates each complete line
  # to the current state.
  #
  # A chunk may contain a partial line, exactly one line, or several lines.
  # Complete lines are processed in order; a trailing partial line stays in
  # the buffer until its terminator arrives.
  #
  # @param chunk [String] raw bytes received from the socket
  def receive_data(chunk)
    @buffer << chunk.dup.force_encoding(Encoding::BINARY)

    while (eol = @buffer.index(LINE_TERMINATOR))
      raw_line = @buffer.slice!(0, eol + LINE_TERMINATOR.bytesize)
      data = parse_message(to_valid_utf8(raw_line))
      @state = @state.process(data)
    end
  rescue => e
    # The connection is about to be closed; drop anything still pending so
    # it is not processed if more data arrives before the close completes.
    @buffer.clear
    send_error(e.message)
  end

  # Called when the connection is closed; delegates to the current state.
  def unbind
    @state.unbind
  end

  # Sends one line to the client, appending the CRLF terminator.
  #
  # @param data [String] line content without terminator
  def send_line(data)
    send_data(data + LINE_TERMINATOR)
  end

  private

  # Interprets the raw bytes of one line as UTF-8 and strips control
  # characters (which includes the trailing CRLF).
  #
  # @param line [String] raw bytes of one received line
  # @return [String] UTF-8 string without control characters
  # @raise [RuntimeError] "invalid utf-8 string" if the bytes are not valid UTF-8
  def to_valid_utf8(line)
    text = line.dup.force_encoding(Encoding::UTF_8)
    raise "invalid utf-8 string" unless text.valid_encoding?

    text.gsub(/[[:cntrl:]]/, "")
  end

  # Parses one line as JSON and wraps the result for method-style access.
  #
  # @param line [String] JSON text
  # @return [Hashie::Mash]
  # @raise [RuntimeError] "invalid JSON" if the text cannot be parsed or
  #   cannot be wrapped in a Hashie::Mash
  def parse_message(line)
    Hashie::Mash.new(JSON.parse(line))
  rescue StandardError
    raise "invalid JSON"
  end

  # Reports an error to the client as a JSON event, then closes the
  # connection once the reply has been written.
  #
  # @param msg [String] human-readable error message
  def send_error(msg)
    error = JSON.generate({"event" => "error", "message" => msg})
    send_line(error)
    close_connection_after_writing
  end
end
# State objects used by ConnectionHandler.
#
# Every state responds to #process(data), which returns the state to use for
# the next message, and #unbind, which is called when the client connection
# goes away.
module ConnectionState
  # State right after the client has connected, before login.
  class Connected
    # @param connection [#send_line] the client connection
    def initialize(connection)
      @connection = connection
    end

    # Treats the incoming message as a login request.
    #
    # @param data [#user_id] parsed request
    # @return [Logined] the logged-in state for that user
    # @raise [RuntimeError] if user_id is missing or not a known user
    def process(data)
      raise "user_id is required in login request" unless data.user_id
      raise "invalid user" unless USERS_TO_ROOMS_TABLE[data.user_id]

      Logined.new(data.user_id, @connection)
    end

    def unbind
      # do nothing: no resources are held before login
    end
  end

  # State after a successful login: the client receives every message
  # published to the rooms the user belongs to.
  class Logined
    # Subscribes the client to the user's rooms and tells the client which
    # rooms it entered.
    #
    # @param user_id [String] key into USERS_TO_ROOMS_TABLE
    # @param connection [#send_line] the client connection
    def initialize(user_id, connection)
      rooms = USERS_TO_ROOMS_TABLE[user_id]
      # Keep the broker connection so it can be released in #unbind.
      @amqp_connection = subscribe(connection, rooms)
      connection.send_line("you enterd #{rooms.join(',')}")
    end

    # Messages from a logged-in client are ignored.
    #
    # @return [Logined] self, i.e. the state does not change
    def process(data)
      self
    end

    # Closes the AMQP connection opened for this client. Without this, every
    # client disconnect would leave a broker connection (and its exclusive
    # queue) behind. Safe to call more than once.
    def unbind
      return unless @amqp_connection

      @amqp_connection.close
      @amqp_connection = nil
    end

    private

    # Opens an AMQP connection, binds a fresh exclusive queue to the
    # "example" direct exchange once per room (routing key = room id as a
    # string) and forwards every received payload to the client.
    #
    # @param connection [#send_line] the client connection
    # @param rooms [Array] room ids to listen to
    # @return [Object] the AMQP connection returned by AMQP.connect
    def subscribe(connection, rooms)
      amqp_connection = AMQP.connect(:host => '127.0.0.1')
      puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

      channel = AMQP::Channel.new(amqp_connection)
      exchange = channel.direct("example")
      queue = channel.queue("", :exclusive => true)

      rooms.each do |room|
        queue.bind(exchange, :routing_key => room.to_s)
      end

      queue.subscribe do |payload|
        connection.send_line(payload)
      end

      amqp_connection
    end
  end
end
# Server entry point: run the EventMachine reactor and accept client
# connections on all interfaces, port 1234, one ConnectionHandler per client.
EM.run do
  EM.start_server("0.0.0.0", 1234, ConnectionHandler)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment