Skip to content

Instantly share code, notes, and snippets.

# A modest proposal for making libraries like urllib3 async-friendly.
# Step 1: we need some way to abstract over different network APIs. We
# encapsulate that into a class providing the basic networking operations we
# need. Two unusual but crucial points:
# - these methods are all semantically blocking
# - they're all async-colored (but wait until the end before judging this)
import abc
class AbstractNetworkIO(abc.ABC):
import trio
class PromiseLikeThing:
    """A minimal one-shot promise: ``get()`` blocks (cooperatively) until a
    result has been delivered, then unwraps it.

    NOTE(review): the fulfilment side -- whatever assigns ``_result`` and
    fires ``_event`` -- is not visible in this chunk.  ``_result`` is
    presumably an outcome-style object exposing ``.unwrap()``; confirm
    against the full gist.
    """

    def __init__(self):
        # Fired exactly once, when the result becomes available.
        self._event = trio.Event()
        # Outcome-like holder; populated elsewhere before the event fires.
        self._result = None

    async def get(self):
        """Wait for the result to be delivered, then unwrap and return it."""
        await self._event.wait()
        return self._result.unwrap()
import ast
import types
import ctypes
# Really what we want, I think, are compile modes "async_exec", "async_single",
# "async_eval". These would:
#
# - turn on await
# - turn on the coroutine flags, so that passing the code to exec() returns a
# coroutine object
# Toggle for the counting benchmark below — presumably selects whether the
# work is run across threads; confirm against the full script (driver not
# visible in this chunk).
PARALLEL = False
# Total number of decrements performed by count(); deliberately huge so the
# pure-Python loop runs long enough to measure.
COUNT = 1000000000
import threading
import time
def count(n=None):
    """Spin the CPU by decrementing a counter down to zero.

    This is a pure-Python busy loop (no I/O, no C calls), suitable as the
    workload for a GIL-contention / threading benchmark.

    Parameters
    ----------
    n : int, optional
        Number of decrements to perform.  Defaults to the module-level
        ``COUNT``, so existing ``count()`` callers behave exactly as
        before; passing a small ``n`` makes the function testable and
        lets the workload be tuned without editing the constant.
    """
    if n is None:
        n = COUNT
    while n > 0:
        n -= 1
@njsmith
njsmith / wtf.py
Last active November 8, 2025 14:25
async def f():
    """Return (not raise) a fresh ValueError instance.

    Part of the "wtf.py" demo: handing an exception *object* back as a
    coroutine's return value is what triggers the surprising
    exception-context behaviour noted below.
    """
    err = ValueError()
    return err
async def g():
    """Demonstrate the exception-context quirk.

    Awaiting ``f()`` from inside an ``except`` block — while a KeyError is
    being handled — is the exact sequence that exhibits the surprising
    behaviour described in the comment below; the statement order must not
    be changed.
    """
    try:
        raise KeyError
    except:  # bare except is deliberate: the demo handles *anything* here
        value_error = await f()
# This prints: KeyError() in CPython 3.5, 3.6, current master
# Apparently returning an exception object from an async function triggers
import sys
def dump_stack(where):
    """Print a labelled dump of the caller's call stack.

    Emits a ``-- where --`` header, then one line per frame (the code
    object's name), starting from the caller and walking outward.
    """
    print("-- {} --".format(where))
    fr = sys._getframe(1)  # start at the caller, not at dump_stack itself
    while fr:
        print(fr.f_code.co_name)
        fr = fr.f_back
def f():
import sys
import trio
def dump_stack(where):
    """Walk and print the current call stack, outermost caller last.

    Prints a header line containing *where*, then the name of each frame's
    code object from the immediate caller back to the program entry point.
    """
    print("-- {} --".format(where))
    current = sys._getframe(1)  # frame 1 == whoever called us
    while current is not None:
        print(current.f_code.co_name)
        current = current.f_back
import curio
# A simple task supervisor. Very loosely inspired by Erlang's supervisor
# trees:
#
# http://erlang.org/doc/design_principles/sup_princ.html
# http://erlang.org/doc/man/supervisor.html
#
# ...but much simpler (no respawning, no one-for-one policy, etc.).
#
import os
# Results from
# pyperf timeit -s "mk = masking_keys(blocksize=N)" "for i in range(10000): next(mk)"
# (The for loop adds some overhead, but it's the same for the different
# variants, and makes sure that we're amortizing the per-block costs
# appropriately.)
#
# N= 10: 3.64 ms +- 0.04 ms
# N= 50: 2.74 ms +- 0.04 ms
from gevent.server import StreamServer
import time
# Module-level state for the gevent switch-tracking demo.  Presumably
# updated by the connection handler / trace hook not visible in this chunk;
# confirm against the full script.
last_addr = None          # most recent peer address seen (assumption — verify)
last_switch_loops = 0     # loop iterations counted since the last switch
last_switch_time = None   # wall-clock time of the most recent switch
def pause(seconds):
deadline = time.time() + seconds