Forked from HQJaTu/gist:345f7147065f1e10587169dc36cc1edb
Last active
September 27, 2020 18:42
-
-
Save petri/46bb6b8eac8ea5b8f01c90e7414d7951 to your computer and use it in GitHub Desktop.
Python asyncio ordering test - single-connection multiplex version with asyncio.Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python | |
# Proof-of-Concept for https://stackoverflow.com/q/64017656/1548275 | |
# Do Python asyncio Streams maintain order over multiple writers and readers? | |
import sys | |
import argparse | |
import logging | |
import asyncio | |
from datetime import datetime | |
from pprint import pprint | |
from random import randrange | |
# TCP port shared by server and client when --port is not given.
DEFAULT_TCP_PORT = 8888

# Module-level logger; its level is configured in _setup_logger().
log = logging.getLogger(__name__)

# FIFO of futures, one per request written to the shared connection.
# Client tasks enqueue a future right after writing their request and the
# response-reader task resolves them in the same order.
queue = asyncio.Queue()
def _setup_logger():
    """Send log records to stderr and enable DEBUG output for this module.

    A stream handler with a timestamp/thread/level format is attached to the
    root logger; the module logger ``log`` is set to DEBUG and reaches that
    handler through normal propagation.
    """
    log_formatter = logging.Formatter(
        "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(log_formatter)
    # NOTE: the previous ``console_handler.propagate = False`` was dropped:
    # ``propagate`` is an attribute of loggers, not handlers, so it had no
    # effect on where records are delivered.
    logging.getLogger().addHandler(console_handler)
    log.setLevel('DEBUG')
def run_server(loop, tcp_port):
    """Run the test server on 127.0.0.1 until Ctrl+C is pressed.

    Args:
        loop: Event loop used to start the server and to serve requests.
        tcp_port: TCP port to listen on.
    """
    log.info("Starting server in localhost TCP-port: %d", tcp_port)
    # The ``loop=`` keyword of asyncio.start_server() was removed in
    # Python 3.10. It is not needed: the coroutine is driven by
    # loop.run_until_complete(), so it runs on ``loop`` anyway.
    server = loop.run_until_complete(
        asyncio.start_server(_server_coro, '127.0.0.1', tcp_port))

    socket_info = server.sockets[0].getsockname()
    log.info("Serving on: %s:%d", socket_info[0], socket_info[1])

    # Serve requests until Ctrl+C is pressed
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening even when run_forever() fails for another reason.
        server.close()
async def _server_coro(reader, writer):
    """Serve one client connection.

    Reads requests terminated by ``END``, waits a random delay of up to three
    seconds and writes back a reply. Requests on a connection are handled one
    at a time: the next request is read only after the previous reply has
    been written and drained.

    Args:
        reader: Stream reader of the accepted connection.
        writer: Stream writer of the accepted connection.
    """
    addr = writer.get_extra_info('peername')
    try:
        while True:
            try:
                data = await reader.readuntil(separator=b'END')
            except asyncio.IncompleteReadError:
                # EOF before the next separator: the peer is gone.
                return

            message = data.decode()
            delay = randrange(3000)
            log.debug("Received %s from %s", message, addr)
            # NOTE: the reply ends with a quote *after* "END", so a peer that
            # frames replies on "END" sees that quote at the start of the
            # following reply.
            message_out = "Got: '%s'" % message
            await asyncio.sleep(delay / 1000)
            log.debug("Sending after delay of %d ms: %s", delay, message)
            writer.write(message_out.encode('UTF-8'))
            await writer.drain()
    finally:
        # Release the server-side socket instead of leaving it open.
        writer.close()
def run_client(loop, tcp_port, count_connections):
    """Run the test client to completion on the given event loop.

    Args:
        loop: Event loop that drives the client coroutine.
        tcp_port: TCP port of the server on localhost.
        count_connections: Number of client tasks to start.
    """
    client = _client_coro(loop, tcp_port, count_connections)
    loop.run_until_complete(client)
async def _responsereader_task(reader):
    """Read responses off the shared connection and hand them to waiters.

    For each future taken from the module-level ``queue`` (in FIFO order) one
    ``END``-terminated response is read from ``reader`` and stored as the
    future's result. The loop ends as soon as the queue is empty.

    Args:
        reader: Stream reader of the shared client connection.

    Raises:
        Exception: Whatever ``reader.readuntil()`` raised, after the same
            error has been set on every future that is still waiting.
    """
    while not queue.empty():
        response = await queue.get()
        log.debug("Reading network response for request from queue")
        try:
            data = await reader.readuntil(separator=b"END")
        except Exception as err:
            # Without this the waiting client tasks would block forever on
            # futures that nobody is going to resolve.
            if not response.done():
                response.set_exception(err)
            while not queue.empty():
                orphan = queue.get_nowait()
                if not orphan.done():
                    orphan.set_exception(err)
            raise
        # A waiter may have been cancelled in the meantime; set_result() on a
        # finished future would raise InvalidStateError.
        if not response.done():
            response.set_result(data)
async def _client_coro(loop, tcp_port, count_connections):
    """Open one connection and multiplex ``count_connections`` requests on it.

    Args:
        loop: Unused; kept so existing callers keep working. The connection
            is opened on the running event loop.
        tcp_port: TCP port of the server on localhost.
        count_connections: Number of client tasks sharing the connection.
    """
    # The ``loop=`` keyword of asyncio.open_connection() was removed in
    # Python 3.10; inside a coroutine the running loop is used.
    reader, writer = await asyncio.open_connection('127.0.0.1', tcp_port)
    try:
        tasks = [asyncio.create_task(_client_task(conn_idx, writer))
                 for conn_idx in range(count_connections)]
        # The reader task stops when the queue is empty, so it is created
        # after the client tasks. This presumably relies on tasks first
        # running in creation order, so every request is queued before the
        # reader looks at the queue - TODO confirm.
        reader_task = asyncio.create_task(_responsereader_task(reader))
        tasks.insert(0, reader_task)
        log.info("Running client to localhost TCP-port: %d", tcp_port)
        done, _pending = await asyncio.wait(tasks)
        # Report failures instead of leaving them as never-retrieved
        # task exceptions.
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                log.error("Client task failed: %r", task.exception())
    finally:
        log.debug('Close the socket')
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError as err:
            # The connection was already broken; nothing left to clean up.
            log.debug("Socket closed with error: %r", err)
async def _client_task(conn_idx, writer):
    """Send one request on the shared connection and wait for its response.

    Args:
        conn_idx: Zero-based task index; the request carries ``conn_idx + 1``.
        writer: Stream writer of the shared client connection.
    """
    task_no = conn_idx + 1
    request = "Test %d END" % task_no
    log.debug('Task %d sending: %s', task_no, request)

    pending = asyncio.Future()
    # Writing and enqueueing happen with no ``await`` in between, so the
    # order of futures in the queue matches the order of requests written
    # to the connection.
    writer.write(request.encode())
    queue.put_nowait(pending)
    log.debug("Task %d queuing for response", task_no)

    reply = await pending
    log.debug('Task %d received: %s', task_no, reply.decode())
def main():
    """Parse the command line and run the ordering test as server or client.

    Exits with status 2 when neither ``--server`` nor ``--client`` is given.
    When both are given, the server mode wins.
    """
    # The description used to read 'Name.com DNS tool', a leftover from
    # another script.
    parser = argparse.ArgumentParser(description='Python asyncio ordering test')
    parser.add_argument('--server', action='store_true',
                        help='Run as a test server')
    parser.add_argument('--client', action='store_true',
                        help='Run as a test client')
    parser.add_argument('--port', '-p',
                        default=DEFAULT_TCP_PORT, type=int,
                        help="TCP-port for server to listen or client to connect. Default: %d" % DEFAULT_TCP_PORT)
    parser.add_argument('--count', '-c',
                        default=1, type=int,
                        help="Number of client connections to make towards server. Default: 1")
    args = parser.parse_args()
    _setup_logger()

    if not args.server and not args.client:
        log.error("Need either --server or --client!")
        # sys.exit() instead of the site-provided exit() builtin, which is
        # not guaranteed to exist.
        sys.exit(2)

    # Init async I/O
    try:
        async_loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions no longer create a loop implicitly.
        async_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(async_loop)

    try:
        if args.server:
            run_server(async_loop, args.port)
        else:
            # args.client is guaranteed by the check above.
            run_client(async_loop, args.port, args.count)
    finally:
        # In a nice and calm fashion, shut down any possible tasks that are
        # pending. asyncio.Task.all_tasks() was removed in Python 3.9.
        pending = asyncio.all_tasks(async_loop)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancelled tasks actually finish before closing the loop.
            async_loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
        async_loop.run_until_complete(async_loop.shutdown_asyncgens())
        async_loop.close()

    log.info("Done.")
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment