Skip to content

Instantly share code, notes, and snippets.

@agronholm
Created December 18, 2019 09:49
Show Gist options
  • Save agronholm/4f3a4ba95ba09663a7d7c73466b7f2cf to your computer and use it in GitHub Desktop.
Save agronholm/4f3a4ba95ba09663a7d7c73466b7f2cf to your computer and use it in GitHub Desktop.
AnyIO Stream ABCs
from abc import abstractmethod, ABCMeta
from typing import Generic, TypeVar, Union
# Type variable for the items carried by the generic message streams in this module.
# NOTE(review): declared covariant, yet it is also used as the parameter type of ``send()``;
# static type checkers may reject a covariant variable in that position -- confirm.
T_Item = TypeVar("T_Item", covariant=True)
class AsyncResource(metaclass=ABCMeta):
    """
    Abstract base class for objects that have to be closed asynchronously.

    Instances can be used with ``async with``: entering the block yields the resource itself
    and leaving it awaits :meth:`aclose`.
    """

    async def __aenter__(self):
        """Enter the ``async with`` block and return this resource."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Close the resource when the ``async with`` block is left.

        Returns ``None``, so an exception raised inside the block is never suppressed.
        """
        await self.aclose()

    @abstractmethod
    async def aclose(self) -> None:
        """
        Close the resource.

        This method may cause some cleanup actions to be taken (e.g. the closing handshake of a
        TLS connection). If this method is interrupted, it means that the resource will still be
        closed, but uncleanly.
        """
class UnreliableReceiveMessageStream(Generic[T_Item], AsyncResource):
    """
    A receive stream which does not guarantee that the received messages arrive in the order in
    which they were sent (or at all).

    The stream is its own asynchronous iterator: ``async for item in stream`` awaits
    :meth:`receive` once per iteration. This class never raises ``StopAsyncIteration`` itself,
    so the loop only ends when :meth:`receive` raises.
    """

    def __aiter__(self) -> 'UnreliableReceiveMessageStream[T_Item]':
        # This has to be a plain (non-async) method: ``async for`` uses the returned object
        # directly as the iterator, and a coroutine object has no ``__anext__``.
        return self

    async def __anext__(self) -> T_Item:
        return await self.receive()

    @abstractmethod
    async def receive(self) -> T_Item:
        """Receive the next item."""
class UnreliableSendMessageStream(Generic[T_Item], AsyncResource):
    """
    A send stream which does not guarantee that the messages will reach the peer(s) in the
    order in which they are sent (or at all).
    """

    @abstractmethod
    async def send(self, item: T_Item) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        """

    @abstractmethod
    async def send_eof(self) -> None:
        """Signal the peer(s) that no more messages are going to be sent."""

    async def aclose(self) -> None:
        """
        Signal end-of-file via :meth:`send_eof` and then close the stream.

        The superclass ``aclose()`` is awaited even if :meth:`send_eof` raises or is
        interrupted, so that the rest of the close chain still runs; the original exception
        then propagates to the caller.
        """
        try:
            await self.send_eof()
        finally:
            await super().aclose()
# The base classes below are parameterized with ``T_Item`` on purpose: a class that inherits
# from an *unparameterized* generic class is not generic itself, and subscripting it
# (e.g. ``MessageStream[bytes]``) raises ``TypeError`` at runtime.


class UnreliableMessageStream(
    UnreliableReceiveMessageStream[T_Item], UnreliableSendMessageStream[T_Item]
):
    """
    A bidirectional message stream which does not guarantee the order or reliability of message
    delivery.
    """


class ReceiveMessageStream(UnreliableReceiveMessageStream[T_Item]):
    """
    A receive message stream which guarantees that messages are received in the same order in
    which they were sent.
    """


class SendMessageStream(UnreliableSendMessageStream[T_Item]):
    """
    A send message stream which guarantees that messages are delivered in the same order in which
    they were sent, without missing any messages in the middle.
    """


class MessageStream(
    ReceiveMessageStream[T_Item], SendMessageStream[T_Item], UnreliableMessageStream[T_Item]
):
    """
    A bidirectional message stream which guarantees the order and reliability of message delivery.
    """
class ReceiveByteStream(AsyncResource):
    """A closeable stream from which chunks of bytes can be received."""

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        :param max_bytes: upper limit for the number of bytes to receive (65536 by default)
        :return: the received bytes
        """
class SendByteStream(AsyncResource):
    """A closeable stream to which bytes can be sent."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the recipient(s).

        :param item: the bytes to send
        """

    @abstractmethod
    async def send_eof(self) -> None:
        """Signal the peer(s) that no more bytes are going to be sent."""

    async def aclose(self) -> None:
        """
        Signal end-of-file via :meth:`send_eof` and then close the stream.

        The superclass ``aclose()`` is awaited even if :meth:`send_eof` raises or is
        interrupted, so that the rest of the close chain still runs; the original exception
        then propagates to the caller.
        """
        try:
            await self.send_eof()
        finally:
            await super().aclose()
class ByteStream(ReceiveByteStream, SendByteStream):
    """A bidirectional byte stream: combines ``ReceiveByteStream`` and ``SendByteStream``."""
# Convenience aliases: each one accepts either a message stream whose items are ``bytes`` or
# the byte stream class for the same direction.

# Receiving
AnyUnreliableReceiveByteStream = Union[UnreliableReceiveMessageStream[bytes], ReceiveByteStream]
AnyReceiveByteStream = Union[ReceiveMessageStream[bytes], ReceiveByteStream]

# Sending
AnyUnreliableSendByteStream = Union[UnreliableSendMessageStream[bytes], SendByteStream]
AnySendByteStream = Union[SendMessageStream[bytes], SendByteStream]

# Bidirectional
AnyUnreliableByteStream = Union[UnreliableMessageStream[bytes], ByteStream]
AnyByteStream = Union[MessageStream[bytes], ByteStream]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment