This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TypedAttribute(Generic[T_Attr]): | |
""" | |
Generic class used to define typed attributes, for use in :class:`~TypedAttributeContainer`. | |
""" | |
class TypedAttributeContainer(metaclass=ABCMeta): | |
""" | |
Base class for classes that wish to provide typed extra attributes. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import socket | |
class DatagramProtocol(asyncio.DatagramProtocol): | |
def error_received(self, exc: Exception) -> None: | |
print('received error:', exc.__class__, ':', exc) | |
async def main(): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import curio | |
async def main(): | |
async def cancel(): | |
await curio.sleep(0.1) | |
await task.cancel(blocking=False) | |
event = curio.Event() | |
task = await curio.current_task() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import socket | |
from dataclasses import dataclass, field | |
from email.headerregistry import Address | |
from email.message import EmailMessage | |
from email.utils import getaddresses, parseaddr | |
from ssl import SSLContext | |
from typing import Optional, Iterable, Callable, Union, List, Dict, Any | |
from anyio import connect_tcp, fail_after, BlockingPortal, start_blocking_portal |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pickle | |
import platform | |
import threading | |
from datetime import datetime, timezone, timedelta | |
from typing import Optional, Dict, AsyncGenerator, List, Any, Union, AsyncIterable | |
import anyio | |
import sniffio | |
from asyncpg import create_pool, Connection |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import socket | |
from contextlib import suppress | |
from pprint import pprint | |
async def main(): | |
async def send_loop(): | |
payload = b'\x00' * 1024_000 | |
while True: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import socket | |
class CustomProto(asyncio.DatagramProtocol): | |
def connection_made(self, transport): | |
print('ready to receive datagrams on', transport.get_extra_info('sockname')) | |
def error_received(self, exc): | |
print('Error:', exc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import socket | |
from threading import Thread | |
def serve(): | |
global accepted_client | |
accepted_client = server.accept() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import abstractmethod, ABCMeta | |
from typing import Generic, TypeVar, Union | |
T_Item = TypeVar('T_Item', covariant=True) | |
class AsyncResource(metaclass=ABCMeta): | |
@abstractmethod | |
async def aclose(self) -> None: | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Thread, Event | |
import pytest | |
from tornado import web, ioloop | |
from dummyserver.handlers import TestingApp | |
from dummyserver.server import DEFAULT_CERTS, run_tornado_app | |
try: | |
import asyncio |