Created
June 1, 2017 01:34
-
-
Save appeltel/09d77eb489494ae1e2703933108cb60a to your computer and use it in GitHub Desktop.
Simple Redis client for GET/SET built on asyncio streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
def build_command(*args):
    """Serialize a command and its arguments as a RESP array of bulk strings.

    The output is an array header (``*<count>`` + CRLF) followed, for each
    argument, by a bulk-string header (``$<length>`` + CRLF), the payload,
    and a terminating CRLF.

    Args:
        *args: The command name followed by its arguments. ``str`` values
            are encoded as UTF-8; ``bytes`` values are sent unchanged.

    Returns:
        The encoded command as ``bytes``, ready to be written to a socket.
    """
    parts = [b'*%d\r\n' % len(args)]
    for arg in args:
        payload = arg if isinstance(arg, bytes) else arg.encode('utf-8')
        # The declared length must count encoded bytes, not characters,
        # otherwise non-ASCII arguments produce a malformed command.
        parts.append(b'$%d\r\n' % len(payload))
        parts.append(payload)
        parts.append(b'\r\n')
    return b''.join(parts)
class RedisClient:
    """Minimal Redis client for GET and SET built on asyncio streams.

    Usable either through explicit ``connect()`` / ``disconnect()`` calls or
    as an asynchronous context manager (``async with RedisClient() as c:``).
    Only simple-string, bulk-string and error replies are understood.
    """

    def __init__(self, host='127.0.0.1', port=6379):
        """Store the server address; no connection is opened here.

        Args:
            host: Host name or address passed to ``asyncio.open_connection``.
            port: TCP port passed to ``asyncio.open_connection``.
        """
        self.host = host
        self.port = port
        # Both streams stay None until connect() has succeeded.
        self.reader = None
        self.writer = None

    async def connect(self):
        """Open the connection and keep its reader/writer pair."""
        self.reader, self.writer = await asyncio.open_connection(
            self.host,
            self.port
        )

    def disconnect(self):
        """Close the writer if a connection was opened; otherwise do nothing."""
        if self.writer is not None:
            self.writer.close()

    async def __aenter__(self):
        """Connect and return the client itself for use in ``async with``."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Disconnect on leaving the ``async with`` block.

        Returns None, so an exception raised inside the block propagates.
        """
        self.disconnect()

    async def get(self, key):
        """Send ``GET key`` and return the parsed reply.

        Returns:
            The value decoded as UTF-8 text, or ``None`` when the server
            answers with a null bulk string.
        """
        self.writer.write(build_command('GET', key))
        # Let the transport flush and apply back-pressure before reading.
        await self.writer.drain()
        return await self.get_string_response()

    async def set(self, key, value):
        """Send ``SET key value`` and return the parsed reply."""
        self.writer.write(build_command('SET', key, value))
        await self.writer.drain()
        return await self.get_string_response()

    async def get_string_response(self):
        """Read one reply from the stream and return it as text.

        Returns:
            The decoded text of a simple-string (``+``) or bulk-string
            (``$``) reply, or ``None`` for a bulk string of length -1.

        Raises:
            Exception: If the server sent an error reply (``-``); the
                message carries the server's error text. Also raised, with
                the message 'Restricted Protocol Error', for any other
                reply type.
        """
        ctrl_char = await self.reader.readexactly(1)

        if ctrl_char == b'+':
            line = await self.reader.readuntil(separator=b'\r\n')
            return line[:-2].decode('utf-8')

        if ctrl_char == b'$':
            resp_length = int(await self.reader.readuntil(separator=b'\r\n'))
            if resp_length == -1:
                return None
            # Read the payload together with its trailing CRLF, then drop
            # the two terminator bytes.
            payload = await self.reader.readexactly(resp_length + 2)
            return payload[:-2].decode('utf-8')

        if ctrl_char == b'-':
            # Consume the whole error line so the next reply is parsed from
            # its own first byte instead of from the middle of this one.
            line = await self.reader.readuntil(separator=b'\r\n')
            raise Exception(
                'Redis error reply: ' + line[:-2].decode('utf-8', 'replace')
            )

        raise Exception('Restricted Protocol Error')
async def test_redis():
    """Run a SET followed by two GETs and print each command's result.

    Connects with the client's default host and port, stores "bar" under
    "foo", reads "foo" back, then reads the key "baz".
    """
    key, value, other_key = "foo", "bar", "baz"

    async with RedisClient() as redis:
        result = await redis.set(key, value)
        print(f'CMD "SET foo bar" result = {result}')

        result = await redis.get(key)
        print(f'CMD "GET foo" result = {result}')

        result = await redis.get(other_key)
        print(f'CMD "GET baz" result = {result}')
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on newer Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(test_redis())
    finally:
        # Always release the loop's resources, even if the demo fails.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment