Skip to content

Instantly share code, notes, and snippets.

#!/usr/bin/env python3
from distributed import default_client
import sys
import asyncio
import concurrent.futures
from concurrent import futures
import contextlib
from tornado.ioloop import IOLoop
import random
import time
@graingert
graingert / bench.py
Created May 20, 2022 15:52 — forked from ajdavis/bench.py
asyncio getaddrinfo benchmark
# Test BaseEventLoop.getaddrinfo performance, before and after my commit 39c135b
# Try a variety of getaddrinfo parameters and log the duration of 10k calls
import socket
import time
from asyncio import get_event_loop
try:
from asyncio.base_events import _ipaddr_info
except ImportError:
import dataclasses
import operator
@dataclasses.dataclass
class WarningFilter:
command: str
msg: str
cls: str
module: str | None = None
import sys
import logging
import threading
import collections
class Record:
def __init__(self):
self.levelno = 0
from __future__ import annotations
import sys
import asyncio
import concurrent.futures
import contextlib
class _Client:
def __init__(self, loop):
@graingert
graingert / client.py
Created September 3, 2021 08:04
websocket client
#!/usr/bin/env python3
import sys
import asyncio
import json
import websockets
async def hello():
uri = "ws://localhost:8888"
@graingert
graingert / anyio.pytb
Created August 24, 2021 08:16
amirouche-generator-stop
2021-08-24 09:11:54.057 | ERROR | __main__:get:51 - http get failed with exception url: https://hyper.dev, <class 'RuntimeError'>
Traceback (most recent call last):
File "/home/graingert/.virtualenvs/testingpypy37/site-packages/httpcore/_backends/anyio.py", line 60, in read
return await self.stream.receive(n)
│ │ │ └ 65536
│ │ └ <function TLSStream.receive at 0x00007f0102d79920>
│ └ TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x00007f010434cb80>, standard_compatible=False, _...
└ <httpcore._backends.anyio.SocketStream object at 0x00005651a1184608>
@graingert
graingert / gimcrack.py
Created August 8, 2021 09:27
gimcrack.py
import curio
import collections
import threading
class Locals(threading.local):
synchrotron = None
tlocals = Locals()
_i = iter(EXPR)
try:
_y = next(_i)
except StopIteration as _e:
_r = _e.value
else:
while 1:
try:
_s = yield _y
except GeneratorExit as _e:
@graingert
graingert / main.py
Created May 11, 2021 12:56
call Twisted from django
from __future__ import annotations
# with thanks to https://adamj.eu/tech/2020/10/15/a-single-file-rest-api-in-django/
import os
import sys
from dataclasses import dataclass
import treq
import asyncio
import asgiref.sync