Created
May 30, 2017 06:47
-
-
Save Greyvend/e21b87a0f376b4d15c43fc4b7d5fa31a to your computer and use it in GitHub Desktop.
Tornado TCP Server & Client with a Redis connection and a simple subscription protocol. Refer to the corresponding repo for the full working example: https://github.com/Databrawl/real_time_tcp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import signal | |
import asyncio | |
from tornado import gen | |
from tornado.ioloop import IOLoop | |
from tornado.iostream import StreamClosedError | |
from tornado.tcpserver import TCPServer | |
from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future | |
import aioredis | |
class ClientConnection(object):
    """One connected TCP client speaking the line-based subscription protocol.

    Requests and responses are UTF-8 text terminated by ``message_separator``.
    The client may send ``SUBSCRIBE`` or ``UNSUBSCRIBE``; while it is
    subscribed, every message passed to :meth:`update` is forwarded to it.
    """

    message_separator = b'\r\n'

    def __init__(self, stream):
        """Wrap *stream*, the object used for all reads and writes.

        A new connection starts out unsubscribed.
        """
        self._stream = stream
        self._subscribed = False

    def _handle_request(self, request):
        """Apply a single text command and return the textual reply.

        Args:
            request: the decoded request body, without the separator.

        Returns:
            ``'CONFIRMED'`` when the subscription state changed,
            ``'ALREADY SUBSCRIBED'`` / ``'ALREADY UNSUBSCRIBED'`` when the
            command was a no-op, or ``'UNKNOWN COMMAND'`` for anything else.
        """
        if request == 'SUBSCRIBE':
            if self._subscribed:
                return 'ALREADY SUBSCRIBED'
            self._subscribed = True
            return 'CONFIRMED'
        if request == 'UNSUBSCRIBE':
            if not self._subscribed:
                return 'ALREADY UNSUBSCRIBED'
            self._subscribed = False
            return 'CONFIRMED'
        return 'UNKNOWN COMMAND'

    @gen.coroutine
    def _send(self, payload):
        """Write *payload* followed by the separator.

        Returns:
            True if the write completed, False if the stream turned out to
            be closed (in which case it is closed on our side as well).
        """
        try:
            yield self._stream.write(payload + self.message_separator)
        except StreamClosedError:
            self._stream.close(exc_info=True)
            return False
        return True

    @gen.coroutine
    def run(self):
        """Serve requests from the client until the stream is closed."""
        separator = self.message_separator
        while True:
            try:
                request = yield self._stream.read_until(separator)
            except StreamClosedError:
                self._stream.close(exc_info=True)
                return

            # Drop exactly one trailing separator. bytes.rstrip() would treat
            # the separator as a *set* of characters and also eat any extra
            # trailing '\r' / '\n' bytes that belong to the request body.
            if request.endswith(separator):
                request = request[:-len(separator)]

            # The bytes come from the network and may not be valid UTF-8.
            # Replacing undecodable bytes yields a string that matches no
            # command, so the client gets 'UNKNOWN COMMAND' instead of the
            # coroutine dying with UnicodeDecodeError.
            command = request.decode('utf-8', errors='replace')

            reply = self._handle_request(command).encode('utf-8')
            delivered = yield self._send(reply)
            if not delivered:
                return

    @gen.coroutine
    def update(self, message):
        """Forward *message* (bytes) to the client if it is subscribed.

        Unsubscribed clients are skipped silently. A closed stream is
        handled by closing it on our side; no exception is raised for it.
        """
        if not self._subscribed:
            return
        yield self._send(message)
class Server(TCPServer):
    """TCP server that relays messages from a Redis channel to its clients.

    Each accepted stream is wrapped in a ``ClientConnection``; every message
    received from the Redis channel is offered to all current connections.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``TCPServer`` and set up empty state."""
        super(Server, self).__init__(*args, **kwargs)
        self._redis = None        # Redis connection, set by subscribe()
        self._channel = None      # subscribed channel object, set by subscribe()
        self._connections = []    # ClientConnection objects currently served

    @asyncio.coroutine
    def subscribe(self, channel_name):
        """Connect to Redis on localhost:6379, subscribe, and start relaying.

        Args:
            channel_name: name of the Redis channel to subscribe to.
        """
        # NOTE(review): this uses bare ``yield`` (not ``yield from``) inside an
        # ``asyncio.coroutine``; confirm it is driven as intended by the
        # Tornado/asyncio bridge for the library versions in use.
        self._redis = yield aioredis.create_redis(('localhost', 6379))
        channels = yield self._redis.subscribe(channel_name)
        print('Subscribed to "{}" Redis channel.'.format(channel_name))
        self._channel = channels[0]
        yield self.listen_redis()

    @gen.coroutine
    def listen_redis(self):
        """Relay channel messages to all connections until the channel closes."""
        while True:
            yield self._channel.wait_message()
            try:
                msg = yield self._channel.get(encoding='utf-8')
            except aioredis.errors.ChannelClosedError:
                print("Redis channel was closed. Stopped listening.")
                return
            if msg:
                body_utf8 = msg.encode('utf-8')
                # Yielding a list waits for every connection's update().
                yield [con.update(body_utf8) for con in self._connections]
                print("Message in {}: {}".format(self._channel.name, msg))

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Serve one client stream for as long as its connection runs.

        Args:
            stream: the stream for the new client.
            address: the client's address, used only for the log line.
        """
        print('New request has come from our {} buddy...'.format(address))
        connection = ClientConnection(stream)
        self._connections.append(connection)
        try:
            yield connection.run()
        finally:
            # Always forget the connection, even when run() raises; otherwise
            # listen_redis() would keep writing to a dead connection forever.
            self._connections.remove(connection)
if __name__ == '__main__':
    # Install the asyncio-backed IOLoop before any Tornado object is created,
    # so the server below is driven by the asyncio loop run at the bottom.
    AsyncIOMainLoop().install()

    server = Server()
    server.listen(5567)
    # Start the Redis subscription once the loop is running.
    IOLoop.current().spawn_callback(server.subscribe, 'updates')

    print('Starting the server...')
    try:
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; without this handler
        # the shutdown message below would never be printed.
        pass
    print('Server has shut down.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment