Skip to content

Instantly share code, notes, and snippets.

@gs-niteesh
Created May 4, 2021 18:49
Show Gist options
  • Save gs-niteesh/86b57d5cca39c00484e94d844b1c73c6 to your computer and use it in GitHub Desktop.
Save gs-niteesh/86b57d5cca39c00484e94d844b1c73c6 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import sys
# Configure root logging so that records at DEBUG level and above are emitted.
logging.basicConfig(level=logging.DEBUG)
# Module-wide logger used by everything in this file, registered as 'server'.
log = logging.getLogger('server')
class Session:
    """Read-only bundle describing one connected client.

    Holds the client's display name together with the stream reader and
    writer objects it was constructed with.  The three values are exposed
    through properties without setters, so they cannot be rebound through
    the public names after construction.
    """

    def __init__(self, name, reader, writer):
        """Store the client name and its reader/writer pair."""
        self._name, self._reader, self._writer = name, reader, writer

    @property
    def name(self):
        """Name the session was created with."""
        return self._name

    @property
    def reader(self):
        """Reader object the session was created with."""
        return self._reader

    @property
    def writer(self):
        """Writer object the session was created with."""
        return self._writer
class Server:
    """Line-oriented TCP broadcast server built on asyncio streams.

    Each line received from a client is decoded, stripped and relayed to
    every other connected client.  Connected clients are tracked in
    ``self._sessions`` as ``name -> (handler task, Session)``.  When a
    handler ends it puts its registry entry on
    ``self._disconnected_sessions``; a single worker coroutine
    (:meth:`_disconnect`) consumes that queue and tears the connection down.
    """

    def __init__(self, host='', port=3333):
        """Prepare server state; nothing is bound until :meth:`start`.

        Args:
            host: Host argument handed to ``asyncio.start_server``.  The
                default ``''`` is the value that used to be hard-coded.
            port: TCP port to listen on.  The default 3333 is the value
                that used to be hard-coded.
        """
        self._host = host
        self._port = port
        # name -> (handler task, Session)
        self._sessions = {}
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Recent Python releases raise instead of implicitly creating a
            # loop when none is current, so create and install one here.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        # Queue of (handler task, Session) entries awaiting teardown.
        self._disconnected_sessions = asyncio.Queue()

    async def _disconnect(self):
        """Worker loop: tear down every session placed on the queue.

        Runs until cancelled.  A failure while tearing down one session is
        logged and does not stop the worker, so later disconnects are still
        serviced.
        """
        while True:
            task, session = await self._disconnected_sessions.get()
            try:
                await self._teardown(task, session)
            except Exception:
                # CancelledError is not an Exception subclass on the Python
                # versions this file targets, so cancelling the worker still
                # ends the loop.
                log.exception('Failed to tear down %s', task.get_name())

    async def _teardown(self, task, session):
        """Unregister one session, close its writer and reap its handler task."""
        name = task.get_name()
        # pop() instead of del: a duplicate notification must not raise.
        self._sessions.pop(name, None)
        task.cancel()
        writer = session.writer
        try:
            await writer.drain()
        except Exception as exc:
            # The peer is usually gone already, so a failed flush is expected.
            log.debug('Could not flush %s before closing: %s', name, exc)
        finally:
            # Always close, even when the flush failed or was cancelled.
            writer.close()
        try:
            await writer.wait_closed()
        except Exception as exc:
            log.debug('Error while closing %s: %s', name, exc)
        # asyncio.wait() does not re-raise the handler's outcome, so a
        # CancelledError raised here can only mean this worker is cancelled.
        await asyncio.wait([task])
        if task.cancelled():
            log.debug('Cancelled exception received for task %s', name)
        elif task.exception() is not None:
            log.debug('Exception occured while awaiting task %s', name)
        log.info('Successfully disconnected %s', name)

    async def _start(self):
        """Bind the listening socket, start the disconnect worker and serve.

        Raises:
            OSError: propagated from ``asyncio.start_server`` when the
                address cannot be bound.
        """
        server = await asyncio.start_server(
            self._on_connect, self._host, self._port)
        self.loop.create_task(self._disconnect(), name='_disconnect')
        log.info('Starting server')
        async with server:
            await server.serve_forever()

    async def _on_close(self):
        """Cancel every other task on the loop and close remaining sockets."""
        current = asyncio.current_task()
        # Never cancel or await the task that is running this coroutine.
        pending = [task for task in asyncio.all_tasks() if task is not current]
        for task in pending:
            log.info('Shutting down task: %s', task.get_name())
            task.cancel()
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        for task, outcome in zip(pending, outcomes):
            name = task.get_name()
            if isinstance(outcome, asyncio.CancelledError):
                log.debug('Task %s cancelled properly', name)
            elif isinstance(outcome, BaseException):
                log.debug('Task %s unhandled exception %s', name, outcome)
        # The disconnect worker was cancelled above, so sessions that are
        # still registered have to be closed here.
        for _task, session in list(self._sessions.values()):
            session.writer.close()
        self._sessions.clear()

    async def send_to_all(self, cur_session, msg):
        """Relay ``msg`` to every registered session except ``cur_session``.

        Args:
            cur_session: The sending session; it is skipped.
            msg: Text to relay; it is encoded with ``str.encode()``.

        A connection error on one recipient is logged and does not prevent
        delivery to the remaining recipients.
        """
        # NOTE(review): the payload is written without a trailing newline,
        # exactly as before; confirm this matches what clients expect.
        payload = msg.encode()
        # Iterate over a snapshot: drain() yields to the event loop, during
        # which sessions may be added to or removed from the registry.
        for name, (task, session) in list(self._sessions.items()):
            if session == cur_session:
                continue
            if name not in self._sessions:
                # Removed while an earlier recipient was being drained.
                continue
            log.info('Sending msg to %s', task.get_name())
            try:
                session.writer.write(payload)
                await session.writer.drain()
            except ConnectionError as exc:
                log.debug('Could not deliver to %s: %s', name, exc)

    async def _on_connect(self, reader, writer):
        """Register a new client and start its handler task.

        Used as the ``client_connected_cb`` of ``asyncio.start_server``.
        The session name is the string form of the writer's ``peername``.
        """
        name = str(writer.get_extra_info('peername'))
        log.info('%s connected', name)
        session = Session(name, reader, writer)
        task = self.loop.create_task(self.handle_session(session), name=name)
        self._sessions[name] = (task, session)

    async def handle_session(self, session):
        """Read lines from one client and broadcast them until it goes away.

        However the loop ends (EOF, connection error, over-long line or
        cancellation), the session's registry entry is queued for teardown.
        """
        name = session.name
        try:
            while True:
                data = await session.reader.readline()
                if not data:
                    log.info('%s disconnected', name)
                    return
                # errors='replace': one undecodable byte must not kill the
                # handler and leave the session registered forever.
                text = data.decode(errors='replace').strip()
                log.info('Received %s from %s', text, name)
                await self.send_to_all(session, text)
        except (ConnectionError, ValueError) as exc:
            # readline() raises ValueError when a line exceeds the stream limit.
            log.info('%s dropped: %s', name, exc)
        finally:
            entry = self._sessions.get(name)
            if entry is not None:
                self._disconnected_sessions.put_nowait(entry)

    def start(self, debug=False):
        """Run the server until it is interrupted or fails.

        Args:
            debug: Passed to ``loop.set_debug``.

        Blocks the calling thread.  A failure of the serving task (for
        example the port being in use) is logged with its traceback and
        ends the call instead of leaving the loop running with nothing to
        serve.  All remaining tasks are cancelled before returning.
        """
        self.loop.set_debug(debug)
        main_task = self.loop.create_task(self._start(), name='_start')
        try:
            self.loop.run_until_complete(main_task)
        except KeyboardInterrupt:
            log.info('Keyboard Interrupt')
        except Exception:
            log.exception('Unhandled exception')
        finally:
            self.loop.run_until_complete(self._on_close())
if __name__ == '__main__':
    # Command line: at most one optional argument, the literal word 'debug'.
    cli_args = sys.argv[1:]
    if len(cli_args) > 1:
        log.info('Usage: python3.8 server.py [debug]')
        sys.exit(0)
    # A single argument other than 'debug' is accepted and leaves debug off.
    Server().start(cli_args == ['debug'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment