Skip to content

Instantly share code, notes, and snippets.

from twisted.internet import defer
import curio
import concurrent
class Interceptor:
def __init__(self, send, coro):
self._send = send
self._coro = coro
__all__ = 'run',
import functools
import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import tasks
import contextlib
@contextlib.contextmanager
@graingert
graingert / .gitignore
Last active July 28, 2022 10:11
twisted asyncio tests
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
import asyncio
import contextlib
class _AcknowledgeException:
    """Plain wrapper object that carries a single exception instance.

    NOTE(review): only ``__init__`` is visible here; how the wrapper is
    consumed is not shown in this excerpt.
    """

    def __init__(self, exception):
        """Store *exception* unchanged on the ``exception`` attribute."""
        self.exception = exception
class _RecommendGeneratorExit(BaseException):
@graingert
graingert / agen_close.py
Last active July 22, 2022 10:20
closing an async_generator_asend does not throw GeneratorExit into the `await` statement
import types
def underline(text):
    """Return *text* followed by a row of ``^`` under its last line.

    A newline is appended to *text*, then one ``^`` per character of the
    final line (as split by ``str.splitlines``).

    Empty *text* has no last line; it is treated as a zero-width line, so
    the result is just ``"\\n"`` instead of raising ``IndexError``.
    """
    lines = text.splitlines()
    # "".splitlines() is [], so guard the [-1] lookup.
    width = len(lines[-1]) if lines else 0
    return text + "\n" + "^" * width
@types.coroutine
def _async_yield(v):
    """Yield *v* to whatever is driving the coroutine and return its reply.

    ``types.coroutine`` marks this generator function so that its result
    can be awaited from an ``async def`` function.  The value passed to
    ``send()`` by the driver becomes the return value.
    """
    return (yield v)
import asyncio
import async_timeout
import quattro
import contextlib
import anyio
async def main(note, cmgr):
try:
async with cmgr(-11):
import asyncio
async def main():
    """Check that ``asyncio.timeout(0)`` expires across a single yield.

    Enters a zero-delay ``asyncio.timeout`` block, yields to the event
    loop once with ``asyncio.sleep(0)``, and prints a PASS line if
    ``TimeoutError`` is raised or a FAILED line otherwise.
    """
    try:
        async with asyncio.timeout(0):
            # One bare yield to the loop; the timeout is expected to fire here.
            await asyncio.sleep(0)
    except TimeoutError:
        print("PASS: great a timeout is here!")
    else:
        print("FAILED: expected a timeout")
import psutil
import multiprocessing
import socket
import sys
import concurrent.futures
ctx = multiprocessing.get_context("spawn")
def target(sock):
import timeit, random
import heapq
random.seed(42)
left = []
right = []
# Create two sorted lists of integers
n = 0
import asyncio
import ipaddress
import pprint
import socket
import sys
from socket import AddressFamily, SocketKind
from jeepney.wrappers import MessageGenerator, new_method_call