Created
March 15, 2015 12:36
-
-
Save ramalho/504e13381713e5e3505b to your computer and use it in GitHub Desktop.
Simple asyncio demo adapted from a multiprocessing demo by Michele Simionato
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # spinner_asyncio.py | |
| # credits: Example by Luciano Ramalho inspired by | |
| # Michele Simionato's multiprocessing example | |
| # source: | |
| # http://python-3-patterns-idioms-test.readthedocs.org/en/latest/CoroutinesAndConcurrency.html | |
| import sys | |
| import asyncio | |
# Seconds to pause between two consecutive spinner frames.
DELAY = 0.1
# Characters displayed in turn to animate the spinner.
DISPLAY = '|/-\\'


async def spinner_func(before='', after=''):
    """Animate a text spinner on stdout until the enclosing task is cancelled.

    This is a native coroutine: the generator-based ``@asyncio.coroutine``
    decorator was deprecated in Python 3.8 and removed in Python 3.11, so
    the decorated form fails at import time on current interpreters.

    Args:
        before: Text written to the left of the spinner character.
        after: Text written to the right of the spinner character.

    Returns:
        None, once the task running this coroutine has been cancelled.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    while True:
        for char in DISPLAY:
            msg = '{} {} {}'.format(before, char, after)
            write(msg)
            flush()
            # Backspaces move the cursor back to the start of the message so
            # the next frame overwrites the current one in place.
            write('\x08' * len(msg))
            try:
                await asyncio.sleep(DELAY)
            except asyncio.CancelledError:
                # Cancellation is the normal stop signal: swallow it so the
                # task completes cleanly instead of raising.
                return
async def long_computation(delay):
    """Emulate a long computation by sleeping without blocking the event loop.

    Written as a native coroutine because ``@asyncio.coroutine`` was removed
    in Python 3.11.

    Args:
        delay: Number of seconds to sleep.
    """
    await asyncio.sleep(delay)
if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is running is deprecated and raises RuntimeError on recent Pythons.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        spinner = loop.create_task(spinner_func('Please wait...', 'thinking!'))
        long_task = loop.create_task(long_computation(3))
        # Stop the spinner as soon as the long computation has finished.
        long_task.add_done_callback(lambda f: spinner.cancel())
        # spinner_func swallows the cancellation, so this returns normally.
        loop.run_until_complete(spinner)
    finally:
        # Always release the loop's resources, even if a task raised.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment