-
-
Save rafacv/349277 to your computer and use it in GitHub Desktop.
Simple demo to showcase Redis PubSub with EventMachine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Pieter Noordhuis | |
# Description: Simple demo to showcase Redis PubSub with EventMachine | |
# | |
# Requirements: | |
# - rubygems: eventmachine, thin, cramp, sinatra, yajl-ruby | |
# - a browser with WebSocket support | |
# | |
# Usage: | |
# ruby redis_pubsub_demo.rb | |
# | |
require 'rubygems' | |
require 'eventmachine' | |
require 'stringio' | |
require 'sinatra/base' | |
require 'cramp/controller' | |
require 'yajl' | |
# Incomplete evented Redis implementation specifically made for | |
# the new PubSub features in Redis. | |
class EventedRedis < EM::Connection | |
def self.connect | |
host = (ENV['REDIS_HOST'] || 'localhost') | |
port = (ENV['REDIS_PORT'] || 6379).to_i | |
EM.connect host, port, self | |
end | |
def post_init | |
@blocks = {} | |
end | |
def subscribe(*channels, &blk) | |
channels.each { |c| @blocks[c.to_s] = blk } | |
call_command('subscribe', *channels) | |
end | |
def publish(channel, msg) | |
call_command('publish', channel, msg) | |
end | |
def unsubscribe | |
call_command('unsubscribe') | |
end | |
def receive_data(data) | |
buffer = StringIO.new(data) | |
begin | |
parts = read_response(buffer) | |
if parts.is_a?(Array) | |
ret = @blocks[parts[1]].call(parts) | |
close_connection if ret === false | |
end | |
end while !buffer.eof? | |
end | |
private | |
def read_response(buffer) | |
type = buffer.read(1) | |
case type | |
when ':' | |
buffer.gets.to_i | |
when '*' | |
size = buffer.gets.to_i | |
parts = size.times.map { read_object(buffer) } | |
else | |
raise "unsupported response type" | |
end | |
end | |
def read_object(data) | |
type = data.read(1) | |
case type | |
when ':' # integer | |
data.gets.to_i | |
when '$' | |
size = data.gets | |
str = data.read(size.to_i) | |
data.read(2) # crlf | |
str | |
else | |
raise "read for object of type #{type} not implemented" | |
end | |
end | |
# only support multi-bulk | |
def call_command(*args) | |
command = "*#{args.size}\r\n" | |
args.each { |a| | |
command << "$#{a.to_s.size}\r\n" | |
command << a.to_s | |
command << "\r\n" | |
} | |
send_data command | |
end | |
end | |
class ChatController < Cramp::Controller::Websocket | |
on_start :create_redis | |
on_finish :handle_leave, :destroy_redis | |
on_data :received_data | |
def create_redis | |
@pub = EventedRedis.connect | |
@sub = EventedRedis.connect | |
end | |
def destroy_redis | |
@pub.close_connection_after_writing | |
@sub.close_connection_after_writing | |
end | |
def received_data(data) | |
msg = parse_json(data) | |
case msg[:action] | |
when 'join' | |
handle_join(msg) | |
when 'message' | |
handle_message(msg) | |
else | |
# skip | |
end | |
end | |
def handle_join(msg) | |
@user = msg[:user] | |
subscribe | |
publish :action => 'control', :user => @user, :message => 'joined the chat room' | |
end | |
def handle_leave | |
publish :action => 'control', :user => @user, :message => 'left the chat room' | |
end | |
def handle_message(msg) | |
publish msg.merge(:user => @user) | |
end | |
private | |
def subscribe | |
@sub.subscribe('chat') do |type,channel,message| | |
render message | |
end | |
end | |
def publish(message) | |
@pub.publish('chat', encode_json(message)) | |
end | |
def encode_json(obj) | |
Yajl::Encoder.encode(obj) | |
end | |
def parse_json(str) | |
Yajl::Parser.parse(str, :symbolize_keys => true) | |
end | |
end | |
class StaticController < Sinatra::Base | |
enable :inline_templates | |
get('/') { erb :main } | |
end | |
EventMachine.run { | |
Cramp::Controller::Websocket.backend = :thin | |
Rack::Handler::Thin.run ChatController, :Port => 8081 | |
Rack::Handler::Thin.run StaticController, :Port => 8082 | |
} | |
__END__ | |
@@ main | |
<html> | |
<head> | |
<script src='http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js'></script> | |
<script src='http://jquery-json.googlecode.com/files/jquery.json-2.2.min.js'></script> | |
<script src='http://datejs.googlecode.com/svn/trunk/build/date.js'></script> | |
<script> | |
$(document).ready(function(){ | |
if (typeof WebSocket != 'undefined') { | |
$('#ask').show(); | |
} else { | |
$('#error').show(); | |
} | |
// join on enter | |
$('#ask input').keydown(function(event) { | |
if (event.keyCode == 13) { | |
$('#ask a').click(); | |
} | |
}) | |
// join on click | |
$('#ask a').click(function() { | |
join($('#ask input').val()); | |
$('#ask').hide(); | |
$('#channel').show(); | |
$('input#message').focus(); | |
}); | |
function join(name) { | |
var host = window.location.host.split(':')[0]; | |
var ws = new WebSocket("ws://" + host + ":8081/websocket"); | |
var container = $('div#msgs'); | |
ws.onmessage = function(evt) { | |
var obj = $.evalJSON(evt.data); | |
if (typeof obj != 'object') return; | |
var action = obj['action']; | |
var struct = container.find('li.' + action + ':first'); | |
if (struct.length < 1) { | |
console.log("Could not handle: " + evt.data); | |
return; | |
} | |
var msg = struct.clone(); | |
msg.find('.time').text((new Date()).toString("HH:mm:ss")); | |
if (action == 'message') { | |
var matches; | |
if (matches = obj['message'].match(/^\s*[\/\\]me\s(.*)/)) { | |
msg.find('.user').text(obj['user'] + ' ' + matches[1]); | |
msg.find('.user').css('font-weight', 'bold'); | |
} else { | |
msg.find('.user').text(obj['user']); | |
msg.find('.message').text(': ' + obj['message']); | |
} | |
} else if (action == 'control') { | |
msg.find('.user').text(obj['user']); | |
msg.find('.message').text(obj['message']); | |
msg.addClass('control'); | |
} | |
if (obj['user'] == name) msg.find('.user').addClass('self'); | |
container.find('ul').append(msg.show()); | |
container.scrollTop(container.find('ul').innerHeight()); | |
} | |
$('#channel form').submit(function(event) { | |
event.preventDefault(); | |
var input = $(this).find(':input'); | |
var msg = input.val(); | |
ws.send($.toJSON({ action: 'message', message: msg })); | |
input.val(''); | |
}); | |
// send name when joining | |
ws.onopen = function() { | |
ws.send($.toJSON({ action: 'join', user: name })); | |
} | |
} | |
}); | |
</script> | |
<style type="text/css" media="screen"> | |
* { | |
font-family: Georgia; | |
} | |
a { | |
color: #000; | |
text-decoration: none; | |
} | |
a:hover { | |
text-decoration: underline; | |
} | |
div.bordered { | |
margin: 0 auto; | |
margin-top: 100px; | |
width: 600px; | |
padding: 20px; | |
text-align: center; | |
border: 10px solid #ddd; | |
-webkit-border-radius: 20px; | |
} | |
#error { | |
background-color: #BA0000; | |
color: #fff; | |
font-weight: bold; | |
} | |
#ask { | |
font-size: 20pt; | |
} | |
#ask input { | |
font-size: 20pt; | |
padding: 10px; | |
margin: 0 10px; | |
} | |
#ask span.join { | |
padding: 10px; | |
background-color: #ddd; | |
-webkit-border-radius: 10px; | |
} | |
#channel { | |
margin-top: 100px; | |
height: 480px; | |
position: relative; | |
} | |
#channel div#descr { | |
position: absolute; | |
left: -10px; | |
top: -190px; | |
font-size: 13px; | |
text-align: left; | |
line-height: 20px; | |
padding: 5px; | |
width: 630px; | |
} | |
div#msgs { | |
overflow-y: scroll; | |
height: 400px; | |
} | |
div#msgs ul { | |
list-style: none; | |
padding: 0; | |
margin: 0; | |
text-align: left; | |
} | |
div#msgs li { | |
line-height: 20px; | |
} | |
div#msgs li span.user { | |
color: #ff9900; | |
} | |
div#msgs li span.user.self { | |
color: #aa2211; | |
} | |
div#msgs li span.time { | |
float: right; | |
margin-right: 5px; | |
color: #aaa; | |
font-family: "Courier New"; | |
font-size: 12px; | |
} | |
div#msgs li.control { | |
text-align: center; | |
} | |
div#msgs li.control span.message { | |
color: #aaa; | |
} | |
div#input { | |
text-align: left; | |
margin-top: 20px; | |
} | |
div#input #message { | |
width: 600px; | |
border: 5px solid #bbb; | |
-webkit-border-radius: 3px; | |
font-size: 30pt; | |
} | |
</style> | |
</head> | |
<body> | |
<a href="http://gist.github.com/348262"> | |
<img style="position: absolute; top: 0; right: 0; border: 0;" src="http://s3.amazonaws.com/github/ribbons/forkme_right_darkblue_121621.png" alt="Fork me on GitHub" /> | |
</a> | |
<div id="error" class="bordered" style="display: none;"> | |
This browser has no native WebSocket support.<br/> | |
Use a WebKit nightly or Google Chrome. | |
</div> | |
<div id="ask" class="bordered" style="display: none;"> | |
Name: <input type="text" id="name" /> <a href="#"><span class="join">Join!</span></a> | |
</div> | |
<div id="channel" class="bordered" style="display: none;"> | |
<div id="descr" class="bordered"> | |
<strong>Note:</strong> your messages make a round-trip up and down the stack (including Redis) | |
before being displayed here.<br/> | |
<strong>Tip:</strong> open up another browser window | |
to see how quickly your messages are distributed. | |
</div> | |
<div id="msgs"> | |
<ul> | |
<li class="message" style="display: none"> | |
<span class="user"></span><span class="message"></span> | |
<span class="time"></span> | |
</li> | |
<li class="control" style="display: none"> | |
<span class="user"></span> <span class="message"></span> | |
<span class="time"></span> | |
</li> | |
</ul> | |
</div> | |
<div id="input"> | |
<form><input type="text" id="message" /></form> | |
</div> | |
</div> | |
</body> | |
</html> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment