Skip to content

Instantly share code, notes, and snippets.

@mouseroot
Created August 15, 2018 05:29
Show Gist options
  • Save mouseroot/7bb6c894ec3abc2b16638837cb9c1be5 to your computer and use it in GitHub Desktop.
Save mouseroot/7bb6c894ec3abc2b16638837cb9c1be5 to your computer and use it in GitHub Desktop.
Python 3.7 Asyncio Echo Server
import asyncio
class EchoProtocol(asyncio.Protocol):
    """Echo each received chunk back to the peer with newlines removed."""

    def __init__(self):
        # Assigned in connection_made(); None until a connection exists.
        self.transport = None

    def connection_made(self, transport):
        """Log the peer's host and keep *transport* for later writes."""
        peer_name = transport.get_extra_info("peername")
        # get_extra_info() returns None when the transport has no peer
        # name, so avoid indexing into it blindly.
        host = peer_name[0] if peer_name else "unknown peer"
        print(f"Connection from {host}")
        self.transport = transport

    def data_received(self, data):
        """Write *data* back to the peer with every newline byte stripped."""
        # Strip at the bytes level. A decode()/encode() round trip raises
        # UnicodeDecodeError on non-UTF-8 input or when a multi-byte
        # character is split across two reads; for valid UTF-8 the result
        # is identical because 0x0A never occurs inside a multi-byte
        # sequence.
        self.transport.write(data.replace(b"\n", b""))
# Create the loop explicitly: asyncio.get_event_loop() with no current loop
# is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Bind the echo protocol to localhost:8990 and start listening.
    coro = loop.create_server(EchoProtocol, "localhost", 8990)
    server = loop.run_until_complete(coro)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server.
        pass
    finally:
        # Stop accepting connections and wait for the server to finish
        # closing before the loop is torn down.
        # NOTE(review): on newer Python versions wait_closed() may also
        # wait for active client connections to finish -- confirm this is
        # acceptable for the deployment.
        server.close()
        loop.run_until_complete(server.wait_closed())
finally:
    # Close the loop exactly once, even if binding the port failed.
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment