Created
July 5, 2018 21:13
-
-
Save nonsleepr/43420a0ce659f1b870544fffb9e5cda4 to your computer and use it in GitHub Desktop.
Multiplexer for trio: my take on python-trio/trio#467
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
from multiplexer import Multiplexer | |
async def reader(mx, key, timeout=100): | |
print(f'Waiting for "{key}"...') | |
try: | |
with trio.fail_after(timeout): | |
value = await mx[key] | |
print(f'Got value "{value}" for key {key}') | |
except trio.TooSlowError: | |
print(f"Don't want to wait any longer for {key}") | |
except Exception as e: | |
print(f'Ouch! {e}') | |
raise e | |
async def writer(mx): | |
for i in range(10): | |
await trio.sleep(0.2) | |
print(f'Writing {i}...') | |
mx[i] = i * 10 | |
async def runner(): | |
async with trio.open_nursery() as nursery: | |
mx = Multiplexer() | |
nursery.start_soon(reader, mx, 15, 5) | |
nursery.start_soon(reader, mx, 7) | |
nursery.start_soon(reader, mx, 3) | |
nursery.start_soon(writer, mx) | |
def test_multiplexer(): | |
trio.run(runner) | |
async def writer2(mx, errors): | |
for i in range(10): | |
await trio.sleep(0.2) | |
print(f'Writing {i}...') | |
if i in errors: | |
mx.set_exception(i, Exception('Ka-Boom!')) | |
else: | |
mx[i] = i * 10 | |
async def runner2(): | |
async with trio.open_nursery() as nursery: | |
mx = Multiplexer() | |
nursery.start_soon(reader, mx, 15, 5) | |
nursery.start_soon(reader, mx, 7) | |
nursery.start_soon(reader, mx, 3) | |
nursery.start_soon(writer2, mx, (7,9)) | |
def test_multiplexer_with_error(): | |
trio.run(runner2) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
import outcome | |
class Multiplexer(object): | |
def __init__(self): # TODO: Add capacity | |
self._data = {} | |
self._get_wait = {} | |
def get_nowait(self, key): | |
if key in self._data: | |
return self._data[key] | |
# TODO: Should we delete the value after? | |
raise trio.WouldBlock | |
async def __getitem__(self, key): | |
# TODO: Reseach why that's required? | |
await trio.hazmat.checkpoint_if_cancelled() | |
try: | |
value = self.get_nowait(key) | |
except trio.WouldBlock: | |
pass | |
else: | |
await trio.cancel_shielded_checkpoint() | |
return value | |
# Must wait | |
task = trio.hazmat.current_task() | |
def abort_fn(_): | |
self._get_wait.pop(key, None) | |
return trio.hazmat.Abort.SUCCEEDED | |
# TODO: Make sure there are no other waiters | |
self._get_wait[key] = task | |
try: | |
value = await trio.hazmat.wait_task_rescheduled(abort_fn) | |
finally: | |
self._get_wait.pop(key, None) | |
return value | |
def __setitem__(self, key, value): | |
self._data[key] = value | |
task = self._get_wait.get(key) | |
if task: | |
if isinstance(value, (outcome.Value, outcome.Error)): | |
trio.hazmat.reschedule(task, value) | |
else: | |
trio.hazmat.reschedule(task, outcome.Value(value)) | |
def set_exception(self, key, exc): | |
task = self._get_wait.get(key) | |
if task: | |
trio.hazmat.reschedule(task, outcome.Error(exc)) | |
def fail(self, exc): | |
for task in self._get_wait.values(): | |
trio.hazmat.reschedule(task, outcome.Error(exc)) | |
async def set(self, key, value): | |
await trio.hazmat.checkpoint_if_cancelled() | |
try: | |
self[key] = value | |
except trio.WouldBlock: | |
pass | |
else: | |
await trio.cancel_shielded_checkpoint() | |
return | |
raise Exception('async set is not fully implemented') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
if isinstance(value, outcome.Outcome):