Created
November 18, 2019 14:04
-
-
Save vxgmichel/0158213f7e90a92732fab6b25814ecfe to your computer and use it in GitHub Desktop.
Benchmark several `to_aiter` implementations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Results (pytest call durations for iterating `range(10 ** 4)`, slowest first):

1.77s call test_trio.py::test_to_aiter[run each next call in a separate thread]
1.29s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=inf)]
1.28s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=0)]
1.28s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=1)]
0.45s call test_trio.py::test_to_aiter[run the iteration and sleep(0) between each item]
0.02s call test_trio.py::test_to_aiter[run iteration in a single thread with batches]
0.01s call test_trio.py::test_to_aiter[run each batch iteration in a separate thread]
0.00s call test_trio.py::test_to_aiter[run the iteration in a thread and return a list]
0.00s call test_trio.py::test_to_aiter[run the iteration in a blocking way]
""" | |
import trio | |
import pytest | |
async def to_aiter_v0(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, iterating inline.

    The synchronous iterable is consumed directly in the calling task, so
    every ``next`` step blocks the event loop. This is the baseline variant.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    iterable = fn(*args, **kwargs)
    for value in iterable:
        yield value
async def to_aiter_v1(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator with a checkpoint per item.

    The iterable is still consumed inline in the calling task, but
    ``trio.sleep(0)`` is awaited before each item is yielded.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    iterable = fn(*args, **kwargs)
    for value in iterable:
        # Checkpoint first, then hand the item to the consumer.
        await trio.sleep(0)
        yield value
async def to_aiter_v2(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator via a single thread call.

    The whole iterable is materialized into a list inside one
    ``trio.to_thread.run_sync`` call; the items are then yielded from that
    list. Nothing is yielded until the iteration has fully completed.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    collected = await trio.to_thread.run_sync(lambda: list(fn(*args, **kwargs)))
    for value in collected:
        yield value
async def to_aiter_v3(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, one thread call per item.

    The iterator is created inside a ``trio.to_thread.run_sync`` call, and
    every subsequent ``next`` step is issued through its own
    ``trio.to_thread.run_sync`` call.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    source = await trio.to_thread.run_sync(lambda: iter(fn(*args, **kwargs)))

    # Unique marker for exhaustion: StopIteration is converted to this value
    # inside the thread instead of being raised across the thread boundary.
    exhausted = object()

    def pull():
        return next(source, exhausted)

    while True:
        value = await trio.to_thread.run_sync(pull)
        if value is exhausted:
            break
        yield value
async def to_aiter_v4(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one worker thread.

    The iterable is consumed in a single worker thread, and each item is
    pushed to the trio side through a memory channel created with
    ``max_buffer_size=0``.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    send_channel, receive_channel = trio.open_memory_channel(0)

    def run_in_thread():
        # Executed in the worker thread: every item is sent by re-entering
        # the trio thread.
        for item in fn(*args, **kwargs):
            trio.from_thread.run(send_channel.send, item)

    async def produce():
        # Close the send side through the regular context-manager protocol
        # rather than calling __aenter__/__aexit__ by hand from the thread
        # (a bare __aexit__() call only works if the implementation happens
        # to accept zero arguments). Closing is what ends the consumer loop
        # below, whether the thread finished normally or raised.
        async with send_channel:
            await trio.to_thread.run_sync(run_in_thread)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(produce)
        async for item in receive_channel:
            yield item
async def to_aiter_v5(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one worker thread.

    The iterable is consumed in a single worker thread, and each item is
    pushed to the trio side through a memory channel created with
    ``max_buffer_size=1``.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    send_channel, receive_channel = trio.open_memory_channel(1)

    def run_in_thread():
        # Executed in the worker thread: every item is sent by re-entering
        # the trio thread.
        for item in fn(*args, **kwargs):
            trio.from_thread.run(send_channel.send, item)

    async def produce():
        # Close the send side through the regular context-manager protocol
        # rather than calling __aenter__/__aexit__ by hand from the thread
        # (a bare __aexit__() call only works if the implementation happens
        # to accept zero arguments). Closing is what ends the consumer loop
        # below, whether the thread finished normally or raised.
        async with send_channel:
            await trio.to_thread.run_sync(run_in_thread)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(produce)
        async for item in receive_channel:
            yield item
async def to_aiter_v6(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one worker thread.

    The iterable is consumed in a single worker thread, and each item is
    pushed to the trio side through a memory channel created with an
    infinite ``max_buffer_size``.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    inf = float("inf")
    send_channel, receive_channel = trio.open_memory_channel(inf)

    def run_in_thread():
        # Executed in the worker thread: every item is sent by re-entering
        # the trio thread.
        for item in fn(*args, **kwargs):
            trio.from_thread.run(send_channel.send, item)

    async def produce():
        # Close the send side through the regular context-manager protocol
        # rather than calling __aenter__/__aexit__ by hand from the thread
        # (a bare __aexit__() call only works if the implementation happens
        # to accept zero arguments). Closing is what ends the consumer loop
        # below, whether the thread finished normally or raised.
        async with send_channel:
            await trio.to_thread.run_sync(run_in_thread)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(produce)
        async for item in receive_channel:
            yield item
async def to_aiter_v7(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed in batches of 100.

    The iterable is consumed in a single worker thread. Items are grouped
    into lists of 100 and each list is pushed to the trio side through a
    memory channel with an infinite buffer; the consumer side unpacks the
    lists and yields the individual items. Items collected before the
    iterable ends, or before it raises an ``Exception``, are sent as a final
    shorter batch.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.
    """
    inf = float("inf")
    send_channel, receive_channel = trio.open_memory_channel(inf)

    def run_in_thread():
        # Bound before the try block so the except branch can always see it.
        batch = []
        try:
            for item in fn(*args, **kwargs):
                batch.append(item)
                if len(batch) == 100:
                    trio.from_thread.run(send_channel.send, batch)
                    batch = []
        except Exception:
            # Flush what was collected so far, then let the error propagate.
            if batch:
                trio.from_thread.run(send_channel.send, batch)
            raise
        else:
            # Flush the trailing partial batch; skip the round trip when
            # there is nothing left to send.
            if batch:
                trio.from_thread.run(send_channel.send, batch)

    async def produce():
        # Close the send side through the regular context-manager protocol
        # rather than calling __aenter__/__aexit__ by hand from the thread
        # (a bare __aexit__() call only works if the implementation happens
        # to accept zero arguments). Closing is what ends the consumer loop
        # below, whether the thread finished normally or raised.
        async with send_channel:
            await trio.to_thread.run_sync(run_in_thread)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(produce)
        async for batch in receive_channel:
            for item in batch:
                yield item
async def to_aiter_v8(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator using timed batches.

    The iterator is created inside a ``trio.to_thread.run_sync`` call. Then,
    repeatedly, one ``trio.to_thread.run_sync`` call pulls items for about
    1 ms and returns them as a batch, which is unpacked and yielded on the
    trio side.

    Args:
        fn: Callable returning a synchronous iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by the iterable, in order.

    Raises:
        Exception: Any ``Exception`` other than ``StopIteration`` raised while
            advancing the iterator is re-raised after the items that preceded
            it in the same batch have been yielded.
    """
    import time

    def instantiate():
        return iter(fn(*args, **kwargs))

    items_iter = await trio.to_thread.run_sync(instantiate)

    def run_batch():
        # Batch for 1 ms. A monotonic high-resolution clock is used so that
        # wall-clock adjustments cannot shorten or stretch the window.
        batch = []
        deadline = time.perf_counter() + 0.001
        while time.perf_counter() < deadline:
            try:
                batch.append((next(items_iter), None))
            except Exception as exc:
                # Record the exception (StopIteration included) as the last
                # entry so it is handled on the trio side, in order.
                batch.append((None, exc))
                break
        return batch

    while True:
        batch = await trio.to_thread.run_sync(run_batch)
        for result, exception in batch:
            if isinstance(exception, StopIteration):
                return
            if exception is not None:
                raise exception
            yield result
@pytest.mark.trio
@pytest.mark.parametrize(
    "version",
    ["v0", "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"],
    ids=[
        "run the iteration in a blocking way",
        "run the iteration and sleep(0) between each item",
        "run the iteration in a thread and return a list",
        "run each next call in a separate thread",
        "run iteration in a single thread (buffer_size=0)",
        "run iteration in a single thread (buffer_size=1)",
        "run iteration in a single thread (buffer_size=inf)",
        "run iteration in a single thread with batches",
        "run each batch iteration in a separate thread",
    ],
)
async def test_to_aiter(version):
    """Check that ``to_aiter_<version>`` yields ``range(10 ** 4)`` in order.

    The implementation is looked up by name in the module globals, so each
    parametrized id maps to one ``to_aiter_vN`` function.
    """
    to_aiter = globals()[f"to_aiter_{version}"]
    expected = 0
    async for x in to_aiter(range, 10 ** 4):
        assert x == expected
        expected += 1
    # Without this check an implementation that yields nothing (or stops
    # early) would pass, since the loop body assertions would never fail.
    assert expected == 10 ** 4
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment