Skip to content

Instantly share code, notes, and snippets.

@graingert
graingert / demo.py
Created July 31, 2022 09:50
running trio in asyncio and getting the current task context
import asyncio
import collections.abc
import contextlib
import functools
import types
import httpx
import sniffio
import trio
import asyncio
import collections.abc
import contextlib
import functools
import types
import httpx
import sniffio
import trio
from twisted.internet import defer
import curio
import concurrent
class Interceptor:
    """Hold a ``send`` object and a ``coro`` object as private attributes.

    NOTE(review): only the constructor is visible in this excerpt; how the
    two stored objects are used afterwards cannot be determined from here.
    """

    def __init__(self, send, coro):
        """Remember ``send`` and ``coro`` on the instance."""
        self._send, self._coro = send, coro
__all__ = 'run',
import functools
import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import tasks
import contextlib
@contextlib.contextmanager
@graingert
graingert / .gitignore
Last active July 28, 2022 10:11
twisted asyncio tests
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
import asyncio
import contextlib
class _AcknowledgeException:
    """Plain wrapper object that carries one exception instance."""

    def __init__(self, exception):
        """Keep ``exception`` available as the ``exception`` attribute."""
        # The wrapped object is stored as-is; nothing is raised or copied.
        self.exception = exception
class _RecommendGeneratorExit(BaseException):
@graingert
graingert / agen_close.py
Last active July 22, 2022 10:20
closing an async_generator_asend does not throw GeneratorExit into the `await` statement
import types
def underline(text):
    """Return ``text`` with a row of ``^`` markers under its last line.

    The result is ``text``, a newline, and one ``^`` for every character of
    the last line of ``text`` (lines are split with ``str.splitlines``).

    An empty ``text`` has no lines at all; it is treated as having a
    zero-width last line, so the result is just ``"\\n"`` instead of an
    ``IndexError``.
    """
    lines = text.splitlines()
    # "".splitlines() is [], so guard the [-1] lookup for empty input.
    last_line = lines[-1] if lines else ""
    return text + "\n" + "^" * len(last_line)
@types.coroutine
def _async_yield(v):
    """Yield ``v`` once and return whatever value is sent back in.

    Decorated with ``types.coroutine`` so the generator can be used from
    ``await`` expressions.
    """
    sent_back = yield v
    return sent_back
import asyncio
import async_timeout
import quattro
import contextlib
import anyio
async def main(note, cmgr):
try:
async with cmgr(-11):
import asyncio
async def main():
    """Report whether a zero-delay ``asyncio.timeout`` raises ``TimeoutError``.

    Awaits ``asyncio.sleep(0)`` inside ``asyncio.timeout(0)`` and prints a
    PASS line when ``TimeoutError`` propagates out of the ``async with``
    block, or a FAILED line when the block finishes without it.
    """
    timed_out = False
    try:
        async with asyncio.timeout(0):
            await asyncio.sleep(0)
    except TimeoutError:
        timed_out = True

    # Only TimeoutError is handled; any other exception still propagates.
    if timed_out:
        print("PASS: great a timeout is here!")
    else:
        print("FAILED: expected a timeout")
import psutil
import multiprocessing
import socket
import sys
import concurrent.futures
ctx = multiprocessing.get_context("spawn")
def target(sock):