Created
July 26, 2019 14:37
-
-
Save paretech/5eb6681efaa8dae35fcebe27a79fb738 to your computer and use it in GitHub Desktop.
Python AsyncIO Example #1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
import random
import sys
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# A NullHandler keeps the logger quiet when this module is imported by code
# that has not configured logging; config_logging() sets up real output.
LOGGER.addHandler(logging.NullHandler())
def config_logging():
    """Set up root logging for running this module as a script.

    Records at DEBUG level and above are written to stderr as
    ``HH:MM:SS LEVEL:logger-name: message``. As with any call to
    ``logging.basicConfig``, this has no effect if the root logger
    already has handlers.
    """
    settings = {
        "stream": sys.stderr,
        "level": logging.DEBUG,
        "datefmt": "%H:%M:%S",
        "format": "%(asctime)s %(levelname)s:%(name)s: %(message)s",
    }
    logging.basicConfig(**settings)
def bug(msg='Hi, this is Bug!'):
    """Log *msg* as a warning, then raise ``Exception(msg)``.

    Used by the worker to simulate a failure part-way through a run.

    Raises:
        Exception: Always, carrying *msg*.
    """
    LOGGER.warning(msg)
    failure = Exception(msg)
    raise failure
async def heartbeat():
    """Log a heartbeat message once per second, forever.

    Its purpose is to guarantee that something is always running on the
    event loop; see https://bugs.python.org/issue23057. The coroutine never
    returns on its own and only ends when its task is cancelled.
    """
    interval_seconds = 1
    while True:
        LOGGER.info('heartbeat...')
        await asyncio.sleep(interval_seconds)
async def worker(q, ident=None):
    """Service page numbers from *q* until cancelled or an error occurs.

    Page 0 seeds the queue with pages 1 through 9. Page 5 deliberately
    calls ``bug()`` (after ``task_done()``) to show a worker dying mid-run;
    that exception is logged and the worker stops without propagating it.

    Args:
        q: Queue of page numbers; must provide awaitable ``get``/``put``
            and a ``task_done`` method (e.g. ``asyncio.Queue``).
        ident: Label identifying this worker in log messages.

    Raises:
        asyncio.CancelledError: Re-raised after logging, so the task is
            correctly reported as cancelled.
    """
    LOGGER.info('Worker #%s - Starting', ident)
    try:
        while True:
            present_page = await q.get()
            LOGGER.info('Worker #%s - Servicing Page %s', ident, present_page)

            # Produce more tasks. Compare by value: ``is`` on int literals
            # tests identity and only works by interpreter accident.
            if present_page == 0:
                for new_page in range(1, 10):
                    LOGGER.info(
                        'Worker #%s - Adding Page %s to the queue',
                        ident, new_page,
                    )
                    await q.put(new_page)

            q.task_done()

            # If bug before task_done() need different strategy: the queue's
            # unfinished count would never reach zero and join() would hang.
            if present_page == 5:
                bug()

            # Sleep is good for examples, otherwise first worker may appear
            # to be doing all the work. Not exactly picture of concurrency...
            await asyncio.sleep(random.uniform(0.75, 1.0))
    except asyncio.CancelledError:
        LOGGER.warning('Worker #%s - Cancelled', ident)
        # Swallowing cancellation would make the task look like it finished
        # normally; propagate it as the asyncio docs require.
        raise
    except Exception as e:
        LOGGER.error('Worker #%s - Raised exception: %r', ident, e)
    finally:
        LOGGER.info('Worker #%s - Stopped', ident)
async def main():
    """Run the demo: seed the queue, start the tasks, wait, then clean up.

    Puts page 0 on a fresh queue, starts one heartbeat task and five worker
    tasks, waits for the queue to drain, and finally cancels every task and
    waits for the cancellations to complete.
    """
    q = asyncio.Queue()

    # Place first job on the queue
    await q.put(0)

    # Schedule heartbeats. Keep a reference: the event loop only holds weak
    # references to tasks, so an unreferenced task may be garbage-collected.
    heartbeat_task = asyncio.create_task(heartbeat())

    # Schedule some workers
    task_pool = [asyncio.create_task(worker(q, i)) for i in range(5)]
    LOGGER.info('Finished creating tasks, waiting on queue...')

    try:
        # Join will block until count of unfinished tasks is zero (all queues).
        # Exception handling more challenging when using join.
        await q.join()
        LOGGER.info('Queue stopped blocking, on with the show!')
    finally:
        # Don't forget to cancel running tasks after queue is empty or will
        # see asyncio errors for "Task was destroyed but it is pending!"
        background = [heartbeat_task, *task_pool]
        for task in background:
            task.cancel()
        # Wait until every task has actually processed its cancellation;
        # return_exceptions=True keeps CancelledError from propagating here.
        await asyncio.gather(*background, return_exceptions=True)
if __name__ == '__main__':
    config_logging()
    try:
        # asyncio.run() creates a fresh event loop, runs main() to completion,
        # cancels any leftover tasks and closes the loop. debug=True enables
        # asyncio debug mode, as loop.set_debug(True) did before.
        asyncio.run(main(), debug=True)
    except KeyboardInterrupt:
        LOGGER.info("User aborted")
    finally:
        LOGGER.info("That's All Folks!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment