Created
May 14, 2017 13:51
-
-
Save Greyvend/ed05fd956c396f44d42360081e3b3299 to your computer and use it in GitHub Desktop.
Tornado TCP Server & Client with Redis connection. Refer to the corresponding repo for the full working example: https://github.com/Databrawl/real_time_tcp/tree/3e01d85e719bf793a4811b2d701609a9a4d36597
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ThreadPoolExecutor | |
from tornado import gen | |
from tornado.ioloop import IOLoop | |
from tornado.iostream import StreamClosedError | |
from tornado.tcpclient import TCPClient | |
class Client(TCPClient): | |
msg_separator = b'\r\n' | |
def __init__(self): | |
super(Client, self).__init__() | |
self._stream = None | |
self._executor = ThreadPoolExecutor(1) | |
@gen.coroutine | |
def run(self, host, port): | |
self._stream = yield self.connect(host, port) | |
yield [self.read(), self.write()] | |
@gen.coroutine | |
def read(self): | |
while True: | |
try: | |
data = yield self._stream.read_until(self.msg_separator) | |
body = data.rstrip(self.msg_separator) | |
print(body) | |
except StreamClosedError: | |
self.disconnect() | |
return | |
@gen.coroutine | |
def write(self): | |
while True: | |
try: | |
data = yield self._executor.submit(input) | |
encoded_data = data.encode('utf8') | |
encoded_data += self.msg_separator | |
yield self._stream.write(encoded_data) | |
except StreamClosedError: | |
self.disconnect() | |
return | |
def disconnect(self): | |
super(Client, self).close() | |
self._executor.shutdown(False) | |
if not self._stream.closed(): | |
print('Disconnecting...') | |
self._stream.close() | |
@gen.coroutine | |
def main(): | |
print('Connecting to the server socket...') | |
yield Client().run('localhost', 5567) | |
print('Disconnected from server socket.') | |
if __name__ == '__main__': | |
IOLoop.instance().run_sync(main) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import signal | |
import asyncio | |
from tornado import gen | |
from tornado.ioloop import IOLoop | |
from tornado.iostream import StreamClosedError | |
from tornado.tcpserver import TCPServer | |
from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future | |
import aioredis | |
class ClientConnection(object): | |
message_separator = b'\r\n' | |
def __init__(self, stream): | |
self._stream = stream | |
@gen.coroutine | |
def run(self): | |
while True: | |
try: | |
request = yield self._stream.read_until( | |
self.message_separator) | |
request_body = request.rstrip(self.message_separator) | |
except StreamClosedError: | |
self._stream.close(exc_info=True) | |
return | |
else: | |
response_body = request_body | |
response = response_body + self.message_separator | |
try: | |
yield self._stream.write(response) | |
except StreamClosedError: | |
self._stream.close(exc_info=True) | |
return | |
@gen.coroutine | |
def update(self, message): | |
response = message + self.message_separator | |
try: | |
yield self._stream.write(response) | |
except StreamClosedError: | |
self._stream.close(exc_info=True) | |
return | |
class Server(TCPServer): | |
def __init__(self, *args, **kwargs): | |
super(Server, self).__init__(*args, **kwargs) | |
self._redis = None | |
self._channel = None | |
self._connections = [] | |
@asyncio.coroutine | |
def subscribe(self, channel_name): | |
self._redis = yield aioredis.create_redis(('localhost', 6379)) | |
channels = yield self._redis.subscribe(channel_name) | |
print('Subscribed to "{}" Redis channel.'.format(channel_name)) | |
self._channel = channels[0] | |
yield self.listen_redis() | |
@gen.coroutine | |
def listen_redis(self): | |
while True: | |
yield self._channel.wait_message() | |
try: | |
msg = yield self._channel.get(encoding='utf-8') | |
except aioredis.errors.ChannelClosedError: | |
print("Redis channel was closed. Stopped listening.") | |
return | |
if msg: | |
body_utf8 = msg.encode('utf-8') | |
yield [con.update(body_utf8) for con in self._connections] | |
print("Message in {}: {}".format(self._channel.name, msg)) | |
@gen.coroutine | |
def handle_stream(self, stream, address): | |
connection = ClientConnection(stream) | |
self._connections.append(connection) | |
yield connection.run() | |
self._connections.remove(connection) | |
if __name__ == '__main__': | |
AsyncIOMainLoop().install() | |
server = Server() | |
server.listen(5567) | |
IOLoop.current().spawn_callback(server.subscribe, 'updates') | |
print('Starting the server...') | |
asyncio.get_event_loop().run_forever() | |
print('Server has shut down.') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment