Created
September 23, 2020 07:34
-
-
Save HQJaTu/345f7147065f1e10587169dc36cc1edb to your computer and use it in GitHub Desktop.
Python asyncio ordering test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python | |
# Proof-of-Concept for https://stackoverflow.com/q/64017656/1548275 | |
# Do Python asyncio Streams maintain order over multiple writers and readers? | |
import sys | |
import argparse | |
import logging | |
import asyncio | |
from datetime import datetime | |
from pprint import pprint | |
from random import randrange | |
# Module-level logger shared by every function in this script.
log = logging.getLogger(__name__)
# TCP port used by both the server and the client when --port is not given.
DEFAULT_TCP_PORT = 8888
def _setup_logger():
    """Send log records to stderr and enable DEBUG output for this module.

    A stream handler with a timestamp / thread name / level format is attached
    to the root logger, and the module-level ``log`` logger is switched to
    DEBUG so that its records reach that handler.
    """
    log_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(log_formatter)
    # NOTE: ``propagate`` is an attribute of Logger objects, not of handlers.
    # The former ``console_handler.propagate = False`` had no effect and was
    # dropped to avoid suggesting that propagation is being disabled.
    logging.getLogger().addHandler(console_handler)
    log.setLevel('DEBUG')
def run_server(loop, tcp_port):
    """Run the test TCP server on localhost until Ctrl+C is pressed.

    Args:
        loop: Event loop used to start the server and to serve requests.
        tcp_port: TCP port on 127.0.0.1 to listen on.
    """
    log.info("Starting server in localhost TCP-port: %d", tcp_port)
    # The ``loop`` keyword of asyncio.start_server() was deprecated in
    # Python 3.8 and removed in 3.10. The server attaches to whichever loop
    # runs the coroutine, which is ``loop`` here.
    coro = asyncio.start_server(_server_coro, '127.0.0.1', tcp_port)
    server = loop.run_until_complete(coro)

    socket_info = server.sockets[0].getsockname()
    log.info("Serving on: %s:%d", socket_info[0], socket_info[1])
    try:
        # Serve requests until Ctrl+C is pressed
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening even when the loop ends for some other reason.
        server.close()
async def _server_coro(reader, writer):
    """Handle one client connection: read a message, wait, then reply.

    Up to 100 bytes are read from the client. After a random delay of less
    than three seconds the reply ``Got: '<message>'`` is written back and the
    connection is closed.

    Args:
        reader: Stream reader of the accepted connection.
        writer: Stream writer of the accepted connection.
    """
    try:
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        delay = randrange(3000)
        log.debug("Received %s from %s", message, addr)

        message_out = "Got: '%s'" % message
        # The delay is in milliseconds; asyncio.sleep() takes seconds.
        await asyncio.sleep(delay / 1000)

        log.debug("Sending after delay of %d ms: %s", delay, message)
        writer.write(message_out.encode('UTF-8'))
        await writer.drain()
    finally:
        # Always release the socket, also when reading, decoding or writing
        # failed or when the handler task was cancelled.
        log.debug("Close the client socket")
        writer.close()
        await writer.wait_closed()
def run_client(loop, tcp_port, count_connections):
    """Run the client side and block until all test connections are finished.

    Args:
        loop: Event loop that executes the client coroutine.
        tcp_port: TCP port on localhost to connect to.
        count_connections: Number of connections to open towards the server.
    """
    client_job = _client_coro(loop, tcp_port, count_connections)
    loop.run_until_complete(client_job)
async def _client_coro(loop, tcp_port, count_connections):
    """Open ``count_connections`` concurrent client connections and wait for all.

    Args:
        loop: Event loop, passed through to each connection task.
        tcp_port: TCP port on localhost to connect to.
        count_connections: Number of connections to make; zero or a negative
            number results in no connections at all.
    """
    tasks = [
        asyncio.create_task(_client_task(loop, tcp_port, conn_idx))
        for conn_idx in range(count_connections)
    ]
    log.info("Running client to localhost TCP-port: %d", tcp_port)

    # gather() accepts an empty task list, whereas asyncio.wait() raises
    # ValueError for it. return_exceptions=True lets every connection run to
    # its end and hands failures back instead of leaving them unretrieved.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for conn_idx, result in enumerate(results):
        if isinstance(result, BaseException):
            log.error("Connection %d failed: %r", conn_idx + 1, result)
async def _client_task(loop, tcp_port, conn_idx):
    """Run a single client connection: send ``Test <n>`` and log the reply.

    Args:
        loop: Kept for interface compatibility; the connection is made on the
            loop running this coroutine.
        tcp_port: TCP port on localhost to connect to.
        conn_idx: Zero-based index of this connection; the message sent
            carries ``conn_idx + 1``.
    """
    message = "Test %d" % (conn_idx + 1)
    # The ``loop`` keyword of asyncio.open_connection() was deprecated in
    # Python 3.8 and removed in 3.10.
    reader, writer = await asyncio.open_connection('127.0.0.1', tcp_port)
    try:
        log.debug('Send: %s', message)
        writer.write(message.encode())
        # Make sure the message has left the write buffer before waiting
        # for the response.
        await writer.drain()

        data = await reader.read(100)
        log.debug('Received: %s', data.decode())
    finally:
        # Close the connection also when sending or receiving failed.
        log.debug('Close the socket')
        writer.close()
        await writer.wait_closed()
def main():
    """Parse the command line and run either the test server or the test client.

    Exits with status 2 when neither --server nor --client is given. When both
    are given, the server is run.
    """
    parser = argparse.ArgumentParser(description='Python asyncio ordering test')
    parser.add_argument('--server', action='store_true',
                        help='Run as a test server')
    parser.add_argument('--client', action='store_true',
                        help='Run as a test client')
    parser.add_argument('--port', '-p',
                        default=DEFAULT_TCP_PORT, type=int,
                        help="TCP-port for server to listen or client to connect. Default: %d" % DEFAULT_TCP_PORT)
    parser.add_argument('--count', '-c',
                        default=1, type=int,
                        help="Number of client connections to make towards server. Default: 1")
    args = parser.parse_args()
    _setup_logger()

    if not args.server and not args.client:
        log.error("Need either --server or --client!")
        # sys.exit() instead of the ``exit`` helper, which is only installed
        # by the ``site`` module and meant for interactive use.
        sys.exit(2)

    # Init async I/O. Calling asyncio.get_event_loop() with no loop running is
    # deprecated, so create the loop explicitly and make it the current one.
    async_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(async_loop)
    try:
        # The check above guarantees that at least one of the modes is set.
        if args.server:
            run_server(async_loop, args.port)
        else:
            run_client(async_loop, args.port, args.count)

        # In a nice and calm fashion, shut down any possible tasks that are pending.
        # asyncio.Task.all_tasks() was removed in Python 3.9. The loop must be
        # passed explicitly because it is not running at this point.
        pending = asyncio.all_tasks(async_loop)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancelled tasks actually process their cancellation
            # (and run their cleanup) before the loop is closed.
            async_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        async_loop.run_until_complete(async_loop.shutdown_asyncgens())
    finally:
        async_loop.close()
    log.info("Done.")
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment