Skip to content

Instantly share code, notes, and snippets.

@SupermanScott
Created January 22, 2012 19:33
Show Gist options
  • Save SupermanScott/1658402 to your computer and use it in GitHub Desktop.
Save SupermanScott/1658402 to your computer and use it in GitHub Desktop.
Tornado Redis PubSub handler
class TornadoPubSub(redis.client.PubSub):
"""
PubSub handler that uses the IOLoop from tornado to read published messages
"""
_stream = None
def listen(self):
"""
Listen for messages by telling IOLoop what to call when data is there.
"""
if not self._stream:
socket = self.connection._sock
self._stream = tornado.iostream.IOStream(socket)
self._stream.read_until('\r\n', self.process_response)
def process_response(self, data):
"""
Called by IOLoop when data is read.
"""
# Pretty fragile here. @TODO: figure out how to turn socket into
# blocking until the mulk-bulk message is completely read.
if data[0] == 'm':
self._stream.read_bytes(2, lambda x: self._stream.read_until('}\r\n', self.read_json_message))
else:
self._stream.read_until('\r\n', self.process_response)
def read_json_message(self, data):
"""
Reads redis protocol and gets the json message
"""
message = json.loads(data.split('\n')[-2])
for listener in listeners:
listener.emit(message)
self._stream.read_until('\r\n', self.process_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment