Created
January 28, 2024 22:00
-
-
Save Nikolaj-K/70461b139f12c02d6e3b09095fec9329 to your computer and use it in GitHub Desktop.
Basic async io example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Script discussed in the video
https://youtu.be/RltoGfNhex4

asyncio enables the interleaved execution of several functions within one thread.
This makes sense, e.g., if you want the results of the less intensive computations
earlier (for IO), or if some awaited computation is done in some external thread
(e.g. info from a server).
"""
import asyncio | |
class Config:
    """Settings shared by the sequential and the asyncio demo runs."""

    # Exponents e; the pyramid-sum routines in this script work with n = 10**e.
    # The values are intentionally not sorted.
    EXPONENTS = [3, 5, 1, 4, 3, 2]
def pyramid_sum_function_analytical(exponent: int) -> int:
    """Return 0 + 1 + ... + (n - 1) for n = 10**exponent using the closed form.

    Args:
        exponent: Power of ten that fixes the number of summands n.

    Returns:
        The exact integer n * (n - 1) / 2.
    """
    n = 10**exponent
    # n and n - 1 are consecutive integers, so their product is even and floor
    # division is exact. True division ('/') would produce a float, which is no
    # longer exact once the result exceeds 2**53.
    return n * (n - 1) // 2
def pyramid_sum_function(exponent: int) -> int:
    """Sum the integers 0 .. 10**exponent - 1 term by term and log the result.

    The result is checked against pyramid_sum_function_analytical before it is
    printed and returned.
    """
    upper_bound = 10**exponent
    total = sum(range(upper_bound))
    assert total == pyramid_sum_function_analytical(exponent)
    print(f"\t[pyramid_sum_function] The sum of number up to 10^{exponent} equals {total}.")
    return total
def compute_sums_functions():
    """Run pyramid_sum_function sequentially for every exponent in Config.EXPONENTS.

    Returns:
        The list of sums, in the same order as Config.EXPONENTS.
    """
    # Each call runs to completion before the next one starts.
    sums = [pyramid_sum_function(exponent) for exponent in Config.EXPONENTS]
    # The 'f' prefix has to sit in front of the opening quote; inside the
    # quotes the braces are printed literally instead of being interpolated.
    print(f"[compute_sums_functions] finished with {sums}.\n")
    return sums
async def bonus_sleeping_log(message, time):
    """Sleep for `time` seconds via asyncio.sleep, then print `message` if it is truthy.

    With an empty message nothing is printed, so the call only hands control
    back to the event loop for the duration of the sleep.
    """
    await asyncio.sleep(time)
    if not message:
        return
    print(f"[bonus_sleeping_log] message: {message}\n")
async def pyramid_sum_coroutine(exponent: int) -> int:
    """Sum the integers 0 .. 10**exponent - 1, yielding to the event loop regularly.

    'async' makes this a coroutine function, so its calls can be gathered into
    the event loop and interleaved with other coroutines.

    Args:
        exponent: Power of ten that fixes the number of summands n.

    Returns:
        The exact integer sum n * (n - 1) / 2.
    """
    n = 10**exponent
    total = 0
    for k in range(n):
        total += k
        if k % 100 == 0:
            # "Productive waiting until picking it up again": a zero-second
            # sleep suspends this coroutine once, which lets all other gathered
            # coroutines progress. This is what bonus_sleeping_log("", 0) does.
            # Warning: switching between coroutines comes with time overhead.
            # Note: if this line is 'pass' instead, then no async happens.
            await asyncio.sleep(0)
    # Compare against the closed form with exact integer arithmetic; a float
    # from '/' would make this check fail spuriously for large exponents.
    assert total == n * (n - 1) // 2
    print(f"\tThe sum of number up to 10^{exponent} equals {total}.")
    return total
async def gather_coroutines(print_log):
    """Run pyramid_sum_coroutine concurrently for every exponent in Config.EXPONENTS.

    Args:
        print_log: When truthy, also print the state of the involved
            tasks/futures and schedule two auxiliary bonus_sleeping_log tasks.

    Returns:
        The list of sums, in the same order as Config.EXPONENTS.
    """
    # 1. Create the coroutine objects. Nothing is executed yet; that only
    #    happens once the event loop gets to run them.
    coroutines = [pyramid_sum_coroutine(exponent) for exponent in Config.EXPONENTS]

    if print_log:
        # Auxiliary log. This task is scheduled but never awaited in here, so
        # its message only shows up if the loop is still running 3 seconds on.
        hello_task = asyncio.create_task(bonus_sleeping_log("HELLO", 3))
        print(f"\n[gather_coroutines] t_hello = {hello_task}\n")  # <Task pending coro=...>

    # 2. Bundle the coroutines into one awaitable that runs them concurrently.
    gathered = asyncio.gather(*coroutines)
    if print_log:
        print(f"[gather_coroutines] awaitable_sums = {gathered}\n")  # <_GatheringFuture pending>

    # 3. Suspend until every coroutine has finished and collect the results.
    sums = await gathered

    if print_log:
        print(f"\n[gather_coroutines] awaitable_sums = {gathered}\n")  # <_GatheringFuture finished result=...>
        # Auxiliary log. Unlike the HELLO task this one is awaited, so its
        # message is printed before the function returns.
        goodbye_task = asyncio.create_task(bonus_sleeping_log("GOODBYE", 2))
        print(f"[gather_coroutines] t_goodbye = {goodbye_task}\n")  # <Task pending coro=...>
        await goodbye_task
        print(f"[gather_coroutines] t_goodbye = {goodbye_task}\n")  # <Task finished coro=...>

    return sums
async def wait_coroutines():
    """Run the pyramid-sum coroutines with asyncio.wait instead of asyncio.gather.

    asyncio.wait is the lower-level alternative: it returns the sets of done
    and pending tasks rather than the list of results. This function discards
    both sets and returns None.
    """
    return_when = asyncio.FIRST_COMPLETED  # = asyncio.ALL_COMPLETED also possible
    # asyncio.wait only accepts tasks/futures. Passing bare coroutine objects
    # was deprecated in Python 3.8 and raises TypeError since Python 3.11, so
    # each coroutine is wrapped into a task explicitly.
    tasks = [
        asyncio.create_task(pyramid_sum_coroutine(exponent))
        for exponent in Config.EXPONENTS
    ]
    # With FIRST_COMPLETED the remaining tasks are still pending on return;
    # they are left untouched here.
    _done, _pending = await asyncio.wait(tasks, return_when=return_when)
def legacy_run(coro):
    """Run `coro` to completion on an explicitly managed event loop.

    This mirrors the pre-asyncio.run way of driving a coroutine. A fresh loop
    is created for every call, because asyncio.get_event_loop() without a
    running loop is deprecated and fails on recent Python versions.

    Args:
        coro: The coroutine (or other awaitable) to run.

    Returns:
        Whatever `coro` returns.
    """
    event_loop = asyncio.new_event_loop()
    try:
        return event_loop.run_until_complete(coro)
    finally:
        try:
            # Tasks that were scheduled but never awaited are still pending.
            # Cancel them and let the cancellation be processed, so that
            # closing the loop does not leave half-run tasks behind.
            leftovers = asyncio.all_tasks(event_loop)
            for task in leftovers:
                task.cancel()
            if leftovers:
                event_loop.run_until_complete(
                    asyncio.gather(*leftovers, return_exceptions=True)
                )
        finally:
            event_loop.close()
if __name__ == "__main__":
    # Baseline: every sum is computed to completion, one after the other.
    print(80 * "-" + "\n[main] Run sequential compute_sums_functions")
    sums = compute_sums_functions()

    # Same sums, computed by interleaved coroutines. The flag selects how the
    # top-level coroutine is driven: manual loop handling vs. asyncio.run.
    USE_LEGACY_RUN = True
    print(80 * "-" + f"\n[main] Run gather_coroutines (with USE_LEGACY_RUN={USE_LEGACY_RUN})")
    async_sums = gather_coroutines(print_log=True)
    if USE_LEGACY_RUN:
        res_sums = legacy_run(async_sums)
    else:
        res_sums = asyncio.run(async_sums)
    # gather keeps the input order, so both approaches must agree.
    assert res_sums == sums

    # Alternative based on asyncio.wait.
    # Note: here .run does not return the list of ints.
    print(80 * "-" + "\n[main] Run wait_coroutines")
    asyncio.run(wait_coroutines())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment