Skip to content

Instantly share code, notes, and snippets.

class UniversalQueue:
    """Queue whose items can be fetched from Trio tasks and from plain threads.

    Wraps a ``trio.Queue`` and keeps a ``trio.BlockingTrioPortal`` so that
    synchronous threads can ask Trio to perform the ``get`` on their behalf.
    """

    def __init__(self, *args, **kwargs):
        """Create the wrapped queue and the portal.

        All positional and keyword arguments are forwarded unchanged to
        ``trio.Queue``.
        """
        self._queue = trio.Queue(*args, **kwargs)
        self._portal = trio.BlockingTrioPortal()

    async def trio_get(self):
        """Wait for the next item and return it (for use inside Trio)."""
        # The wrapped trio.Queue exposes ``get``; ``trio_get`` is only the
        # name of this wrapper method, so delegate to ``get``.
        return await self._queue.get()

    def thread_get(self):
        """Return the next item, fetched through the portal (for threads)."""
        # The portal is handed the async ``get`` method itself and returns
        # whatever that call produces.
        return self._portal.run(self._queue.get)
# Prototype version of the ideas described here:
# https://github.com/python-trio/trio/issues/147#issuecomment-321420110
from contextlib import contextmanager
import trio
class GracefulShutdownManager:
    """State holder for a graceful-shutdown protocol.

    Only the constructor is visible in this excerpt: it records a
    "shutting down" flag and a set of cancel scopes.
    NOTE(review): the methods that flip the flag and use the scopes are not
    shown here -- confirm against the full source.
    """

    def __init__(self):
        """Start in the not-shutting-down state with no scopes registered."""
        self._shutting_down = False
        # A fresh set per instance, so managers never share scopes.
        self._cancel_scopes = set()
import types
class FakeFuture:
    """Minimal object that presents itself to asyncio as future-like.

    NOTE(review): only these two attributes are visible in this excerpt; any
    further future methods are not shown here.
    """

    # Marker attribute asyncio uses to duck-type futures (see
    # ``asyncio.isfuture``); any non-None value counts.
    _asyncio_future_blocking = True

    @property
    def _loop(self):
        """Return whatever ``asyncio.get_event_loop()`` reports right now."""
        # Imported lazily, exactly as in the original snippet.
        import asyncio

        return asyncio.get_event_loop()
# Really need to factor out a bunch of this into reusable logic
# That's what I eventually want to do with the sansio_toolbelt library
# But for now, this may be handy...
# Async helper that, in the lines visible here, scans an internal byte buffer
# for b"\n". The loop body is cut off after its first statement, so how data is
# read from ``rstream`` and how lines are handed back is not visible in this
# excerpt -- TODO confirm against the full source.
# ``receive_size`` and ``max_line_size`` are keyword-only and default to 16384;
# neither is used in the visible lines.
async def receive_lines(rstream, *, receive_size=16384, max_line_size=16384):
# Byte accumulator; starts empty.
buf = bytearray()
# Start offset passed to buf.find() below.
searched = 0
while True:
# Index of the first b"\n" at or after ``searched``, or -1 if there is none.
found = buf.find(b"\n", searched)
import trio
# Placeholder: the literal below is a list containing only ``Ellipsis``, so the
# real URL entries were presumably elided from this snippet.
URLS = [...]
async def fetch_into_queue(url, queue):
    """Fetch ``url`` and put the response onto ``queue``.

    ``queue`` must provide an awaitable ``put`` method. The request is made
    through ``s``, a module-level name that is not defined in this excerpt
    (presumably an HTTP session object -- confirm against the full source).
    """
    result = await s.get(url)
    await queue.put(result)
# This excerpt is cut off after the first statement. The name suggests it
# fetches every URL and shows results as they arrive, but that logic is not
# visible here -- TODO confirm against the full source.
async def fetch_all_and_display_as_ready():
# NOTE(review): the queue is created with no arguments; check whether the trio
# version in use requires a capacity argument for trio.Queue.
queue = trio.Queue()
# Ideally, we would manage async access to stdin/stdout/stderr *without*
# setting them to non-blocking mode, because that can break other processes.
# (See https://github.com/python-trio/trio/issues/174 for much more detail.)
# Of course we can call read/write in a separate thread, but then we lose
# cancellation support.
# This file demonstrates a weird hack to make blocking read/write cancellable,
# and thus at least theoretically possible to integrate into Trio as ordinary
# first-class operations.
import trio
import struct
import traceback
################################################################
# This part is a helper for reading N bytes from a stream
################################################################
class UnexpectedEOFError(Exception):
    """Signals that a stream ended earlier than expected.

    NOTE(review): presumably raised by the read-N-bytes helper announced in
    the banner comment of this section; the raising code is not visible here.
    """
import trio
import asyncio
# Bundles an asyncio event loop with a trio token. Only the constructor and
# the signature/docstring of one method are visible in this excerpt.
class AsyncioTrioPortal:
# Both arguments are keyword-only and are stored unchanged on the instance.
def __init__(self, *, asyncio_loop, trio_token):
self._asyncio_loop = asyncio_loop
self._trio_token = trio_token
# NOTE(review): only the docstring of this method is visible; its
# implementation is cut off in this excerpt.
async def run_sync_asyncio_from_trio(self, func, *args):
"""Runs the given *synchronous* function in asyncio, from trio"""
import trio
async def crash_one():
    """Fail immediately with ``ValueError("crash_one")``.

    Raises:
        ValueError: always.
    """
    raise ValueError("crash_one")
async def crash_two():
    """Fail with ``ValueError("crash_two")`` raised while handling a KeyError.

    The ValueError is raised inside the ``except`` block without ``from``,
    so the KeyError is attached as its implicit ``__context__``.

    Raises:
        ValueError: always.
    """
    try:
        raise KeyError("context exception")
    except KeyError:
        # Deliberately no ``from``: the KeyError must remain the implicit
        # context of the ValueError.
        raise ValueError("crash_two")
# Starts every function in ``async_fns`` as a task in a single nursery and
# stores each return value at the matching index of ``results``.
# NOTE(review): no ``return`` statement is visible; the excerpt ends here and
# the tail of the function may be cut off -- confirm against the full source.
async def run_all(*async_fns):
# One slot per function, filled in by run_one.
results = [None] * len(async_fns)
# Awaits a single function and records its result at index ``i``.
async def run_one(i, async_fn):
results[i] = await async_fn()
async with trio.open_nursery() as nursery:
for i, async_fn in enumerate(async_fns):
nursery.start_soon(run_one, i, async_fn)