Skip to content

Instantly share code, notes, and snippets.

@kiddouk
Created August 27, 2012 19:11
Show Gist options
  • Save kiddouk/3491408 to your computer and use it in GitHub Desktop.
Redis Async read/write in IOLoop
# Incremental reply parser used by redis_messages_handler: raw bytes read from
# the Redis socket are fed in and complete replies are pulled back out.
# NOTE(review): Reader is presumably hiredis.Reader -- confirm against the
# file's imports.
redis_parser = Reader()
# Queue intended for outbound data.
# NOTE(review): nothing in the visible code reads from or appends to this
# buffer -- confirm whether it is still needed.
redis_write_buffer = deque()
def create_redis_socket(host, port):
    """Open a TCP connection to ``host:port`` and return it non-blocking.

    The connect is performed while the socket is still in blocking mode, so
    on return the socket is fully connected; it is switched to non-blocking
    mode only afterwards. Nagle's algorithm is disabled (TCP_NODELAY).

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port number (int).

    Returns:
        The connected, non-blocking socket object.

    Raises:
        socket.error: if configuring or connecting the socket fails. The
            half-initialised socket is closed before the error propagates.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        sock.connect((host, port))
        sock.setblocking(False)
    except Exception:
        # Do not leak the file descriptor when the connection attempt fails.
        sock.close()
        raise
    return sock
def handle_read(sock):
    """Read up to 4096 bytes from the non-blocking socket ``sock``.

    Args:
        sock: A connected socket in non-blocking mode.

    Returns:
        * the received data when some is available;
        * ``None`` when the read would block (EWOULDBLOCK / EAGAIN);
        * the empty value returned by ``recv`` when the peer has closed the
          connection -- in that case the event is logged and ``sock`` is
          closed before returning.

    Raises:
        socket.error: for any receive failure other than "would block".
    """
    try:
        data = sock.recv(4096)
    except socket.error as e:
        # "Would block" just means nothing is ready yet on a non-blocking
        # socket; anything else is a genuine failure for the caller.
        if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            return None
        raise
    if not data:
        # An empty read on a stream socket means the remote end hung up.
        Config().get_logger().error("Redis Server : Socket closed on the other end")
        sock.close()
    return data
def redis_messages_handler(sock, fd, events):
    """IOLoop callback that forwards Redis pub/sub messages to clients.

    When the loop reports the socket readable, whatever bytes are available
    are fed to ``redis_parser``; every complete reply whose first element is
    ``'message'`` is sent to each connection registered under that channel
    in ``SOCK_IO_STREAM``. Other replies are discarded.

    Args:
        sock: The non-blocking Redis socket to read from.
        fd: File descriptor passed by the IOLoop (unused here).
        events: Bitmask of IOLoop events that fired.

    A failure while sending to one client is logged and does not stop
    delivery to the remaining clients.
    """
    if not events & ioloop.IOLoop.READ:
        return

    data = handle_read(sock)
    if not data:
        # Nothing available yet, or the connection was closed.
        return

    redis_parser.feed(data)
    while True:
        res = redis_parser.gets()
        if res is False:
            # The parser has no further complete reply buffered.
            break
        if res[0] != 'message':
            continue
        channel = res[1]
        # The loop variable is deliberately not called ``socket`` so the
        # ``socket`` module is not shadowed inside this function.
        for client in SOCK_IO_STREAM[channel]:
            try:
                client.send(res[2], force_json=True)
            except Exception:
                # Best effort: one broken client must not block the others.
                Config().get_logger().error("Error while sending live message")
def write_to_redis(data):
    """Send ``data`` on the shared non-blocking Redis socket.

    Args:
        data: The payload to hand to ``socket.send``.

    Returns:
        ``True`` if ``send`` was accepted by the socket, ``False`` if the
        socket could not take data right now (EWOULDBLOCK / EAGAIN).

    Raises:
        socket.error: for any send failure other than "would block".
    """
    # Imported at call time: the socket is created by the entry-point script
    # and published as a module-level name of ``__main__``.
    from __main__ import async_redis_socket as sock
    try:
        # NOTE(review): send() may transmit only part of ``data`` on a
        # non-blocking socket and its return value is not checked here --
        # confirm whether callers rely on payloads being small.
        sock.send(data)
    except socket.error as e:
        if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            return False
        raise
    return True
if __name__ == "__main__":
    # Entry point: build the TornadIO2 server, connect to Redis and run the
    # IOLoop with the Redis socket registered for read events.
    config = Config()

    # These are named socket_service / io_loop rather than socket / ioloop:
    # rebinding those module-level names would hide the ``socket`` and
    # ``ioloop`` modules that the functions in this file look up at call
    # time (e.g. ``socket.socket(...)`` in create_redis_socket).
    socket_service = SocketService()
    tornadio2.server.SocketServer(socket_service, auto_start=False)
    io_loop = tornado.ioloop.IOLoop.instance()

    # Must remain a module-level name: write_to_redis imports it from
    # ``__main__``.
    async_redis_socket = create_redis_socket(config.redis['host'], int(config.redis['port']))
    io_loop.add_handler(
        async_redis_socket.fileno(),
        partial(redis_messages_handler, async_redis_socket),
        io_loop.READ,
    )
    io_loop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment