Skip to content

Instantly share code, notes, and snippets.

@ktbarrett
Created August 3, 2021 13:09
Show Gist options
  • Save ktbarrett/0771655ff28c174b198d38168f72b8b3 to your computer and use it in GitHub Desktop.
Save ktbarrett/0771655ff28c174b198d38168f72b8b3 to your computer and use it in GitHub Desktop.
Generic channel type for cocotb
from asyncio import QueueEmpty, QueueFull
from typing import Deque, Generic, TypeVar
from cocotb.triggers import Event
# Type variable for the kind of value a Channel carries.
T = TypeVar("T")
class SendFailed(QueueFull):
    """Raised by ``Channel.send_nowait`` when a send is not available.

    Derives from :class:`asyncio.QueueFull`, so handlers written for that
    exception also catch this one.
    """
class RecvFailed(QueueEmpty):
    """Raised by ``Channel.recv_nowait`` and ``Channel.peek_nowait`` when no
    value is queued.

    Derives from :class:`asyncio.QueueEmpty`, so handlers written for that
    exception also catch this one.
    """
class Channel(Generic[T]):
    """FIFO channel for passing values of type ``T`` between coroutines.

    Values are appended at the back of an internal deque and removed from
    the front.  A ``maxlen`` of zero or less places no limit on the number
    of queued values; with a positive ``maxlen`` a send is only available
    while fewer than ``maxlen`` values are queued.

    Two :class:`cocotb.triggers.Event` objects carry the notifications:
    ``_send_event`` is set after every send and awaited by receivers, and
    ``_recv_event`` is set after every receive and awaited by senders.
    Both directions follow the same rule: change the queue first, then set
    the event, so a woken waiter always observes the updated queue.
    """

    def __init__(self, maxlen: int = 0) -> None:
        """Create an empty channel.

        Args:
            maxlen: Maximum number of queued values; zero or a negative
                number means the channel is unbounded.
        """
        self._maxlen = maxlen
        self._queue: Deque[T] = Deque[T]()
        self._send_event = Event()
        self._recv_event = Event()

    @property
    def maxlen(self) -> int:
        """The capacity given at construction (``<= 0`` means unbounded)."""
        return self._maxlen

    # -- storage primitives: no availability checks, no notifications ------

    def _send(self, value: T) -> None:
        """Append *value* at the back of the queue."""
        self._queue.append(value)

    def _recv(self) -> T:
        """Remove and return the value at the front of the queue."""
        return self._queue.popleft()

    def _peek(self) -> T:
        """Return the value at the front of the queue without removing it."""
        return self._queue[0]

    # -- sending -----------------------------------------------------------

    def send_nowait(self, value: T) -> None:
        """Queue *value* immediately.

        Raises:
            SendFailed: If a send is not currently available.
        """
        if not self.send_is_available():
            raise SendFailed
        self._send(value)
        self._send_event.set()

    async def send(self, value: T) -> None:
        """Wait until a send is available, then queue *value*."""
        await self.send_available()
        # Nothing is awaited between the availability check above and the
        # append below, so the free slot cannot be taken in between.
        self._send(value)
        self._send_event.set()

    def send_is_available(self) -> bool:
        """Return ``True`` if a value can be queued right now."""
        if self.maxlen <= 0:
            # Unbounded channel: there is always room.
            return True
        return len(self._queue) < self.maxlen

    async def send_available(self) -> None:
        """Wait until :meth:`send_is_available` is true."""
        while not self.send_is_available():
            # Clear first so the wait below only completes on a receive
            # that happens from now on, then re-check the condition.
            self._recv_event.clear()
            await self._recv_event.wait()

    # -- receiving ---------------------------------------------------------

    def recv_nowait(self) -> T:
        """Remove and return the oldest queued value immediately.

        Raises:
            RecvFailed: If no value is queued.
        """
        if not self.recv_is_available():
            raise RecvFailed
        value = self._recv()
        # Notify only after the value has been removed, mirroring the send
        # path, so a woken sender always finds the slot already free.
        self._recv_event.set()
        return value

    async def recv(self) -> T:
        """Wait until a value is queued, then remove and return it."""
        await self.recv_available()
        value = self._recv()
        # Same ordering as recv_nowait: remove the value, then notify.
        self._recv_event.set()
        return value

    def peek_nowait(self) -> T:
        """Return the oldest queued value without removing it.

        Raises:
            RecvFailed: If no value is queued.
        """
        if not self.recv_is_available():
            raise RecvFailed
        return self._peek()

    async def peek(self) -> T:
        """Wait until a value is queued, then return it without removing it."""
        await self.recv_available()
        return self._peek()

    def recv_is_available(self) -> bool:
        """Return ``True`` if at least one value is queued."""
        return len(self._queue) > 0

    async def recv_available(self) -> None:
        """Wait until :meth:`recv_is_available` is true."""
        while not self.recv_is_available():
            # Clear first so the wait below only completes on a send that
            # happens from now on, then re-check the condition.
            self._send_event.clear()
            await self._send_event.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment