Last active
July 30, 2018 18:48
-
-
Save vodik/8c4ad19e835e2c282bca7ae3d0a29309 to your computer and use it in GitHub Desktop.
New observable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
from collections.abc import AsyncGenerator | |
class Subject(AsyncGenerator): | |
def __init__(self, *, loop=None): | |
if loop is None: | |
self._loop = asyncio.get_event_loop() | |
else: | |
self._loop = loop | |
self._push = self._loop.create_future() | |
self._pull = self._loop.create_future() | |
self._awaiters = [] | |
self._busy = False | |
async def asend(self, value): | |
await self._serialize_access() | |
self._push.set_result(value) | |
await self._wait_for_pull() | |
async def athrow(self, typ, val=None, tb=None): | |
await self._serialize_access() | |
self._push.set_exception(val or typ()) | |
await self._wait_for_pull() | |
async def aclose(self): | |
await self.athrow(StopAsyncIteration) | |
async def _wait_for_pull(self): | |
await self._pull | |
self._pull = self._loop.create_future() | |
self._busy = False | |
async def _serialize_access(self): | |
while self._busy: | |
future = self._loop.create_future() | |
self._awaiters.append(future) | |
await future | |
self._awaiters.remove(future) | |
self._busy = True | |
async def __aiter__(self): | |
while True: | |
try: | |
yield await self._push | |
except StopAsyncIteration: | |
return | |
finally: | |
self._push = self._loop.create_future() | |
self._pull.set_result(True) | |
for awaiter in self._awaiters[:1]: | |
awaiter.set_result(True) | |
async def __aenter__(self): | |
return self | |
async def __aexit__(self, typ, val, tb): | |
if not typ: | |
await self.aclose() | |
else: | |
await self.athrow(typ, val, tb) | |
async def main(): | |
agen = Subject() | |
async def read_msgs(subject): | |
try: | |
async for msg in subject: | |
print(f"Received: {msg}") | |
await asyncio.sleep(2) | |
except RuntimeError as err: | |
print(f"Source has crashed: {err}") | |
async def send_msg(subject, value): | |
print(f"Sending value: {value}") | |
await subject.asend(value) | |
async def send_msgs(subject): | |
async with subject: | |
await send_msg(subject, 1) | |
await send_msg(subject, 2) | |
await send_msg(subject, 3) | |
raise RuntimeError("Example athrow of exception") | |
await asyncio.gather(read_msgs(agen), send_msgs(agen)) | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment