Skip to content

Instantly share code, notes, and snippets.

View beatzxbt's full-sized avatar
🎯
Focusing

beatzxbt

🎯
Focusing
View GitHub Profile
@beatzxbt
beatzxbt / ringbuffer.pyx
Created March 6, 2025 20:01
1d f64 ringbuffer.pyx
import numpy as np
cimport numpy as cnp
from libc.stdint cimport uint64_t, int64_t
cdef class RingBufferOneDim:
"""
A 1-dimensional fixed-size circular buffer for floats/doubles.
"""
@beatzxbt
beatzxbt / ratelimiter.pyx
Created March 2, 2025 16:00
basic rate limiter
from libc.stdint cimport uint32_t
from mm_toolbox.time.time cimport time_s
from .engine cimport OrderAction
cdef class RateLimitCounter:
cdef:
uint32_t tokens_per_sec
uint32_t tokens_remaining
@beatzxbt
beatzxbt / tools.pyx
Created December 31, 2024 21:57
uber fast list[list[str]] -> np.ndarray[np.double]
import numpy as np
cimport numpy as np
from libc.stdint cimport uint16_t
from libc.stdlib cimport strtod
from cpython.unicode cimport PyUnicode_AsUTF8
cpdef np.ndarray[np.double_t, ndim=2] parse_list_list_str_to_floats(list[list[str]] data):
"""
Parses a list of lists of strings into a NumPy array of floats.
import xxhash
import asyncio
import platform
from collections import deque
from warnings import warn as warning
from typing import List, Dict, Optional
from mm_toolbox.time import time_ns, time_s
from .conn.raw cimport WsConnection
@beatzxbt
beatzxbt / l2dataingresstools.pyx
Created September 24, 2024 11:30
high performance str -> float converter
import numpy as np
cimport numpy as np
from libc.stdlib cimport strtod
from cpython.unicode cimport PyUnicode_AsUTF8
from cython cimport boundscheck, wraparound
@boundscheck(False)
@wraparound(False)
cdef np.ndarray[np.float64_t, ndim=2] parse_list_strings_to_floats(list[list[str]] data):
"""
@beatzxbt
beatzxbt / wspool.py
Created August 15, 2024 08:43
fast ws pool impl
import orjson
import asyncio
from warnings import warn as warning
from dataclasses import dataclass
from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType
from typing import List, Dict, Callable, Any, Optional
from mm_toolbox.src.logging import Logger
from mm_toolbox.src.time import time_ms
from mm_toolbox.src.ringbuffer import RingBufferSingleDimFloat
def generate_new_buffer(size: float, strength: float) -> tuple[float, float]:
"""
Generates a new buffer range based on the size and strength.
"""
return (size * (1 - strength), size * (1 + strength))
def process_data(data) -> None:
"""
Processes the data with varying buffer strengths and prints the results.
"""
@beatzxbt
beatzxbt / bybit_orderbook_handler.py
Created May 28, 2024 11:12
orderbook handler w/zmq support
import zmq
import orjson
from time import time_ns
from abc import ABC, abstractmethod
from typing import Dict
from mm.frameworks.exch.base.status import Status
class OrderbookHandler(ABC):
@beatzxbt
beatzxbt / count_changes.sh
Created May 22, 2024 19:08
count LOC changes in past commits (macOS)
#!/bin/bash
# Set the date range to include all of yesterday
since_date=$(date -v -1d +%Y-%m-%dT00:00:00)
until_date=$(date -v +0d +%Y-%m-%dT00:00:00)
# Get the list of commits from yesterday
commits=$(git log --since="$since_date" --until="$until_date" --pretty=format:"%H")
# Initialize variables to count total additions and deletions
@beatzxbt
beatzxbt / client.py
Created May 16, 2024 22:03
client base class progress
import asyncio
import aiosonic
import orjson
from abc import ABC, abstractmethod
from typing import Tuple, Dict, Union, Any, Literal
from frameworks.tools.logging import Logger, time_ms
class Client(ABC):