Skip to content

Instantly share code, notes, and snippets.

View thehesiod's full-sized avatar
🎯
Focusing

Alexander Mohr thehesiod

🎯
Focusing
View GitHub Profile
@thehesiod
thehesiod / async_exit_stack.py
Last active July 23, 2017 19:35
Async ExitStack
from inspect import iscoroutinefunction, isawaitable
import sys
from collections import deque
# NOTE: this follows the contextlib.ExitStack implementation
class _BaseExitStack:
def __init__(self):
@thehesiod
thehesiod / async_worker_pool.py
Last active September 26, 2024 03:14
Asynchronous Worker Pool, allows for limiting number of concurrent tasks
import asyncio
from datetime import datetime, timezone
import os
def utc_now():
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    # datetime.now(tz) builds an aware value directly; datetime.utcnow() is
    # deprecated (Python 3.12+) and returns a naive datetime that would need
    # its tzinfo patched on afterwards.
    return datetime.now(timezone.utc)
class Terminator:
    """Empty marker class with no attributes or behaviour of its own.

    NOTE(review): presumably used as a sentinel to tell workers to stop;
    the usage is not visible in this excerpt -- confirm against callers.
    """
@thehesiod
thehesiod / request_https_leak.py
Last active March 21, 2017 19:59
Python Requests HTTPS leak
import requests
import multiprocessing
import setproctitle
import os
import threading
import tracemalloc
import linecache
import traceback
import gc
import logging
@thehesiod
thehesiod / libzmq_bug.cpp
Last active May 14, 2017 18:22
libzmq bug
#include <unistd.h>
#include <sys/types.h>
#include <errno.h>
#include "zmq.h"
#include <stdio.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdexcept>
#include <time.h>
#include <string>
#!/usr/bin/env python3.5
import zmq.asyncio
import zmq
import asyncio
import pickle
import logging
import tempfile
import traceback
import multiprocessing
@thehesiod
thehesiod / ahps_formats.py
Last active June 6, 2017 22:49
Comparison of new and old AHPS file formats
import numpy as np
import netCDF4
def reversed_list_enumerate(value: list):
    """Yield ``(negative_index, item)`` pairs from the last element to the first.

    The first pair is ``(-1, value[-1])`` and the last is
    ``(-len(value), value[0])``; an empty list yields nothing.
    """
    count = len(value)
    for offset in range(1, count + 1):
        neg_index = -offset
        yield neg_index, value[neg_index]
def find_first(dataset, var_name, reverse: bool, ahps_to_inches: bool=False):
@thehesiod
thehesiod / aiohttp_timeout.py
Created June 7, 2017 22:04
asyncio timeout testcase
import asyncio
import aiohttp.web
async def stream_handler(request):
# Without the Content-Type, most (all?) browsers will not render
# partially downloaded content. Note, the response type is
# StreamResponse not Response.
resp = aiohttp.web.StreamResponse(status=200,
reason='OK',
headers={'Content-Type': 'text/html'})
@thehesiod
thehesiod / aio_rate_limiter.py
Last active September 14, 2018 07:17
asyncio sliding window rate limiter
import asynctest
import asyncio
import logging
from collections import deque
class RateLimiter:
class Error(Exception):
pass
@thehesiod
thehesiod / xray_httplib_patch.py
Last active August 11, 2017 19:15
AWS X-Ray http.client patch
import wrapt
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.util import inject_trace_header
import ssl
# NOTE(review): presumably the attribute name under which the patch stashes
# its X-Ray data on patched objects; usage is not visible here -- confirm.
_XRAY_PROP = '_xray_prop'
@thehesiod
thehesiod / moto_service.py
Last active October 27, 2020 07:07
Moto Service Helper Class
import asyncio
import functools
import logging
import os
import threading
import socket
import http.server
from typing import Dict, Any, Optional
# Third Party