Created
May 17, 2020 14:40
-
-
Save decentral1se/f12d4f8df7051c80aa5d15be615fdaec to your computer and use it in GitHub Desktop.
locking.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from string import ascii_letters | |
from typing import Dict | |
import attr | |
from trio import ClosedResourceError, Lock, open_nursery, run | |
from trio.abc import Stream | |
from trio.testing import memory_stream_pair | |
@attr.s(auto_attribs=True) | |
class Channel: | |
id: int | |
stream: Stream | |
@attr.s(auto_attribs=True) | |
class Protocol: | |
id: int | |
stream: Stream | |
lock: Lock | |
MESSAGE_LENGTH: int = 1 | |
TIMEOUT: int = 10 | |
channels: Dict[int, Channel] = attr.Factory(dict) | |
async def recv(self): | |
while True: | |
async with self.lock: | |
try: | |
resp = await self.stream.receive_some(self.MESSAGE_LENGTH) | |
print(f"Protocol {self.id} received {resp.decode()}") | |
if resp == b"Z": | |
return | |
except ClosedResourceError: | |
return | |
async def asend(self, message): | |
async with self.lock: | |
await self.stream.send_all(message) | |
async def left_side_protocol(stream): | |
async with open_nursery() as nursery: | |
protocol = Protocol(id=1, stream=stream, lock=Lock()) | |
nursery.start_soon(protocol.recv) | |
for letter in ascii_letters: | |
await protocol.asend(letter.encode()) | |
async def right_side_protocol(stream): | |
async with open_nursery() as nursery: | |
protocol = Protocol(id=2, stream=stream, lock=Lock()) | |
nursery.start_soon(protocol.recv) | |
for letter in ascii_letters: | |
await protocol.asend(letter.encode()) | |
async def main(): | |
left_side_stream, right_side_stream = memory_stream_pair() | |
async with open_nursery() as nursery: | |
nursery.start_soon(left_side_protocol, left_side_stream) | |
nursery.start_soon(right_side_protocol, right_side_stream) | |
run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment