Skip to content

Instantly share code, notes, and snippets.

@craigderington
Created May 4, 2021 23:15
Show Gist options
  • Save craigderington/8f6ce8c8e7d2de10273a3d412edf7588 to your computer and use it in GitHub Desktop.
Save craigderington/8f6ce8c8e7d2de10273a3d412edf7588 to your computer and use it in GitHub Desktop.
Asyncio Echo Server
#!/usr/bin/env python3.4
import asyncio
class EchoServer(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
print('data received: {}'.format(data.decode()))
fut = asyncio.async(self.sleeper())
result = asyncio.wait_for(fut, 60)
@asyncio.coroutine
def sleeper(self):
yield from asyncio.sleep(2)
self.transport.write("Hello World".encode())
self.transport.close()
loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
print('serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
print("exit")
finally:
server.close()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment