Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
vxgmichel / trio_task_monitoring.py
Last active August 28, 2023 18:22
Trio instrument to detect and log blocking tasks
import time
import inspect
import traceback
import trio
import structlog
logger = structlog.get_logger()
@vxgmichel
vxgmichel / continuedfraction.py
Created May 26, 2019 09:02
Python helpers for continued fractions
"""Helpers for continued fractions"""
import math
import itertools
def continued_fraction(n, d):
while d:
q, r = divmod(n, d)
n, d = d, r
@vxgmichel
vxgmichel / telnet-python.py
Last active October 30, 2020 09:23
Serving ptpython using telnet
#!/usr/bin/env python
"""
Serving ptpython using telnet
I ran into several issues when writing this piece of code, both with ptpython
and prompt_toolkit. Find below a short description for each of them.
Ptpython issues:
----------------
@vxgmichel
vxgmichel / tasktiming.py
Created April 3, 2019 12:07
Patch asyncio handles to measure task runtime
import time
import asyncio
def task_timing_patch():
class TaskTimingHandle(asyncio.events.Handle):
def _add_runtime(self, value):
try:
@vxgmichel
vxgmichel / asyncio_timing.py
Last active November 16, 2022 05:35
A timing context for asyncio
# See the coresponding stackoverflow post:
# https://stackoverflow.com/a/34827291/2846140
import time
import asyncio
import selectors
import contextlib
class TimedSelector(selectors.DefaultSelector):
@vxgmichel
vxgmichel / multifuse.py
Created March 12, 2019 18:01
Demonstrate signal issues in a fusepy application with multiple instances
import os
import time
import pathlib
from concurrent.futures import ThreadPoolExecutor
from fuse import FUSE, Operations
with ThreadPoolExecutor() as executor:
mountpoint1 = "./a"
@vxgmichel
vxgmichel / turing.py
Last active March 2, 2019 14:00
A turing machine runner implemented with string substitution
def generate_tape(tape, state, offset=1):
head, *tail = tape
*_, last = tape
return (
f"... {head} " +
f" {head} " * (offset - 1) +
f" [{head}|{state}]" +
"".join(f" {symbol} " for symbol in tail) +
f" {last} " * (offset - 1) +
@vxgmichel
vxgmichel / sshmockserver.py
Last active August 31, 2023 14:56
A pytest fixture for running an ssh mock server
"""A pytest fixture for running an ssh mock server.
Requires pytest and asyncssh:
$ pip install pytest asyncssh
"""
from socket import AF_INET
from unittest.mock import Mock
from contextlib import asynccontextmanager
@vxgmichel
vxgmichel / debug_fd.py
Created February 25, 2019 18:49
A pytest fixture for detecting leaking file descriptors
import os
import pytest
import subprocess
def get_open_fds():
cmd = f"lsof -p {os.getpid()}"
result = subprocess.run(cmd.split(), capture_output=True)
return {line.split()[-1] for line in result.stdout.splitlines()}
@vxgmichel
vxgmichel / taskstatus.py
Last active January 29, 2019 11:54
Helper to wrap and monitor a trio task
import attr
import trio
@attr.s
class TaskStatus:
# Internal state
_cancel_scope = attr.ib(default=None)