Skip to content

Instantly share code, notes, and snippets.

import anyio.streams.buffered
import anyio.abc
import socket
import functools
async def pipe(reader, writer):
raw_socket = reader.extra_attributes[anyio.abc.SocketAttribute.raw_socket]()
try:
while True:
from __future__ import annotations
import dataclasses
import math
from collections.abc import Callable, Coroutine, Generator
from typing import TYPE_CHECKING
import trio.lowlevel
from typing_extensions import ParamSpec, Self, TypeVar, overload
WARNING: This command is only meant for debugging. Do not use this with automation for parsing and getting these details, since the output and options of this command may change without notice.
pip version: pip 24.2 from C:\hostedtoolcache\windows\PyPy\3.10.14\x86\lib\site-packages\pip (python 3.10)
sys.version: 3.10.14 (39dc8d3c85a7, Aug 27 2024, 14:33:33)
[PyPy 7.3.17 with MSC v.1929 64 bit (AMD64)]
sys.executable: C:\hostedtoolcache\windows\PyPy\3.10.14\x86\python.exe
sys.getdefaultencoding: utf-8
sys.getfilesystemencoding: utf-8
locale.getpreferredencoding: UTF-8
sys.platform: win32
sys.implementation:
[run]
branch = True
parallel = True
concurrency = multiprocessing,thread
source_pkgs = demo
import pathlib
import sys
import tomllib
from collections.abc import Iterable
def main() -> None:
project = pathlib.Path("pyproject.toml")
module = tomllib.loads(project.read_text())["tool"]["mypy"]["overrides"][0][
"module"
import asyncio
import contextlib
async def child(sock):
try:
with contextlib.closing(sock):
await asyncio.sleep(2)
except Exception as e:
print(f"child task got error: {type(e)=} {e=}")
raise
import objgraph
import asyncio
READY = object()
import weakref
import asyncio
import gc
import logging
@graingert
graingert / demo.py
Created July 31, 2022 09:50
running trio in asyncio and getting the current task context
import asyncio
import collections.abc
import contextlib
import functools
import types
import httpx
import sniffio
import trio
import asyncio
import collections.abc
import contextlib
import functools
import types
import httpx
import sniffio
import trio
from twisted.internet import defer
import curio
import concurrent
class Interceptor:
def __init__(self, send, coro):
self._send = send
self._coro = coro