Last active
March 9, 2022 17:41
-
-
Save ktbarrett/1730d05a297d8dc5a394aee11fd48378 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from typing import Deque, Generic, Optional, TypeVar | |
from cocotb.triggers import Event | |
# Type of the values carried by a ``Channel``.
T = TypeVar("T")
class RecvFailed(Exception):
    """Raised by a non-blocking receive when the channel holds no value."""
class SendFailed(Exception):
    """Raised by a non-blocking send when the channel has no free slot."""
class Channel(Generic[T]):
    """FIFO channel of values, synchronised with cocotb ``Event`` triggers.

    Values are received in the order they were sent.  With ``maxsize=None``
    the channel is unbounded and a send never has to wait; otherwise at most
    ``maxsize`` values are buffered and a sender waits until a receive frees
    a slot.

    Args:
        maxsize: Maximum number of buffered values, or ``None`` for no limit.

    Raises:
        ValueError: If ``maxsize`` is negative.
        NotImplementedError: If ``maxsize`` is 0 (the fully synchronous case
            is not implemented).
    """

    def __init__(self, maxsize: Optional[int] = None) -> None:
        # Check every non-None bound, not only ``int`` instances: a value such
        # as ``-1.0`` would otherwise build a channel that can never accept a
        # send.  A bound that cannot be compared with 0 fails here instead of
        # on the first send.
        if maxsize is not None and maxsize < 0:
            raise ValueError("maxsize cannot be less than 0")
        if maxsize == 0:
            # fully synchronous case
            raise NotImplementedError("maxsize == 0 case is not supported yet")
        self._queue: Deque[T] = deque()
        self._maxsize = maxsize
        # Set each time a value is enqueued; waiting receivers block on it.
        self._send_event = Event()
        # Set each time a value is dequeued; waiting senders block on it.
        self._recv_event = Event()

    @property
    def maxsize(self) -> Optional[int]:
        """Capacity given to the constructor; ``None`` means unbounded."""
        return self._maxsize

    def _send(self, __value: T) -> None:
        """Append the value without a capacity check and signal receivers."""
        self._queue.append(__value)
        self._send_event.set()

    def send_nowait(self, __value: T) -> None:
        """Enqueue the value immediately.

        Raises:
            SendFailed: If the channel is already at capacity.
        """
        if not self.send_available():
            raise SendFailed
        self._send(__value)

    async def send(self, __value: T) -> None:
        """Enqueue the value, waiting first until the channel has room."""
        await self.until_send_available()
        self._send(__value)

    def send_available(self) -> bool:
        """Return ``True`` if a value can be enqueued without waiting."""
        return self._maxsize is None or len(self._queue) < self._maxsize

    async def until_send_available(self) -> None:
        """Wait until :meth:`send_available` returns ``True``."""
        while not self.send_available():
            # Clear before waiting so an earlier set() does not satisfy the
            # wait at once; the loop re-checks the condition after every
            # wake-up because the freed slot may already be taken again.
            self._recv_event.clear()
            await self._recv_event.wait()

    def _recv(self) -> T:
        """Remove and return the oldest value, then signal senders."""
        # Pop first: the event is only set once a slot has really been freed,
        # and not at all if the queue turns out to be empty.
        value = self._queue.popleft()
        self._recv_event.set()
        return value

    def recv_nowait(self) -> T:
        """Dequeue and return the oldest value immediately.

        Raises:
            RecvFailed: If the channel is empty.
        """
        if not self.recv_available():
            raise RecvFailed
        return self._recv()

    async def recv(self) -> T:
        """Dequeue and return the oldest value, waiting until one exists."""
        await self.until_recv_available()
        return self._recv()

    def recv_available(self) -> bool:
        """Return ``True`` if at least one value is buffered."""
        return bool(self._queue)

    async def until_recv_available(self) -> None:
        """Wait until :meth:`recv_available` returns ``True``."""
        while not self.recv_available():
            # Same clear-then-wait pattern as until_send_available().
            self._send_event.clear()
            await self._send_event.wait()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment