Created
June 13, 2012 10:59
-
-
Save Shinpeim/2923426 to your computer and use it in GitHub Desktop.
AMQPを利用したメッセージングサーバの習作
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "amqp"

# Publishes a single message to the "example" direct exchange, then exits.
#
# Usage: ruby <this script> [room_id] [message words...]
#   room_id - used as the routing key; "1" when omitted.
#   message - remaining arguments joined by spaces; "Hello World!" when omitted.
AMQP.start(host: "localhost") do |connection|
  exchange = AMQP::Channel.new(connection).direct("example")

  room_id = ARGV.shift || "1"
  message =
    if ARGV.empty?
      "Hello World!"
    else
      ARGV.join(" ")
    end

  exchange.publish(message, routing_key: room_id)
  puts " [x] Sent #{room_id}:#{message}"

  # Wait half a second before closing the connection, then stop the
  # reactor and exit the process.
  EM.add_timer(0.5) do
    connection.close { EM.stop { exit } }
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
require 'eventmachine' | |
require 'json' | |
require 'hashie' | |
require "amqp" | |
# Maps each known user id to the list of room ids that user belongs to.
# Frozen (including the room lists) so this shared lookup table cannot be
# mutated by accident at runtime.
USERS_TO_ROOMS_TABLE = {
  'kksg'     => [1, 2, 3, 4].freeze,
  '9m'       => [1, 3].freeze,
  'mitukiii' => [2, 3].freeze,
  'ne_ko_'   => [4].freeze,
}.freeze
# Per-client TCP connection handler.
#
# The wire protocol is line oriented: each request is one JSON object
# terminated by CRLF. Every parsed request is handed to the current
# ConnectionState object; whatever its #process returns becomes the state
# used for the next request. Any failure is reported to the client as an
# "error" event and the connection is closed.
class ConnectionHandler < EM::Connection
  # Terminator that ends every request and every response line.
  LINE_TERMINATOR = "\r\n"

  # Sets up the initial (not yet logged in) state and an empty receive buffer.
  def post_init
    @state = ConnectionState::Connected.new(self)
    @buffer = String.new
  end

  # Called when data arrives from the client; delegates each complete
  # request line to the current state.
  #
  # A chunk may hold a partial line, exactly one line, or several lines, so
  # every CRLF-terminated line is consumed in order and any incomplete tail
  # stays buffered until the rest of it arrives.
  #
  # @param chunk [String] raw bytes read from the socket
  def receive_data(chunk)
    # Accumulate as raw bytes; UTF-8 validity is checked per complete line.
    @buffer << chunk.b
    while (terminator_at = @buffer.index(LINE_TERMINATOR))
      raw_line = @buffer.slice!(0, terminator_at + LINE_TERMINATOR.bytesize)
      @state = @state.process(parse_request(raw_line))
    end
  rescue => e
    # The connection is about to be closed, so drop anything still pending.
    @buffer.clear
    send_error(e.message)
  end

  # Called when the connection is closed; delegates to the current state.
  def unbind
    @state.unbind
  end

  # Sends one protocol line to the client.
  #
  # @param data [String] line content without the trailing CRLF
  def send_line(data)
    send_data(data + LINE_TERMINATOR)
  end

  private

  # Turns one raw request line into a Hashie::Mash.
  #
  # @param raw_line [String] raw bytes of a single line, terminator included
  # @return [Hashie::Mash] the parsed request
  # @raise [RuntimeError] "invalid utf-8 string" or "invalid JSON"
  def parse_request(raw_line)
    # Control characters (including the CRLF terminator) are stripped before
    # parsing, as the protocol has always done.
    text = to_valid_utf8(raw_line).gsub(/[[:cntrl:]]/, "")
    begin
      Hashie::Mash.new(JSON.parse(text))
    rescue StandardError
      raise "invalid JSON"
    end
  end

  # Reinterprets raw socket bytes as UTF-8 text.
  #
  # The bytes are re-tagged rather than transcoded: transcoding a binary
  # string would reject every non-ASCII byte, even in well-formed UTF-8.
  #
  # @param line [String] raw bytes
  # @return [String] a UTF-8 tagged copy of +line+
  # @raise [RuntimeError] "invalid utf-8 string" if the bytes are not valid UTF-8
  def to_valid_utf8(line)
    text = line.dup.force_encoding(Encoding::UTF_8)
    raise "invalid utf-8 string" unless text.valid_encoding?

    text
  end

  # Reports an error event to the client and closes the connection once the
  # message has been written.
  #
  # @param msg [String] human-readable error description
  def send_error(msg)
    error = JSON.generate({"event" => "error", "message" => msg})
    send_line(error)
    close_connection_after_writing
  end
end
# Connection states (State pattern) used by ConnectionHandler.
#
# Every state responds to #process(data), which returns the state to use for
# the next request, and to #unbind, which is invoked when the client
# connection is closed.
module ConnectionState
  # State right after the client connects, before it has logged in.
  class Connected
    # @param connection [#send_line] the client connection
    def initialize(connection)
      @connection = connection
    end

    # Treats the request as a login.
    #
    # @param data [#user_id] parsed request
    # @return [Logined] the logged-in state for that user
    # @raise [RuntimeError] when user_id is missing or not a known user
    def process(data)
      user_id = data.user_id
      raise "user_id is required in login request" unless user_id
      raise "invalid user" unless USERS_TO_ROOMS_TABLE.key?(user_id)

      Logined.new(user_id, @connection)
    end

    # Nothing was allocated in this state, so there is nothing to release.
    def unbind
      # do nothing
    end
  end

  # State after a successful login: messages published to the user's rooms
  # are forwarded to the client.
  class Logined
    # Subscribes the client to all of the user's rooms and confirms the login.
    #
    # @param user_id [String] a key of USERS_TO_ROOMS_TABLE
    # @param connection [#send_line] the client connection
    def initialize(user_id, connection)
      rooms = USERS_TO_ROOMS_TABLE[user_id]
      # Keep the broker connection so it can be closed in #unbind.
      @amqp_connection = subscribe(connection, rooms)
      connection.send_line("you enterd #{rooms.join(',')}")
    end

    # Requests received after login are ignored; the state does not change.
    #
    # @return [Logined] self
    def process(data)
      self
    end

    # Closes this client's AMQP connection when the client disconnects.
    # Without this, every login would leave a broker connection (and its
    # exclusive queue) open for the lifetime of the server process.
    def unbind
      return unless @amqp_connection

      @amqp_connection.close
      @amqp_connection = nil
    end

    private

    # Opens a dedicated AMQP connection with an exclusive, server-named
    # queue bound to the "example" direct exchange once per room, and
    # forwards every received payload to the client.
    #
    # @param connection [#send_line] the client connection
    # @param rooms [Array] room ids; each one is used as a routing key
    # @return [Object] the AMQP connection that was opened
    def subscribe(connection, rooms)
      amqp_connection = AMQP.connect(:host => '127.0.0.1')
      puts "Connecting to AMQP broker. Running #{AMQP::VERSION} version of the gem..."

      channel  = AMQP::Channel.new(amqp_connection)
      exchange = channel.direct("example")
      queue    = channel.queue("", :exclusive => true)

      rooms.each do |room|
        queue.bind(exchange, :routing_key => room.to_s)
      end

      queue.subscribe do |payload|
        connection.send_line(payload)
      end

      amqp_connection
    end
  end
end
# Start the reactor and accept chat clients on all interfaces, TCP port 1234.
# Each accepted connection is served by ConnectionHandler.
EM.run do
  EM.start_server("0.0.0.0", 1234, ConnectionHandler)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment