Created
May 24, 2010 22:04
-
-
Save rjungemann/412486 to your computer and use it in GitHub Desktop.
A Redis pubsub example for Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Pieter Noordhuis | |
# Description: Simple demo to showcase Redis PubSub with EventMachine | |
# http://gist.github.com/348262 | |
# Incomplete evented Redis implementation specifically made for | |
# the new PubSub features in Redis. | |
# Modifications by Roger Jungemann | |
require 'stringio' | |
require 'rubygems' | |
require 'eventmachine' | |
class Redis | |
class PubSub < EM::Connection | |
def self.connect host = nil, port = nil | |
@host = (host || ENV['REDIS_HOST'] || 'localhost') | |
@port = (port || ENV['REDIS_PORT'] || 6379).to_i | |
EM.connect @host, @port, self | |
end | |
def post_init; @blocks = {} end | |
def publish(channel, msg); call_command('publish', channel, msg) end | |
def subscribe(*channels, &blk) | |
channels.each { |c| @blocks[c.to_s] = blk } | |
call_command('subscribe', *channels) | |
end | |
def unsubscribe *channels | |
channels.each { |c| @blocks[c.to_s] = nil } | |
call_command('unsubscribe', *channels) | |
end | |
def receive_data(data) | |
buffer = StringIO.new(data) | |
begin | |
parts = read_response(buffer) | |
if parts.is_a?(Array) | |
ret = @blocks[parts[1]].call(parts) | |
close_connection if ret === false | |
end | |
end while !buffer.eof? | |
end | |
private | |
def read_response(buffer) | |
type = buffer.read(1) | |
if type == ':' | |
buffer.gets.to_i | |
elsif type == '*' | |
size = buffer.gets.to_i | |
parts = size.times.map { read_object(buffer) } | |
else | |
raise "unsupported response type" | |
end | |
end | |
def read_object(data) | |
type = data.read(1) | |
if type == ':' # integer | |
data.gets.to_i | |
elsif type == '$' | |
size = data.gets | |
str = data.read(size.to_i) | |
data.read(2) # crlf | |
str | |
else | |
raise "read for object of type #{type} not implemented" | |
end | |
end | |
# only support multi-bulk | |
def call_command(*args) | |
command = "*#{args.size}\r\n" | |
args.each { |a| | |
command << "$#{a.to_s.size}\r\n" | |
command << a.to_s | |
command << "\r\n" | |
} | |
send_data command | |
end | |
end | |
end | |
#EM.run do | |
# sub = Redis::PubSub.connect | |
# pub = Redis::PubSub.connect | |
# | |
# sub.subscribe("chat") do |t, c, m| | |
# puts m #=> "hello" | |
# end | |
# | |
# pub.publish("chat", "hello") | |
#end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment