Skip to content

Instantly share code, notes, and snippets.

@FSX
Created March 15, 2012 14:55
Show Gist options
  • Save FSX/2044621 to your computer and use it in GitHub Desktop.
Save FSX/2044621 to your computer and use it in GitHub Desktop.
Tornado + Redis demo.
import socket
from itertools import imap
from functools import partial
from collections import deque
from tornado import ioloop
from tornado import iostream
import hiredis
class Connection(object):
def __init__(self, host='localhost', port=6379,
encoding='utf-8', encoding_errors='strict'):
self.host = host
self.port = port
self.encoding = encoding
self.encoding_errors = encoding_errors
self._deque = deque()
self._reader = hiredis.Reader()
self._ioloop = ioloop.IOLoop.instance()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self._stream = iostream.IOStream(s)
self._stream.connect((host, port))
def encode(self, value):
"Return a bytestring representation of the value."
if isinstance(value, unicode):
return value.encode(self.encoding, self.encoding_errors)
return str(value)
def pack_command(self, *args):
"Pack a series of arguments into a value Redis command."
command = ['$%s\r\n%s\r\n' % (len(enc_value), enc_value)
for enc_value in imap(self.encode, args)]
return '*%s\r\n%s' % (len(command), ''.join(command))
def send_command(self, callback, *args):
self._stream.write(self.pack_command(*args))
self._deque.appendleft(callback)
if not self._stream.reading():
self._stream.read_until('\r\n', self._handle_read)
def _handle_read(self, data):
self._reader.feed(data)
r = self._reader.gets()
while r is not False:
if self._deque:
callback = self._deque.pop()
callback(r)
r = self._reader.gets()
if self._deque:
if self._stream.reading():
self._ioloop.add_callback(lambda: self._stream.read_until('\r\n', self._handle_read))
else:
self._stream.read_until('\r\n', self._handle_read)
import time
from tornado import ioloop
from umeko import Connection
G_START_TIME = time.clock()
G_DATA = -1
def done_1(data):
# print '1 - ', data
assert data == 'OK'
def done_2(data):
global G_DATA, G_START_TIME
data = int(data)
# print '2 - ', data, G_DATA
assert data > G_DATA
G_DATA = data
if data == 999:
print 'Duration: %gs' % (time.clock() - G_START_TIME)
ioloop.IOLoop.instance().stop()
def start():
c = Connection('localhost', 6379)
for i in xrange(0, 1000):
c.send_command(done_1, 'SET', 'key_%d' % i, i)
c.send_command(done_2, 'GET', 'key_%d' % i)
if __name__ == '__main__':
try:
start()
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
print 'Exit'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment