Skip to content

Instantly share code, notes, and snippets.

View ddelange's full-sized avatar
💥
["translatio", "imitatio", "aemulatio"]

ddelange ddelange

💥
["translatio", "imitatio", "aemulatio"]
View GitHub Profile
@ddelange
ddelange / parse_duration_string.py
Last active November 15, 2024 15:20
Parse a GEP-2257 like duration string into a Python timedelta
import re
from datetime import timedelta
# Ordered case-insensitive duration strings, loosely based on the [GEP-2257](https://gateway-api.sigs.k8s.io/geps/gep-2257/) spec
# Discrepancies between this pattern and GEP-2257 duration strings:
# - this pattern accepts units `w|d|h|m|s|ms|[uµ]s` (all units supported by the datetime.timedelta constructor), GEP-2257 accepts only `h|m|s|ms`
# - this pattern allows for optional whitespace around the units, GEP-2257 does not
# - this pattern is compiled to be case-insensitive, GEP-2257 expects lowercase units
# - this pattern expects ordered (descending) units, GEP-2257 allows arbitrary order
# - this pattern does not allow duplicate unit occurrences, GEP-2257 does
@ddelange
ddelange / itunes_xml_to_m3u.py
Last active November 8, 2024 17:24
Convert iTunes Music Library xml to m3u8 playlists
# pip install click
# python itunes_xml_to_m3u.py --help
import logging
import plistlib
import re
import typing as tp
from pathlib import Path
from urllib.parse import unquote
import click
@ddelange
ddelange / ensure_avc_mp4.sh
Created April 25, 2024 09:34
Conditionally copy or encode video stream into avc (x264) mp4 with ffmpeg and ffprobe
#!/usr/bin/env bash
set -euxo pipefail
video_in="video_in.mkv"
video_out="video_out.mp4"
# full metadata payload: $(ffprobe -loglevel error -print_format json -show_format -show_streams "${video_in}")
is_avc=$(ffprobe -loglevel error -select_streams v:0 -show_entries stream=is_avc -of default=noprint_wrappers=1:nokey=1 "${video_in}")
@ddelange
ddelange / sharednumpyarray.py
Created March 25, 2024 11:38
Multiprocessing numpy using zero-copy SharedNumpyArray and ProcessPoolExecutor.imap
from collections import deque
from concurrent.futures import ProcessPoolExecutor as _ProcessPoolExecutor
from multiprocessing.shared_memory import SharedMemory
import numpy as np
class ProcessPoolExecutor(_ProcessPoolExecutor):
"""Subclass with a lazy consuming imap method."""
@ddelange
ddelange / build-git.sh
Last active February 15, 2024 01:51 — forked from ivan-c/build-git.sh
compile git with openssl instead of gnutls
#!/usr/bin/env bash
# original gist from pescobar/build-git.sh
# changes by LaggAt:
# * to be usable on Raspbian / tested RPi3 and
# * for automatic depency resolving
# changes by ivan-c:
# * add `apt-get update`
# changes by ddelange:
# * add `set -euxo pipefail`
# * remove `--allow-downgrades`
@ddelange
ddelange / stream_command.py
Last active July 14, 2024 21:09
Handling the live output stream of a command
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
args,
*,
@ddelange
ddelange / threadpoolexecutor.py
Last active September 24, 2024 10:28
The missing ThreadPoolExecutor.imap (also works for ProcessPoolExecutor.imap)
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
class ThreadPoolExecutor(_ThreadPoolExecutor):
"""Subclass with a lazy consuming imap method."""
def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
"""Ordered imap that lazily consumes iterables ref https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999."""
futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
@ddelange
ddelange / URIDownloader.py
Last active October 11, 2024 14:17
Multithreaded S3 downloads
# pip install smart_open[s3]
from collections import deque
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
from functools import partial
from typing import Callable, Dict, Optional, Iterable, Iterator, Sequence
import boto3
import botocore
import smart_open
@ddelange
ddelange / release_assets_pip_find_links.py
Last active September 22, 2022 06:29
Create a PIP_FIND_LINKS page containing all release assets
@ddelange
ddelange / mp4_srt.py
Last active December 19, 2022 07:09
Mux multiple subtitle files into an mp4 file
# pip install sh pycountry
import re
from pathlib import Path
import pycountry
from sh import ffmpeg
def mux_mp4_subs(inp, outp, *subs, _cwd=None, **subs_map):
"""Mux multiple subtitle files into an mp4 file.