Last active
October 26, 2021 15:32
-
-
Save ktbarrett/32e7013a8b7327cb420cd9e949989cb4 to your computer and use it in GitHub Desktop.
Broadcasting channels with inline stream processing and events in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABCMeta, abstractmethod, abstractproperty | |
from asyncio import QueueEmpty | |
from collections import deque | |
from typing import ( | |
Callable, | |
Deque, | |
Generic, | |
Protocol, | |
Set, | |
Type, | |
TypeVar, | |
runtime_checkable, | |
) | |
from weakref import WeakSet | |
from cocotb.triggers import Alert | |
T = TypeVar("T") | |
S = TypeVar("S") | |
V = TypeVar("V") | |
@runtime_checkable | |
class Sink(Generic[T], Protocol): | |
@abstractmethod | |
def send(self, value: T) -> None: | |
... | |
@runtime_checkable | |
class Source(Generic[T], Protocol): | |
@abstractproperty | |
def event(self) -> Alert: | |
... | |
@abstractmethod | |
def recv_nowait(self) -> T: | |
... | |
@abstractmethod | |
async def becomes_available(self) -> None: | |
... | |
@abstractmethod | |
def is_available(self) -> bool: | |
... | |
async def recv(self) -> T: | |
await self.becomes_available() | |
return self.recv_nowait() | |
@runtime_checkable | |
class Input(Generic[T], Protocol): | |
def source(self) -> Source[T]: | |
... | |
def filter(self, func: Callable[[T], bool]) -> "Input[T]": | |
... | |
def map(self, func: Callable[[T], S]) -> "Input[S]": | |
... | |
def filter_type(self, *types: Type[S]) -> "Input[S]": | |
... | |
@runtime_checkable | |
class Output(Generic[T], Protocol): | |
def sink(self) -> Sink[T]: | |
... | |
class ChannelNode(Sink[T], Input[S], metaclass=ABCMeta): | |
def __init__(self, parent: object) -> None: | |
# parent ref is not used, but is required to prevent intermediate nodes | |
# in a system from being cleaned up by the parent's weakset | |
self._parent = parent | |
self._children: Set["Sink[S]"] = WeakSet() | |
@abstractmethod | |
def send(self, value: T) -> None: | |
... | |
def source(self) -> Source[S]: | |
src = ChannelSource[S](self) | |
self._children.add(src) | |
return src | |
def map(self, func: Callable[[S], V]) -> "Input[V]": | |
node = ChannelMapNode[S, V](self, func) | |
self._children.add(node) | |
return node | |
def filter(self, func: Callable[[S], bool]) -> "Input[S]": | |
node = ChannelFilterNode[S, S](self, func) | |
self._children.add(node) | |
return node | |
def filter_type(self, *types: Type[V]) -> "Input[V]": | |
node = ChannelFilterNode[S, V](self, lambda t: isinstance(t, types)) | |
self._children.add(node) | |
return node | |
class ChannelSource(Source[T]): | |
def __init__(self, parent: object) -> None: | |
# parent ref is not used, but is required to prevent intermediate nodes | |
# in a system from being cleaned up by the parent's weakset | |
self._parent = parent | |
self._queue: Deque[T] = deque() | |
self._event = Alert() | |
def send(self, value: T) -> None: | |
self._queue.append(value) | |
self._event.set() | |
async def recv(self) -> T: | |
await self.becomes_available() | |
return self.recv_nowait() | |
def recv_nowait(self) -> T: | |
if not self.is_available(): | |
raise QueueEmpty() | |
return self._queue.popleft() | |
async def becomes_available(self) -> None: | |
while not self.is_available(): | |
await self._event | |
def is_available(self) -> bool: | |
return len(self._queue) > 0 | |
@property | |
def event(self) -> Alert: | |
return self._event | |
class ChannelMapNode(ChannelNode[T, S]): | |
def __init__(self, parent: object, func: Callable[[T], S]) -> None: | |
super().__init__(parent) | |
self._func = func | |
def send(self, value: T) -> None: | |
value = self._func(value) | |
for c in self._children: | |
c.send(value) | |
class ChannelFilterNode(ChannelNode[T, T]): | |
def __init__(self, parent: object, func: Callable[[T], bool]) -> None: | |
super().__init__(parent) | |
self._func = func | |
def send(self, value: T) -> None: | |
if self._func(value): | |
for c in self._children: | |
c.send(value) | |
class Channel(Output[T], Sink[T], Input[T]): | |
def __init__(self): | |
self._sink_taken = False | |
def sink(self) -> Sink[T]: | |
if self._sink_taken: | |
raise RuntimeError("Only one sink is allowed per channel") | |
self._sink_taken = True | |
return self | |
def send(self, value: T) -> None: | |
for c in self._children: | |
c.send(value) | |
def source(self) -> Source[T]: | |
src = ChannelSource[T](self) | |
self._children.add(src) | |
return src | |
def map(self, func: Callable[[T], S]) -> "Input[S]": | |
node = ChannelMapNode[T, S](self, func) | |
self._children.add(node) | |
return node | |
def filter(self, func: Callable[[T], bool]) -> "Input[T]": | |
node = ChannelFilterNode[T, T](self, func) | |
self._children.add(node) | |
return node | |
def filter_type(self, *types: Type[S]) -> "Input[S]": | |
node = ChannelFilterNode[T, S](self, lambda t: isinstance(t, types)) | |
self._children.add(node) | |
return node |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment