Last active
February 4, 2020 14:51
-
-
Save decentral1se/ee7c7607df3cbcde8a03073dbbc5cc91 to your computer and use it in GitHub Desktop.
trio_exc.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
import trio | |
from trio.testing import memory_stream_pair | |
from muxdemux import open_protocol | |
@pytest.fixture()
def stream():
    """Provide a fresh in-memory stream pair from ``trio.testing``."""
    pair = memory_stream_pair()
    return pair
@pytest.fixture()
def lstream(stream):
    """Provide the first ("left") element of the ``stream`` fixture."""
    left = stream[0]
    return left
@pytest.fixture()
def rstream(stream):
    """Provide the second ("right") element of the ``stream`` fixture."""
    # Plain ``return`` (as ``lstream`` does): there is no teardown code, so a
    # generator-style ``yield`` fixture adds nothing here.
    return stream[1]
@pytest.fixture()
async def lproto(lstream):
    """Yield a protocol with id 1 opened on ``lstream``.

    The protocol stays open (inside its own nursery) for as long as the
    requesting test runs; both context managers exit afterwards.
    """
    async with trio.open_nursery() as nursery:
        async with open_protocol(1, lstream, nursery) as protocol:
            yield protocol
@pytest.fixture()
async def rproto(rstream):
    """Yield a protocol with id 2 opened on ``rstream``.

    The protocol stays open (inside its own nursery) for as long as the
    requesting test runs; both context managers exit afterwards.
    """
    async with trio.open_nursery() as nursery:
        async with open_protocol(2, rstream, nursery) as protocol:
            yield protocol
async def test_proto_async_usage(lstream, rstream):
    """``open_protocol`` can be entered and exited as an async context manager."""
    async with trio.open_nursery() as nursery, open_protocol(1, lstream, nursery):
        pass
async def test_wire_up_two_protos(lproto, rproto):
    """Requesting both protocol fixtures is the whole test; nothing else runs."""
async def test_push_one_way(lproto, rproto):
    """A byte pushed by ``lproto`` is readable from ``rproto``'s stream."""
    await lproto.push(b"a")
    received = await rproto.stream.receive_some(1)
    assert received == b"a"
async def test_push_two_ways(lproto, rproto):
    """Each side can push a byte that the other side's stream receives."""
    # Left to right.
    await lproto.push(b"a")
    from_left = await rproto.stream.receive_some(1)
    assert from_left == b"a"

    # Right to left.
    await rproto.push(b"b")
    from_right = await lproto.stream.receive_some(1)
    assert from_right == b"b"
async def test_handles_lproto_broken_resource(lproto, rproto):
    """Close ``rproto``'s stream while a sender and a receiver are running.

    NOTE(review): presumably the left-hand ``push`` then hits
    ``BrokenResourceError`` because its peer went away -- confirm against
    trio's memory-stream behaviour.
    """

    async def send_forever():
        # Keeps pushing single bytes from the left side.
        while True:
            await lproto.push(b"a")

    async def receive_until_empty():
        # Stops as soon as ``pull`` hands back a falsy value.
        while await rproto.pull(1):
            pass

    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_forever)
        nursery.start_soon(receive_until_empty)
        await rproto.stream.aclose()
async def test_handles_lproto_closed_resource(lproto, rproto):
    """Close ``lproto``'s own stream while a sender and a receiver are running.

    NOTE(review): presumably the left-hand ``push`` then hits
    ``ClosedResourceError`` because its own stream was closed -- confirm
    against trio's memory-stream behaviour.
    """

    async def send_forever():
        # Keeps pushing single bytes from the left side.
        while True:
            await lproto.push(b"a")

    async def receive_until_empty():
        # Stops as soon as ``pull`` hands back a falsy value.
        while await rproto.pull(1):
            pass

    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_forever)
        nursery.start_soon(receive_until_empty)
        await lproto.stream.aclose()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import asynccontextmanager | |
import attr | |
from trio import BrokenResourceError, ClosedResourceError, Nursery | |
from trio.abc import Stream | |
@attr.s(auto_attribs=True)
class Protocol:
    """Pair a stream with the nursery that is cancelled when the stream fails.

    ``push`` and ``pull`` forward to the stream; on ``BrokenResourceError``
    or ``ClosedResourceError`` they print a diagnostic and cancel the
    nursery's cancel scope instead of propagating the error.
    """

    # Identifier supplied by the caller; not read by any method here.
    id: int
    # Stream that ``push`` writes to and ``pull`` reads from.
    stream: Stream
    # Nursery whose cancel scope is cancelled on a stream error.
    nursery: Nursery

    async def push(self, message):
        """Send ``message`` with ``stream.send_all``.

        Returns ``None``. A broken or closed stream is reported on stdout
        and cancels ``self.nursery`` rather than raising.
        """
        try:
            await self.stream.send_all(message)
        except (BrokenResourceError, ClosedResourceError) as exc:
            print(f"push: {type(exc)}: {exc}")
            self.nursery.cancel_scope.cancel()

    async def pull(self, length):
        """Return whatever ``stream.receive_some(length)`` yields.

        A broken or closed stream is reported on stdout, cancels
        ``self.nursery`` and makes this method return ``None``.
        """
        try:
            chunk = await self.stream.receive_some(length)
        except (BrokenResourceError, ClosedResourceError) as exc:
            print(f"pull: {type(exc)}: {exc}")
            self.nursery.cancel_scope.cancel()
            return None
        return chunk
@asynccontextmanager
async def open_protocol(id, stream, nursery):
    """Yield a ``Protocol`` for ``stream`` and close its stream on exit.

    Args:
        id: Identifier stored on the protocol.
        stream: Stream the protocol sends on and receives from.
        nursery: Nursery the protocol cancels when the stream fails.

    Yields:
        The newly built ``Protocol``.

    The protocol's stream is closed with ``aclose()`` when the ``async with``
    body finishes, whether it finishes normally or with an exception.
    """
    # Build the protocol before entering ``try``: if construction raised
    # inside it, the ``finally`` clause would reference an unbound
    # ``protocol`` and mask the real error with ``UnboundLocalError``.
    protocol = Protocol(id, stream, nursery)
    try:
        yield protocol
    finally:
        await protocol.stream.aclose()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment