Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created November 18, 2019 14:04
Show Gist options
  • Save vxgmichel/0158213f7e90a92732fab6b25814ecfe to your computer and use it in GitHub Desktop.
Save vxgmichel/0158213f7e90a92732fab6b25814ecfe to your computer and use it in GitHub Desktop.
Benchmark several `to_aiter` implementations
"""
Results (pytest call durations for each implementation, slowest first):
1.77s call test_trio.py::test_to_aiter[run each next call in a separate thread]
1.29s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=inf)]
1.28s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=0)]
1.28s call test_trio.py::test_to_aiter[run iteration in a single thread (buffer_size=1)]
0.45s call test_trio.py::test_to_aiter[run the iteration and sleep(0) between each item]
0.02s call test_trio.py::test_to_aiter[run iteration in a single thread with batches]
0.01s call test_trio.py::test_to_aiter[run each batch iteration in a separate thread]
0.00s call test_trio.py::test_to_aiter[run the iteration in a thread and return a list]
0.00s call test_trio.py::test_to_aiter[run the iteration in a blocking way]
"""
import trio
import pytest
async def to_aiter_v0(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, iterating inline.

    Baseline implementation ("run the iteration in a blocking way"): the
    iterable is consumed directly in the calling task, with no thread and no
    checkpoint between items.

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by ``fn(*args, **kwargs)``, in order.
    """
    for item in fn(*args, **kwargs):
        yield item
async def to_aiter_v1(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator with checkpoints.

    The iterable is still consumed inline in the calling task, but
    ``trio.sleep(0)`` is awaited before every item is yielded
    ("run the iteration and sleep(0) between each item").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by ``fn(*args, **kwargs)``, in order.
    """
    for item in fn(*args, **kwargs):
        # Give the trio scheduler a chance to run other tasks between items.
        await trio.sleep(0)
        yield item
async def to_aiter_v2(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator via one thread call.

    The whole iterable is materialised into a list by a single
    ``trio.to_thread.run_sync`` call, then the list is replayed
    ("run the iteration in a thread and return a list"). Nothing is yielded
    until the iteration has completed.

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by ``fn(*args, **kwargs)``, in order.
    """

    def run():
        return list(fn(*args, **kwargs))

    result = await trio.to_thread.run_sync(run)
    for item in result:
        yield item
async def to_aiter_v3(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, one thread call per item.

    The iterator is created through ``trio.to_thread.run_sync`` and every
    subsequent ``next`` call goes through its own ``run_sync`` call
    ("run each next call in a separate thread").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by ``fn(*args, **kwargs)``, in order.
    """

    def instantiate():
        return iter(fn(*args, **kwargs))

    items_iter = await trio.to_thread.run_sync(instantiate)

    # Unique marker returned instead of letting StopIteration propagate out
    # of the run_sync call; identity comparison keeps it distinct from any
    # real item.
    stop_iteration = object()

    def safe_next():
        try:
            return next(items_iter)
        except StopIteration:
            return stop_iteration

    while True:
        item = await trio.to_thread.run_sync(safe_next)
        if item is stop_iteration:
            return
        yield item
async def to_aiter_v4(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one thread.

    A single ``trio.to_thread.run_sync`` call iterates ``fn(*args, **kwargs)``
    and pushes every item through a memory channel opened with
    ``max_buffer_size=0`` ("run iteration in a single thread (buffer_size=0)").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item received from the channel, in the order it was sent.
    """
    send_channel, receive_channel = trio.open_memory_channel(0)

    def run_in_thread():
        try:
            for item in fn(*args, **kwargs):
                # Hop back into the trio thread to perform the async send.
                trio.from_thread.run(send_channel.send, item)
        finally:
            # Close through the public aclose() rather than calling
            # __aexit__ without its exception arguments; closing the send
            # side is what terminates the `async for` below.
            trio.from_thread.run(send_channel.aclose)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
        async for item in receive_channel:
            yield item
async def to_aiter_v5(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one thread.

    A single ``trio.to_thread.run_sync`` call iterates ``fn(*args, **kwargs)``
    and pushes every item through a memory channel opened with
    ``max_buffer_size=1`` ("run iteration in a single thread (buffer_size=1)").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item received from the channel, in the order it was sent.
    """
    send_channel, receive_channel = trio.open_memory_channel(1)

    def run_in_thread():
        try:
            for item in fn(*args, **kwargs):
                # Hop back into the trio thread to perform the async send.
                trio.from_thread.run(send_channel.send, item)
        finally:
            # Close through the public aclose() rather than calling
            # __aexit__ without its exception arguments; closing the send
            # side is what terminates the `async for` below.
            trio.from_thread.run(send_channel.aclose)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
        async for item in receive_channel:
            yield item
async def to_aiter_v6(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator fed by one thread.

    A single ``trio.to_thread.run_sync`` call iterates ``fn(*args, **kwargs)``
    and pushes every item through a memory channel opened with an infinite
    buffer size ("run iteration in a single thread (buffer_size=inf)").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item received from the channel, in the order it was sent.
    """
    import math

    send_channel, receive_channel = trio.open_memory_channel(math.inf)

    def run_in_thread():
        try:
            for item in fn(*args, **kwargs):
                # Hop back into the trio thread to perform the async send.
                trio.from_thread.run(send_channel.send, item)
        finally:
            # Close through the public aclose() rather than calling
            # __aexit__ without its exception arguments; closing the send
            # side is what terminates the `async for` below.
            trio.from_thread.run(send_channel.aclose)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
        async for item in receive_channel:
            yield item
async def to_aiter_v7(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, sending batches.

    A single ``trio.to_thread.run_sync`` call iterates ``fn(*args, **kwargs)``
    and sends the items through an infinitely-buffered memory channel in
    lists of up to 100 items ("run iteration in a single thread with
    batches"), so the thread-to-trio hop is paid once per batch rather than
    once per item.

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item of each received batch, in the order it was produced.
    """
    inf = float("inf")
    send_channel, receive_channel = trio.open_memory_channel(inf)

    def run_in_thread():
        # Bound before the try block so the except/else paths can always
        # refer to it.
        batch = []
        try:
            for item in fn(*args, **kwargs):
                batch.append(item)
                if len(batch) == 100:
                    trio.from_thread.run(send_channel.send, batch)
                    batch = []
        except Exception:
            # Deliver the items collected before the failure, then let the
            # error propagate.
            if batch:
                trio.from_thread.run(send_channel.send, batch)
            raise
        else:
            # Flush the final partial batch; an empty one carries nothing.
            if batch:
                trio.from_thread.run(send_channel.send, batch)
        finally:
            # Close through the public aclose() rather than calling
            # __aexit__ without its exception arguments; closing the send
            # side is what terminates the `async for` below.
            trio.from_thread.run(send_channel.aclose)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.to_thread.run_sync, run_in_thread)
        async for batch in receive_channel:
            for item in batch:
                yield item
async def to_aiter_v8(fn, *args, **kwargs):
    """Expose ``fn(*args, **kwargs)`` as an async iterator, one thread call per batch.

    The iterator is created through ``trio.to_thread.run_sync``; each later
    ``run_sync`` call pulls items for about 1 ms and returns them as a batch
    ("run each batch iteration in a separate thread").

    Args:
        fn: Callable returning an iterable.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Yields:
        Each item produced by ``fn(*args, **kwargs)``, in order.

    Raises:
        Exception: Any exception raised while advancing the iterator is
            re-raised here, after the items collected before it in the same
            batch have been yielded.
    """
    import time

    def instantiate():
        return iter(fn(*args, **kwargs))

    items_iter = await trio.to_thread.run_sync(instantiate)

    def run_batch():
        # Batch for 1 ms. perf_counter() is used for the deadline because it
        # is a high-resolution clock unaffected by system clock adjustments,
        # unlike the wall-clock time.time().
        batch = []
        deadline = time.perf_counter() + 0.001
        while time.perf_counter() < deadline:
            try:
                batch.append((next(items_iter), None))
            except Exception as exc:
                # Record the exception (including StopIteration) as the last
                # entry so the async side can act on it in order.
                batch.append((None, exc))
                break
        return batch

    while True:
        batch = await trio.to_thread.run_sync(run_batch)
        for result, exception in batch:
            if isinstance(exception, StopIteration):
                return
            if exception is not None:
                raise exception
            yield result
@pytest.mark.trio
@pytest.mark.parametrize(
    "version",
    ["v0", "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"],
    ids=[
        "run the iteration in a blocking way",
        "run the iteration and sleep(0) between each item",
        "run the iteration in a thread and return a list",
        "run each next call in a separate thread",
        "run iteration in a single thread (buffer_size=0)",
        "run iteration in a single thread (buffer_size=1)",
        "run iteration in a single thread (buffer_size=inf)",
        "run iteration in a single thread with batches",
        "run each batch iteration in a separate thread",
    ],
)
async def test_to_aiter(version):
    """Check that ``to_aiter_<version>`` yields ``range(10 ** 4)`` in order."""
    # Implementations are looked up by name among this module's globals.
    to_aiter = globals()[f"to_aiter_{version}"]
    expected = 0
    async for x in to_aiter(range, 10 ** 4):
        assert x == expected
        expected += 1
    # Without this check an implementation that stops early (or yields
    # nothing at all) would pass the loop above vacuously.
    assert expected == 10 ** 4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment