Created
August 3, 2021 13:09
-
-
Save ktbarrett/0771655ff28c174b198d38168f72b8b3 to your computer and use it in GitHub Desktop.
Generic channel type for cocotb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asyncio import QueueEmpty, QueueFull | |
from typing import Deque, Generic, TypeVar | |
from cocotb.triggers import Event | |
T = TypeVar("T") | |
class SendFailed(QueueFull): | |
... | |
class RecvFailed(QueueEmpty): | |
... | |
class Channel(Generic[T]): | |
def __init__(self, maxlen: int = 0) -> None: | |
self._maxlen = maxlen | |
self._queue = Deque[T]() | |
self._send_event = Event() | |
self._recv_event = Event() | |
@property | |
def maxlen(self) -> int: | |
return self._maxlen | |
def _send(self, value: T) -> None: | |
self._queue.append(value) | |
def _recv(self) -> T: | |
return self._queue.popleft() | |
def _peek(self) -> T: | |
return self._queue[0] | |
def send_nowait(self, value: T) -> None: | |
if not self.send_is_available(): | |
raise SendFailed | |
self._send(value) | |
self._send_event.set() | |
async def send(self, value: T) -> None: | |
await self.send_available() | |
self._send(value) | |
self._send_event.set() | |
def send_is_available(self) -> bool: | |
if self.maxlen <= 0: | |
return True | |
return len(self._queue) < self.maxlen | |
async def send_available(self) -> None: | |
while not self.send_is_available(): | |
self._recv_event.clear() | |
await self._recv_event.wait() | |
def recv_nowait(self) -> T: | |
if not self.recv_is_available(): | |
raise RecvFailed | |
self._recv_event.set() | |
return self._recv() | |
async def recv(self) -> T: | |
await self.recv_available() | |
self._recv_event.set() | |
return self._recv() | |
def peek_nowait(self) -> T: | |
if not self.recv_is_available(): | |
raise RecvFailed | |
return self._peek() | |
async def peek(self) -> T: | |
await self.recv_available() | |
return self._peek() | |
def recv_is_available(self) -> bool: | |
return len(self._queue) > 0 | |
async def recv_available(self) -> None: | |
while not self.recv_is_available(): | |
self._send_event.clear() | |
await self._send_event.wait() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment