Created
April 5, 2012 06:43
-
-
Save elprup/2308510 to your computer and use it in GitHub Desktop.
Simple async callback using tornado and redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Simple async callback using tornado and redis

example:

    from channel import *
    import tornado.ioloop
    import functools

    def callback(x, prefix=''):
        print(prefix + str(x))

    mycallback = functools.partial(callback, prefix='my:')
    theircallback = functools.partial(callback, prefix='their:')

    subscribe('papaya', mycallback)
    subscribe('papaya', theircallback)
    subscribe('mobile', theircallback)
    publish('papaya', 'hello, all')

    tornado.ioloop.IOLoop.instance().start()
'''
import socket | |
from tornado.ioloop import IOLoop | |
from tornado.iostream import IOStream | |
# Address of the Redis server that every stream in this module connects to.
redis_port = 6379
redis_host = "127.0.0.1"

# Registry filled in by subscribe(): channel name -> the Dispatcher serving it.
_callback_dict = dict()
def _connect(host, port):
    """Open a blocking TCP connection to (host, port) and return the socket.

    TCP_NODELAY is enabled before connecting so that small writes are not
    held back by Nagle's algorithm.

    Raises:
        Whatever ``setsockopt`` / ``connect`` raise (e.g. ``socket.error``);
        the half-initialised socket is closed before the error propagates.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        sock.connect((host, port))
    except BaseException:
        # Do not leak the file descriptor when the connection attempt fails.
        sock.close()
        raise
    return sock
def _get_stream(host, port):
    """Return a Tornado IOStream wrapping a new connection to (host, port).

    The stream is attached to the loop returned by ``IOLoop.instance()``.
    """
    # NOTE(review): the explicit io_loop argument was removed in Tornado 5.0;
    # confirm against the Tornado version this module is deployed with.
    return IOStream(_connect(host, port), io_loop=IOLoop.instance())
# Shared stream used by publish(); it is opened once, when the module is
# imported.
_stream_publish = _get_stream(host=redis_host, port=redis_port)
def _encode_command(*args):
    """Serialise *args* as one Redis multi-bulk request and return bytes.

    Every argument is sent with an explicit length prefix, so spaces or
    CR/LF inside an argument cannot be mistaken for argument or command
    separators.  Arguments that are not already bytes are converted with
    ``str()`` when they are not text, then encoded as UTF-8.
    """
    chunks = [('*%d\r\n' % len(args)).encode('ascii')]
    for arg in args:
        if not isinstance(arg, bytes):
            if not hasattr(arg, 'encode'):
                arg = str(arg)
            # On Python 2 str() already yields a byte string.
            if not isinstance(arg, bytes):
                arg = arg.encode('utf-8')
        chunks.append(('$%d\r\n' % len(arg)).encode('ascii'))
        chunks.append(arg)
        chunks.append(b'\r\n')
    return b''.join(chunks)


def publish(channel, message):
    """Queue *message* for the subscribers of *channel*.

    The message is appended (RPUSH) to the Redis list named *channel* over
    the shared ``_stream_publish`` stream.  The write is asynchronous and
    nothing is returned.

    The command is sent in the length-prefixed multi-bulk format rather than
    as a space-separated inline command, so a message such as
    ``'hello, all'`` is stored as one list element instead of being split at
    the space.
    """
    _stream_publish.write(_encode_command('RPUSH', channel, message))
class Dispatcher(object):
    """Deliver values popped from one Redis list to registered callbacks.

    Each instance opens its own stream with ``_get_stream``, issues
    ``BLPOP <channel> 0`` and parses the reply incrementally as data
    arrives.  Once a complete multi-bulk reply has been read, every
    registered callback is called with the list of reply elements and the
    next ``BLPOP`` is issued.
    """

    def __init__(self, channel):
        """Connect to Redis and start waiting for values on *channel*."""
        self.channel = channel
        self.callbacks = []
        # Parser state is set up before the first read is armed so that
        # dispatch() never observes a half-initialised object.
        self.parser_args = None   # elements of the reply being assembled
        self.parser_argc = 0      # elements still missing from that reply
        self.parser_length = 0    # declared size of the bulk being read
        self.stream = _get_stream(redis_host, redis_port)
        self._request_next()

    def register(self, callback):
        """Add *callback*; it is called with the list of reply elements."""
        self.callbacks.append(callback)

    @staticmethod
    def _pack(*args):
        """Encode *args* as one length-prefixed multi-bulk request (bytes)."""
        chunks = [('*%d\r\n' % len(args)).encode('ascii')]
        for arg in args:
            if not isinstance(arg, bytes):
                if not hasattr(arg, 'encode'):
                    arg = str(arg)
                # On Python 2 str() already yields a byte string.
                if not isinstance(arg, bytes):
                    arg = arg.encode('utf-8')
            chunks.append(('$%d\r\n' % len(arg)).encode('ascii'))
            chunks.append(arg)
            chunks.append(b'\r\n')
        return b''.join(chunks)

    def _request_next(self):
        """Arm the reader for the next reply and send a blocking pop."""
        self.stream.read_until(b'\r\n', self.dispatch)
        # Multi-bulk encoding keeps a channel name containing spaces intact.
        self.stream.write(self._pack('BLPOP', self.channel, 0))

    def dispatch(self, message):
        """Handle one CRLF-terminated reply line read from the stream.

        ``*<n>`` starts a reply of n elements, ``$<len>`` announces a bulk
        element whose payload is then read by length, and any other line
        inside a reply is stored as an element as-is.

        Raises:
            ValueError: if a line other than ``*<n>`` arrives while no
                reply is being assembled.
        """
        line = message[:-2]
        head = line[:1]  # slicing (not indexing) tolerates an empty line
        if head == b'*':
            count = int(line[1:])
            if count <= 0:
                # Nil or empty reply: nothing to deliver, wait again.
                self._request_next()
                return
            self.parser_argc = count
            self.parser_args = []
            self.stream.read_until(b'\r\n', self.dispatch)
            return
        if self.parser_argc <= 0:
            raise ValueError('unexpected reply from Redis: %r' % (line,))
        if head == b'$':
            self.parser_length = int(line[1:])
            if self.parser_length < 0:
                # "$-1" is a nil element and has no payload line.
                self._collect(None)
            else:
                # Read the payload by its declared size (plus the trailing
                # CRLF) so values that contain CRLF, are empty, or begin
                # with '*' or '$' are never parsed as protocol headers.
                self.stream.read_bytes(self.parser_length + 2,
                                       self._on_payload)
            return
        self._collect(line)

    def _on_payload(self, data):
        """Receive one bulk payload, still followed by its CRLF."""
        self._collect(data[:-2])

    def _collect(self, value):
        """Store one reply element; deliver the reply once it is complete."""
        self.parser_args.append(value)
        self.parser_argc -= 1
        if self.parser_argc != 0:
            self.stream.read_until(b'\r\n', self.dispatch)
            return
        for f in self.callbacks:
            f(self.parser_args)
        self._request_next()
def subscribe(channel, callback):
    """Register *callback* to receive messages published on *channel*.

    The first subscription to a channel creates its ``Dispatcher`` and
    stores it in ``_callback_dict``; later subscriptions to the same
    channel reuse that dispatcher and only add the callback.
    """
    # dict.has_key() is deprecated and gone in Python 3; a single lookup
    # with .get() works on every version.
    dispatcher = _callback_dict.get(channel)
    if dispatcher is None:
        dispatcher = _callback_dict[channel] = Dispatcher(channel)
    dispatcher.register(callback)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment