@vxgmichel
vxgmichel / debug_fd.py
Created February 25, 2019 18:49
A pytest fixture for detecting leaking file descriptors
import os
import pytest
import subprocess
def get_open_fds():
    cmd = f"lsof -p {os.getpid()}"
    result = subprocess.run(cmd.split(), capture_output=True)
    return {line.split()[-1] for line in result.stdout.splitlines()}
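To illustrate the before/after comparison that a leak detector of this kind relies on, here is a minimal sketch. It is not the gist's implementation: instead of calling `lsof`, it probes descriptor numbers with `os.fstat`, and the names `detect_fd_leaks` and `limit` are hypothetical, chosen only for this example.

```python
import os
from contextlib import contextmanager


def get_open_fds(limit=1024):
    """Return the set of descriptor numbers below `limit` that are currently open.

    Assumption: probing with os.fstat stands in for the lsof call used in the gist.
    """
    fds = set()
    for fd in range(limit):
        try:
            os.fstat(fd)
        except OSError:
            # fstat fails with EBADF when the descriptor is not open
            continue
        fds.add(fd)
    return fds


@contextmanager
def detect_fd_leaks():
    """Yield a set that is filled, on exit, with descriptors opened but not closed."""
    before = get_open_fds()
    leaked = set()
    yield leaked
    leaked |= get_open_fds() - before
```

A yield-style pytest fixture could wrap the same snapshot-and-compare logic and assert that the difference is empty after the test body runs. Descriptors numbered at or above `limit` go unnoticed in this sketch.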
@vxgmichel
vxgmichel / taskstatus.py
Last active January 29, 2019 11:54
Helper to wrap and monitor a trio task
import attr
import trio
@attr.s
class TaskStatus:
    # Internal state
    _cancel_scope = attr.ib(default=None)
@vxgmichel
vxgmichel / hotstream.py
Last active April 29, 2019 21:01
Hot stream implementation with aiostream
import asyncio
from aiostream import streamcontext
from aiostream.aiter_utils import anext
from aiostream.core import Streamer
async def cancel_and_join(task):
    task.cancel()
    try:
@vxgmichel
vxgmichel / bandit.sh
Created December 15, 2018 19:05
Solving Bandit from OverTheWire
#!/usr/bin/env bash
SERVER=bandit.labs.overthewire.org
PORT=2220
BASEUSER="bandit"
PASSWORD="bandit0"
COMMANDS[0]='cat readme'
COMMANDS[1]='cat ./-'
COMMANDS[2]='cat "spaces in this filename"'
@vxgmichel
vxgmichel / groupby.py
Last active October 13, 2021 15:04
Asynchronous implementation of itertools.groupby for aiostream
import asyncio
from aiostream.aiter_utils import anext
from aiostream import stream, operator, streamcontext, pipe
import pytest
@operator(pipable=True)
async def groupby(source, key=None):
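To show what an asynchronous counterpart of `itertools.groupby` involves, here is a minimal sketch built on plain async generators from the standard library. It does not use aiostream's operator machinery and is not the gist's code: the name `groupby_lists` is hypothetical, and each group is collected into a list rather than exposed as a lazy sub-stream.

```python
import asyncio


async def groupby_lists(source, key=None):
    """Yield (key, items) pairs for consecutive items of `source` sharing a key.

    Assumption: groups are materialized as lists, which is simpler than
    (and different from) yielding a lazy asynchronous group.
    """
    keyfunc = key if key is not None else (lambda item: item)
    missing = object()  # marks "no group started yet"
    current_key = missing
    group = []
    async for item in source:
        item_key = keyfunc(item)
        if current_key is not missing and item_key != current_key:
            # The key changed: emit the finished group and start a new one
            yield current_key, group
            group = []
        current_key = item_key
        group.append(item)
    if current_key is not missing:
        # Emit the last group once the source is exhausted
        yield current_key, group
```

As with `itertools.groupby`, only consecutive items are grouped, so a key that reappears later in the source starts a new group.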
@vxgmichel
vxgmichel / trio-asyncgen-writeup.md
Last active November 22, 2018 13:24
Using trio nurseries inside async generators

Using trio nurseries inside async generators

From what I've read in this comment (issue #264), it seems like the idea of:

  • allowing yield inside a trio nursery
  • AND allowing it to cancel the current task regardless of its current position in the code

is out of the picture. The main argument against it is presented in this comment (issue #638):

@vxgmichel
vxgmichel / merge.py
Last active November 16, 2018 10:14
Safely merging async generators with trio
import random
from contextlib import asynccontextmanager
import trio
@asynccontextmanager
async def aitercontext(resource):
    aiterator = resource.__aiter__()
@vxgmichel
vxgmichel / merge.py
Last active November 15, 2018 13:42
Merging async generators with trio
import trio
import random
async def merge(agens):
    async def produce(agen, channel):
        async with channel:
            async for item in agen:
                await channel.send(item)
@vxgmichel
vxgmichel / foo.py
Created August 10, 2018 12:26
Answering on stack overflow - see the docstring
"""
A corrected version of the program in the following Stack Overflow question:
https://stackoverflow.com/questions/51775413/python-asyncio-run-coroutine-threadsafe-never-running-coroutine
"""
import asyncio
from threading import Thread
from contextlib import contextmanager
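For context on the pattern named in the question's title, here is a minimal sketch of submitting a coroutine to an event loop that runs in another thread. It is not the gist's corrected program: the helpers `background_loop`, `add`, and `run_example` are hypothetical names used for illustration, while `asyncio.run_coroutine_threadsafe` is the standard library call.

```python
import asyncio
from contextlib import contextmanager
from threading import Thread


@contextmanager
def background_loop():
    """Run a fresh event loop in a helper thread for the duration of the block."""
    loop = asyncio.new_event_loop()
    thread = Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        yield loop
    finally:
        # stop() must be scheduled from the loop's own thread
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


async def add(a, b):
    """Toy coroutine standing in for real asynchronous work."""
    await asyncio.sleep(0.01)
    return a + b


def run_example():
    with background_loop() as loop:
        # Returns a concurrent.futures.Future usable from the calling thread
        future = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
        return future.result(timeout=5)
```

A coroutine submitted this way only runs while the target loop is running in its thread, so a loop that was never started is one likely reason for the coroutine never running.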
@vxgmichel
vxgmichel / compat.py
Created July 26, 2018 21:14
Asyncio/Curio/Trio compatibility module
import asyncio
try:
    import curio
except ImportError:
    curio = None
try:
    import trio
except ImportError: