Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save petri/46bb6b8eac8ea5b8f01c90e7414d7951 to your computer and use it in GitHub Desktop.
Save petri/46bb6b8eac8ea5b8f01c90e7414d7951 to your computer and use it in GitHub Desktop.
Python asyncio ordering test - single-connection multiplex version with asyncio.Queue
#!/usr/bin/env python3
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python
# Proof-of-Concept for https://stackoverflow.com/q/64017656/1548275
# Do Python asyncio Streams maintain order over multiple writers and readers?
import sys
import argparse
import logging
import asyncio
from datetime import datetime
from pprint import pprint
from random import randrange
# Module-level logger; its level is raised to DEBUG by _setup_logger().
log = logging.getLogger(__name__)
# TCP port used when --port is not given on the command line.
DEFAULT_TCP_PORT = 8888
# FIFO of asyncio futures, one per request written by a client task. The
# response reader pops them in order and completes each with the next reply
# read from the shared connection.
queue = asyncio.Queue()
def _setup_logger():
    """Send log records to stderr and enable DEBUG output for this module.

    A stream handler with a timestamp / thread name / level format is attached
    to the root logger, and the module-level ``log`` is set to DEBUG.
    """
    log_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
    console_handler = logging.StreamHandler(sys.stderr)
    console_handler.setFormatter(log_formatter)
    # No ``propagate`` flag is set on the handler: propagation is a Logger
    # setting, and assigning it on a Handler has no effect.
    logging.getLogger().addHandler(console_handler)
    log.setLevel(logging.DEBUG)
def run_server(loop, tcp_port):
    """Run the test server on 127.0.0.1 until interrupted with Ctrl+C.

    Args:
        loop: Event loop used to start the server and to serve requests.
        tcp_port: TCP port to listen on.
    """
    log.info("Starting server in localhost TCP-port: %d", tcp_port)
    # The ``loop`` keyword of asyncio.start_server() was removed in Python 3.10.
    # The coroutine is driven by ``loop`` right below, so the server is bound
    # to that loop without passing it explicitly.
    coro = asyncio.start_server(_server_coro, '127.0.0.1', tcp_port)
    server = loop.run_until_complete(coro)

    socket_info = server.sockets[0].getsockname()
    log.info("Serving on: %s:%d", socket_info[0], socket_info[1])
    try:
        # Serve requests until Ctrl+C is pressed
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop listening even if run_forever() ended with an unexpected error.
        server.close()
async def _server_coro(reader, writer):
    """Handle one client connection, acknowledging each ``END``-terminated message.

    Every message read from the peer is answered with ``Got: '<message>'``
    after a random delay of 0-2999 ms. The handler returns when the stream
    ends before another complete message arrives, and always closes the
    writer on the way out.

    Args:
        reader: Stream reader of the accepted connection.
        writer: Stream writer of the accepted connection.
    """
    # The peer address does not change during the connection; look it up once.
    addr = writer.get_extra_info('peername')
    try:
        while True:
            try:
                data = await reader.readuntil(separator=b'END')
            except asyncio.IncompleteReadError:
                # End of stream reached before another separator was seen.
                return

            message = data.decode()
            delay = randrange(3000)
            log.debug("Received %s from %s", message, addr)
            # NOTE: ``message`` still carries its trailing "END", so the reply
            # ends with a closing quote *after* the separator.
            message_out = "Got: '%s'" % message
            await asyncio.sleep(delay / 1000)
            log.debug("Sending after delay of %d ms: %s", delay, message)
            writer.write(message_out.encode('UTF-8'))
            await writer.drain()
    finally:
        # Release the server-side socket; without this the transport stays
        # open after the handler returns.
        writer.close()
def run_client(loop, tcp_port, count_connections):
    """Run the test client to completion on the given event loop.

    Args:
        loop: Event loop that drives the client coroutine.
        tcp_port: TCP port on localhost to connect to.
        count_connections: Number of client tasks to start.
    """
    client = _client_coro(loop, tcp_port, count_connections)
    loop.run_until_complete(client)
async def _responsereader_task(reader):
    """Read replies from the shared connection and deliver them to waiters.

    While the module-level ``queue`` holds futures, the oldest one is taken
    and completed with the next ``END``-terminated chunk read from ``reader``.
    The task ends as soon as the queue is found empty.

    Args:
        reader: Stream reader of the shared client connection.

    Raises:
        Exception: Whatever ``reader.readuntil`` raised. Before re-raising,
            the same exception is set on the current and on every still
            queued future so that no waiter is left pending forever.
    """
    while not queue.empty():
        response = await queue.get()
        log.debug("Reading network response for request from queue")
        try:
            data = await reader.readuntil(separator=b"END")
        except Exception as exc:
            # No further replies can arrive: fail every outstanding request
            # instead of leaving its task blocked on a future nobody resolves.
            waiters = [response]
            while not queue.empty():
                waiters.append(queue.get_nowait())
            for waiter in waiters:
                if not waiter.done():
                    waiter.set_exception(exc)
            raise
        # The waiter may have been cancelled in the meantime; set_result()
        # on a finished future would raise InvalidStateError.
        if not response.done():
            response.set_result(data)
async def _client_coro(loop, tcp_port, count_connections):
    """Open one connection and multiplex ``count_connections`` requests over it.

    Args:
        loop: Unused; kept so existing callers keep working. The connection
            is opened on the loop that runs this coroutine.
        tcp_port: TCP port on 127.0.0.1 to connect to.
        count_connections: Number of request tasks to start.
    """
    # The ``loop`` keyword of asyncio.open_connection() was removed in
    # Python 3.10; the running loop is used implicitly.
    reader, writer = await asyncio.open_connection('127.0.0.1', tcp_port)
    try:
        # The request tasks are created before the reader task. The reader
        # stops as soon as it finds the queue empty, so it relies on the
        # requests having been queued first.
        tasks = [
            asyncio.create_task(_client_task(conn_idx, writer))
            for conn_idx in range(count_connections)
        ]
        reader_task = asyncio.create_task(_responsereader_task(reader))
        tasks.insert(0, reader_task)

        log.info("Running client to localhost TCP-port: %d", tcp_port)
        await asyncio.wait(tasks)
    finally:
        # Close the connection on every exit path and wait until the
        # transport is actually released.
        log.debug('Close the socket')
        writer.close()
        await writer.wait_closed()
async def _client_task(conn_idx, writer):
    """Send one numbered test message and wait for its reply.

    The message is written to the shared ``writer`` and a fresh future is
    appended to the module-level ``queue``; the task then waits until that
    future is completed with the reply bytes.

    Args:
        conn_idx: Zero-based task index; the message carries ``conn_idx + 1``.
        writer: Stream writer of the shared client connection.
    """
    task_no = conn_idx + 1
    payload = "Test %d END" % (task_no)
    log.debug('Task %d sending: %s' % (task_no, payload))

    # Writing and queueing happen without an ``await`` in between, so the
    # order of futures in the queue matches the order of requests on the wire.
    writer.write(payload.encode())
    pending = asyncio.Future()
    queue.put_nowait(pending)
    log.debug("Task %d queuing for response" % task_no)

    reply = await pending
    log.debug('Task %d received: %s' % (task_no, reply.decode()))
def main():
    """Parse the command line and run the test server or the test client.

    Exits with status 2 when neither ``--server`` nor ``--client`` is given.
    When both are given, the server is run.
    """
    parser = argparse.ArgumentParser(description='Python asyncio stream ordering test')
    parser.add_argument('--server', action='store_true',
                        help='Run as a test server')
    parser.add_argument('--client', action='store_true',
                        help='Run as a test client')
    parser.add_argument('--port', '-p',
                        default=DEFAULT_TCP_PORT, type=int,
                        help="TCP-port for server to listen or client to connect. Default: %d" % DEFAULT_TCP_PORT)
    parser.add_argument('--count', '-c',
                        default=1, type=int,
                        help="Number of client connections to make towards server. Default: 1")
    args = parser.parse_args()
    _setup_logger()

    if not args.server and not args.client:
        log.error("Need either --server or --client!")
        # sys.exit() rather than the interactive-only exit() helper from site.
        sys.exit(2)

    # Init async I/O
    try:
        async_loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions no longer create a loop implicitly.
        async_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(async_loop)

    # At least one of the two flags is set here; --server takes precedence.
    if args.server:
        run_server(async_loop, args.port)
    else:
        run_client(async_loop, args.port, args.count)

    # In a nice and calm fashion, shut down any possible tasks that are pending.
    # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks().
    pending = asyncio.all_tasks(async_loop)
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancelled tasks run their cleanup before the loop is closed.
        async_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    async_loop.run_until_complete(async_loop.shutdown_asyncgens())
    async_loop.close()
    log.info("Done.")
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment