Created
September 12, 2019 07:56
-
-
Save tkf/e6b3de67571162f8aac690c8b8f7ee7a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from typing import Any | |
import trio | |
async def start_cancelable(nursery, async_fn, *args): | |
async def launcher(task_status): | |
with trio.CancelScope() as scope: | |
task_status.started(scope) | |
await async_fn(*args) | |
return await nursery.start(launcher) | |
@dataclass | |
class Reduced: | |
value: Any | |
async def _reduce(scopes, op, init, xs): | |
if len(xs) > 2: | |
mid = len(xs) // 2 | |
xs1 = xs[:mid] | |
xs2 = xs[mid:] | |
acc2 = null = object() | |
async with trio.open_nursery() as nursery: | |
async def background(): | |
nonlocal acc2 | |
acc2 = await _reduce(scopes, op, init, xs2) | |
scope2 = await start_cancelable(nursery, background) | |
acc1 = await _reduce(scopes + [scope2], op, init, xs1) | |
if acc2 is null: | |
print("Canceled processing", xs2) | |
if isinstance(acc1, Reduced) or acc2 is null: | |
return acc1 | |
elif isinstance(acc2, Reduced): | |
return acc2 | |
else: | |
acc = await op(acc1, acc2) | |
else: | |
assert len(xs) in (1, 2) | |
acc = await op(init, xs[0]) | |
if len(xs) == 2 and not isinstance(acc, Reduced): | |
acc = await op(acc, xs[1]) | |
if isinstance(acc, Reduced): | |
print("Canceling", len(scopes), "scopes") | |
for s in scopes: | |
print("Cancel:", s) | |
s.cancel() | |
return acc | |
async def reduce(op, init, xs): | |
return await _reduce([], op, init, xs) | |
def sleepy(delay): | |
def g(f): | |
async def h(*args, **kwargs): | |
await trio.sleep(delay) | |
return f(*args, **kwargs) | |
return h | |
return g | |
def finder(needle): | |
def op(acc, x): | |
if x == needle: | |
return Reduced(x) | |
else: | |
return x | |
return op | |
def tracer(record): | |
def transducer(op): | |
def new_op(acc, x): | |
record.append((acc, x)) | |
return op(acc, x) | |
return new_op | |
return transducer | |
def applyfs(*args): | |
x = args[-1] | |
for f in args[-2::-1]: | |
x = f(x) | |
return x | |
async def recording_find(needle, delay, xs=range(2 ** 3)): | |
record = [] | |
op = applyfs( | |
# non-async-op -to- async-op transducer: | |
sleepy(delay), | |
# non-async-op -to- non-async-op transducer(s): | |
tracer(record), | |
# bottom step function: | |
finder(needle), | |
) | |
acc = await reduce(op, None, xs) | |
return (acc, record) | |
if __name__ == "__main__": | |
acc, record = trio.run(recording_find, 0, 0.1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment