Created
February 4, 2022 20:44
-
-
Save shamshirz/b6c57c9bdeb91cb899290b9919b07f0a to your computer and use it in GitHub Desktop.
asyncio practice
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
async def await_sleep(): | |
await asyncio.create_task(sleep()) | |
async def await_multiple(): | |
task1 = asyncio.create_task(sleep()) | |
task2 = asyncio.create_task(sleep()) | |
task3 = asyncio.create_task(sleep()) | |
task4 = asyncio.create_task(sleep()) | |
task5 = asyncio.create_task(sleep()) | |
task6 = asyncio.create_task(sleep()) | |
task7 = asyncio.create_task(sleep()) | |
await task1 | |
await task2 | |
await task3 | |
await task4 | |
await task5 | |
await task6 | |
await task7 | |
# This needs to be awaiting an asyncio sleep. time.sleep will not work | |
async def sleep(): | |
await asyncio.sleep(0.2) | |
return "ok" | |
def test_await(): | |
start = time.time() | |
# Runs and "awaitable" | |
# awaitable = async def that includes "await" | |
asyncio.run(await_sleep()) | |
end = time.time() | |
assert (end - start) < 1 | |
def test_await_concurrently(): | |
start = time.time() | |
# Runs and "awaitable" | |
# awaitable = async def that includes "await" | |
asyncio.run(await_multiple()) | |
end = time.time() | |
assert (end - start) < 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment