Skip to content

Instantly share code, notes, and snippets.

View agronholm's full-sized avatar

Alex Grönholm agronholm

  • NextDay Solutions Oy
  • Nurmijärvi, Finland
View GitHub Profile
@agronholm
agronholm / resources.py
Created August 25, 2020 20:36
Typed dynamic attributes for AnyIO
class TypedAttribute(Generic[T_Attr]):
"""
Generic class used to define typed attributes, for use in :class:`~TypedAttributeContainer`.
"""
class TypedAttributeContainer(metaclass=ABCMeta):
"""
Base class for classes that wish to provide typed extra attributes.
@agronholm
agronholm / udptest.py
Created July 29, 2020 12:19
Reproduction script for an asyncio UDP bug
import asyncio
import socket
class DatagramProtocol(asyncio.DatagramProtocol):
def error_received(self, exc: Exception) -> None:
print('received error:', exc.__class__, ':', exc)
async def main():
@agronholm
agronholm / curiotest.py
Created July 21, 2020 21:53
Demonstration of a cancellation failure
import curio
async def main():
async def cancel():
await curio.sleep(0.1)
await task.cancel(blocking=False)
event = curio.Event()
task = await curio.current_task()
@agronholm
agronholm / client.py
Created July 2, 2020 13:49
AnyIO SMTP client
import logging
import socket
from dataclasses import dataclass, field
from email.headerregistry import Address
from email.message import EmailMessage
from email.utils import getaddresses, parseaddr
from ssl import SSLContext
from typing import Optional, Iterable, Callable, Union, List, Dict, Any
from anyio import connect_tcp, fail_after, BlockingPortal, start_blocking_portal
@agronholm
agronholm / postgresql.py
Created February 18, 2020 09:47
WIP PostgreSQL data store
import os
import pickle
import platform
import threading
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, AsyncGenerator, List, Any, Union, AsyncIterable
import anyio
import sniffio
from asyncpg import create_pool, Connection
import asyncio
import socket
from contextlib import suppress
from pprint import pprint
async def main():
async def send_loop():
payload = b'\x00' * 1024_000
while True:
@agronholm
agronholm / udpreceive.py
Created December 27, 2019 18:13
Simple UDP receive/send pair for IPv6 with the receive side using asyncio
import asyncio
import socket
class CustomProto(asyncio.DatagramProtocol):
def connection_made(self, transport):
print('ready to receive datagrams on', transport.get_extra_info('sockname'))
def error_received(self, exc):
print('Error:', exc)
import os
import socket
from threading import Thread
def serve():
global accepted_client
accepted_client = server.accept()
@agronholm
agronholm / streams.py
Created December 18, 2019 09:49
AnyIO Stream ABCs
from abc import abstractmethod, ABCMeta
from typing import Generic, TypeVar, Union
T_Item = TypeVar('T_Item', covariant=True)
class AsyncResource(metaclass=ABCMeta):
@abstractmethod
async def aclose(self) -> None:
"""
@agronholm
agronholm / conftest.py
Last active November 24, 2019 20:07
urllib3 async test setup
from threading import Thread, Event
import pytest
from tornado import web, ioloop
from dummyserver.handlers import TestingApp
from dummyserver.server import DEFAULT_CERTS, run_tornado_app
try:
import asyncio