Skip to content

Instantly share code, notes, and snippets.

@matanper
matanper / main.py
Created March 12, 2025 11:29
FastAPI sqlalchemy profiler
@app.middleware("http")
async def add_sql_tap(request: Request, call_next):
    """Profile the SQLAlchemy queries issued while handling one HTTP request.

    A sqltap profiling session is started before the request is passed down
    the middleware chain and is always stopped afterwards, so the session's
    event listeners do not accumulate from one request to the next. The
    collected statistics are then written to ``report.txt`` as a text report.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler
            and returns its response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # NOTE(review): start() is called without an engine, so it presumably
    # profiles every engine; overlapping requests may then record each
    # other's queries -- confirm before relying on per-request numbers.
    profiler = sqltap.start()
    try:
        response = await call_next(request)
    finally:
        # Stop even when the handler raises; otherwise every request would
        # leave another profiling session attached.
        profiler.stop()
    statistics = profiler.collect()
    sqltap.report(statistics, "report.txt", report_format="text")
    return response
@matanper
matanper / async_runner.py
Created February 2, 2025 09:19
Python3 asyncio runner with graceful exit
import signal
import asyncio
from prometheus_client import start_http_server as start_prometheus_server
EXIT_EVENT = asyncio.Event()
async def infinite_run():
@matanper
matanper / prefixed_valkey.py
Created January 30, 2025 22:01
Prefixed Valkey/Redis client
import valkey
class PrefixedValkey(valkey.Valkey): # Use `ValkeyCluster` if needed
def __init__(self, *args, key_prefix: str, **kwargs):
super().__init__(*args, **kwargs)
self.key_prefix = key_prefix
def _prefix_key(self, key):
"""Prefix Redis keys with the worker-specific namespace."""
if isinstance(key, bytes): # Handle binary keys
@matanper
matanper / blocking_queries.sql
Created September 7, 2024 10:04
Useful PG commands
-- List every backend that is currently blocked by another session,
-- together with the PIDs blocking it and the statement it is running.
SELECT
    a.pid,
    a.usename,
    pg_blocking_pids(a.pid) AS blocked_by,
    a.query AS blocked_query
FROM pg_stat_activity AS a
WHERE cardinality(pg_blocking_pids(a.pid)) > 0;
@matanper
matanper / asyncio_utils.py
Last active July 17, 2024 20:15
Python asyncio utilities
import asyncio
import logging
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from threading import Thread
from typing import Coroutine
from app.common.logger import logger
@matanper
matanper / DeduplicationKeySession.java
Created December 21, 2023 13:26
Apache Flink deduplication by session key
package org.monocle;
import java.io.IOException;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@matanper
matanper / parquet_to_tsdb.py
Last active September 5, 2024 18:57
Loading parquet files from S3 into timescaledb
import boto3
import os
import time
from subprocess import Popen, PIPE
import pyarrow.parquet as pq
import pickle
import hashlib
TSDB_CPUS = 8
@matanper
matanper / main.py
Created July 27, 2023 10:56
Python decorator that turns a blocking sync function into an async function run in a thread
def to_asyncio_in_thread(func):
    """Turn a blocking callable into a coroutine function run in a thread.

    Each call to the returned coroutine function submits ``func`` to the
    running event loop's default executor and awaits its result, so the
    event loop is not blocked while ``func`` executes.

    Args:
        func: A synchronous (blocking) callable.

    Returns:
        A coroutine function that accepts the same positional and keyword
        arguments as ``func`` and resolves to its return value. Exceptions
        raised by ``func`` propagate to the awaiting caller.
    """
    # Imported locally so the decorator does not depend on a module-level
    # import being present.
    import functools

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped callable
    async def wrapper(*args, **kwargs):
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        # run_in_executor() forwards positional arguments only, so the
        # keyword arguments are bound with partial().
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)

    return wrapper
# Usage:
@to_asyncio_in_thread
@matanper
matanper / cache.py
Created May 3, 2023 08:54
A thread-safe TTL cache function in python, and a thread-safe lock by parameter (hash key)
from cachetools import TTLCache
from cachetools.keys import hashkey
from parameterized_lock import parameterized_lock
def async_threadsafe_ttl_cache(func=None, ttl=60):
cache = TTLCache(maxsize=100, ttl=ttl)
def decorator(decorated_func):
@matanper
matanper / python_run_time_profiler.py
Created May 15, 2022 07:56
Python code snippet for run-time profiling of Python code
import cProfile
import io
import pstats
# Create a profiler and start recording.
pr = cProfile.Profile()
pr.enable()
### Code to profile here
# Stop recording once the code under test has finished.
pr.disable()
# In-memory text buffer that the statistics report is written to.
s = io.StringIO()
# Load the recorded data and order the entries by the 'time' sort key.
ps = pstats.Stats(pr, stream=s).sort_stats('time')