Skip to content

Instantly share code, notes, and snippets.

@ktbarrett
Last active October 26, 2021 15:32
Show Gist options
  • Save ktbarrett/32e7013a8b7327cb420cd9e949989cb4 to your computer and use it in GitHub Desktop.
Save ktbarrett/32e7013a8b7327cb420cd9e949989cb4 to your computer and use it in GitHub Desktop.
Broadcasting channels with inline stream processing and events in Python
from abc import ABCMeta, abstractmethod, abstractproperty
from asyncio import QueueEmpty
from collections import deque
from typing import (
Callable,
Deque,
Generic,
Protocol,
Set,
Type,
TypeVar,
runtime_checkable,
)
from weakref import WeakSet
from cocotb.triggers import Alert
# Generic type parameters shared by the protocols and channel classes below.
T = TypeVar("T")  # primary element type: what a sink accepts / a source yields
S = TypeVar("S")  # second element type, e.g. the output type of a node or of map()
V = TypeVar("V")  # further result type used by ChannelNode.map()/filter_type()
@runtime_checkable
class Sink(Protocol[T]):
    """Structural interface for an object that values of type ``T`` are pushed into."""

    @abstractmethod
    def send(self, value: T) -> None:
        """Accept ``value``; what happens to it is up to the implementation."""
@runtime_checkable
class Source(Generic[T], Protocol):
    """Structural interface for a consumer-side endpoint yielding values of type ``T``.

    Implementations provide the non-blocking primitives (``recv_nowait``,
    ``is_available``), the waiting primitive (``becomes_available``) and the
    ``event`` property; ``recv`` is supplied here in terms of those.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3; stacking
    # ``@property`` on ``@abstractmethod`` is the supported spelling and keeps
    # the attribute abstract.
    @property
    @abstractmethod
    def event(self) -> Alert:
        """The ``Alert`` object associated with this source."""
        ...

    @abstractmethod
    def recv_nowait(self) -> T:
        """Return the next value without waiting.

        What happens when no value is available is left to the implementation.
        """
        ...

    @abstractmethod
    async def becomes_available(self) -> None:
        """Wait until a value can be received."""
        ...

    @abstractmethod
    def is_available(self) -> bool:
        """Return whether a value can be received right now."""
        ...

    async def recv(self) -> T:
        """Wait for a value to become available, then return it.

        Returns:
            The result of ``recv_nowait()`` after ``becomes_available()``
            has completed.
        """
        await self.becomes_available()
        return self.recv_nowait()
@runtime_checkable
class Input(Protocol[T]):
    """Structural interface for something sources and derived inputs are created from."""

    def source(self) -> Source[T]:
        """Return a ``Source`` of ``T`` values attached to this input."""

    def filter(self, func: Callable[[T], bool]) -> "Input[T]":
        """Return a derived input of the same element type, given a predicate."""

    def map(self, func: Callable[[T], S]) -> "Input[S]":
        """Return a derived input whose element type is the result type of ``func``."""

    def filter_type(self, *types: Type[S]) -> "Input[S]":
        """Return a derived input typed as ``S``, given one or more classes."""
@runtime_checkable
class Output(Protocol[T]):
    """Structural interface for something that hands out a ``Sink`` for ``T`` values."""

    def sink(self) -> Sink[T]:
        """Return the ``Sink`` that values should be sent to."""
class ChannelNode(Sink[T], Input[S], metaclass=ABCMeta):
    """Abstract intermediate node: receives ``T`` values, feeds ``S`` values to children.

    Children are tracked in a ``WeakSet``, so a child stays attached only
    while something else holds a strong reference to it.  Subclasses implement
    ``send`` to decide what is forwarded to ``self._children``.
    """

    def __init__(self, parent: object) -> None:
        """Create a node hanging off ``parent``.

        Args:
            parent: The upstream object this node was created from.
        """
        # parent ref is not used, but is required to prevent intermediate nodes
        # in a system from being cleaned up by the parent's weakset
        self._parent = parent
        self._children: "WeakSet[Sink[S]]" = WeakSet()

    @abstractmethod
    def send(self, value: T) -> None:
        """Handle an incoming ``value``; concrete nodes forward to their children."""
        ...

    def source(self) -> Source[S]:
        """Create, register and return a new ``ChannelSource`` fed by this node."""
        src = ChannelSource[S](self)
        self._children.add(src)
        return src

    def map(self, func: Callable[[S], V]) -> "Input[V]":
        """Create, register and return a child node that applies ``func`` to values.

        Args:
            func: Callable applied by the child to every value it is sent.
        """
        node = ChannelMapNode[S, V](self, func)
        self._children.add(node)
        return node

    def filter(self, func: Callable[[S], bool]) -> "Input[S]":
        """Create, register and return a child node that forwards values passing ``func``.

        Args:
            func: Predicate; the child forwards a value only when it is truthy.
        """
        # ChannelFilterNode is declared with a single type parameter, so
        # subscripting it with two arguments raises TypeError at runtime.
        # Constructing it unsubscripted works regardless of its arity.
        node = ChannelFilterNode(self, func)
        self._children.add(node)
        return node

    def filter_type(self, *types: Type[V]) -> "Input[V]":
        """Create, register and return a child node keeping only instances of ``types``.

        Args:
            types: Classes passed to ``isinstance``; with no classes given,
                nothing passes the filter.
        """
        # Unsubscripted for the same reason as in ``filter``.  The narrowing
        # from S to V is not expressible with a one-parameter filter node, so
        # the declared return type is the only place it is recorded.
        node = ChannelFilterNode(self, lambda t: isinstance(t, types))
        self._children.add(node)
        return node
class ChannelSource(Source[T]):
    """Queue-backed endpoint: values given to ``send`` are read back in FIFO order."""

    def __init__(self, parent: object) -> None:
        """Create an empty source attached to ``parent``."""
        # The parent is never read here.  Holding a strong reference to it
        # keeps intermediate nodes alive, because parents only track their
        # children through a weak set.
        self._parent = parent
        self._queue: Deque[T] = deque()
        self._event = Alert()

    def send(self, value: T) -> None:
        """Queue ``value`` and then call ``set()`` on the alert."""
        self._queue.append(value)
        self._event.set()

    async def recv(self) -> T:
        """Wait until the queue is non-empty, then pop and return the oldest value."""
        await self.becomes_available()
        item = self.recv_nowait()
        return item

    def recv_nowait(self) -> T:
        """Pop and return the oldest queued value.

        Raises:
            QueueEmpty: If ``is_available()`` reports that nothing is queued.
        """
        if self.is_available():
            return self._queue.popleft()
        raise QueueEmpty()

    async def becomes_available(self) -> None:
        """Return once ``is_available()`` is true, awaiting the alert in between checks."""
        while True:
            if self.is_available():
                return
            await self._event

    def is_available(self) -> bool:
        """Return ``True`` when at least one value is queued."""
        return bool(self._queue)

    @property
    def event(self) -> Alert:
        """The alert that ``send`` sets after queueing a value."""
        return self._event
class ChannelMapNode(ChannelNode[T, S]):
    """Node that transforms each incoming value with a function before forwarding it."""

    def __init__(self, parent: object, func: Callable[[T], S]) -> None:
        """Store the transformation ``func`` and attach to ``parent``."""
        super().__init__(parent)
        self._func = func

    def send(self, value: T) -> None:
        """Apply the stored function once and send the result to every child."""
        mapped = self._func(value)
        for child in self._children:
            child.send(mapped)
class ChannelFilterNode(ChannelNode[T, T]):
    """Node that forwards a value unchanged only when a predicate accepts it."""

    def __init__(self, parent: object, func: Callable[[T], bool]) -> None:
        """Store the predicate ``func`` and attach to ``parent``."""
        super().__init__(parent)
        self._func = func

    def send(self, value: T) -> None:
        """Send ``value`` to every child if the predicate is truthy; otherwise drop it."""
        if not self._func(value):
            return
        for child in self._children:
            child.send(value)
class Channel(Output[T], Sink[T], Input[T]):
    """Root of a broadcast tree: one sink in, any number of sources/derived inputs out.

    Every value passed to ``send`` is forwarded to each currently registered
    child (sources, map nodes, filter nodes).
    """

    def __init__(self) -> None:
        """Create a channel with no children and its sink not yet handed out."""
        self._sink_taken = False
        # Every other method reads ``self._children``, so it has to exist from
        # construction onwards; without it they fail with AttributeError.
        # A WeakSet is used to match ChannelNode: children stay registered
        # only while something else references them.
        self._children: "WeakSet[Sink[T]]" = WeakSet()

    def sink(self) -> Sink[T]:
        """Return this channel as its one and only sink.

        Raises:
            RuntimeError: If the sink has already been taken.
        """
        if self._sink_taken:
            raise RuntimeError("Only one sink is allowed per channel")
        self._sink_taken = True
        return self

    def send(self, value: T) -> None:
        """Forward ``value`` to every registered child."""
        for c in self._children:
            c.send(value)

    def source(self) -> Source[T]:
        """Create, register and return a new ``ChannelSource`` fed by this channel."""
        src = ChannelSource[T](self)
        self._children.add(src)
        return src

    def map(self, func: Callable[[T], S]) -> "Input[S]":
        """Create, register and return a child node that applies ``func`` to values.

        Args:
            func: Callable applied by the child to every value it is sent.
        """
        node = ChannelMapNode[T, S](self, func)
        self._children.add(node)
        return node

    def filter(self, func: Callable[[T], bool]) -> "Input[T]":
        """Create, register and return a child node that forwards values passing ``func``.

        Args:
            func: Predicate; the child forwards a value only when it is truthy.
        """
        # ChannelFilterNode is declared with a single type parameter, so
        # subscripting it with two arguments raises TypeError at runtime.
        # Constructing it unsubscripted works regardless of its arity.
        node = ChannelFilterNode(self, func)
        self._children.add(node)
        return node

    def filter_type(self, *types: Type[S]) -> "Input[S]":
        """Create, register and return a child node keeping only instances of ``types``.

        Args:
            types: Classes passed to ``isinstance``; with no classes given,
                nothing passes the filter.
        """
        # Unsubscripted for the same reason as in ``filter``.
        node = ChannelFilterNode(self, lambda t: isinstance(t, types))
        self._children.add(node)
        return node
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment