Skip to content

Instantly share code, notes, and snippets.

@beatzxbt
Created March 2, 2025 16:00
Show Gist options
  • Save beatzxbt/018742bddc117c29148b2d4191344773 to your computer and use it in GitHub Desktop.
Save beatzxbt/018742bddc117c29148b2d4191344773 to your computer and use it in GitHub Desktop.
basic rate limiter
from libc.stdint cimport uint32_t
from mm_toolbox.time.time cimport time_s
from .engine cimport OrderAction
cdef class RateLimitCounter:
cdef:
uint32_t tokens_per_sec
uint32_t tokens_remaining
double token_topup_time
def __init__(self, int tokens_per_sec) -> None:
self.tokens_per_sec = tokens_per_sec
self.tokens_remaining = tokens_per_sec
self.token_topup_time = time_s() + 1.0
cdef void _maybe_topup(self):
"""
Internal helper: if enough time has passed, top up tokens.
This allows 'consume' to trigger the top-up automatically
once a second has elapsed.
"""
cdef double current_time = time_s()
if current_time >= self.token_topup_time:
self.tokens_remaining = self.tokens_per_sec
self.token_topup_time = current_time + 1.0
cdef int consume(self, double warning_threshold=0.9):
"""
Attempt to consume 1 token from the rate limit.
Returns:
0: Success - token consumed
1: Warning - token consumed but approaching limit (>= warning_threshold)
-1: Failure - no tokens available to consume
"""
self._maybe_topup()
if self.tokens_remaining > 0:
self.tokens_remaining -= 1
if self.tokens_remaining <= self.tokens_per_sec * (1.0 - warning_threshold):
return 1
return 0
else:
return -1
cdef void extern_topup(self):
"""
Externally trigger a top-up of tokens (e.g. by some external event).
Always resets tokens to tokens_per_sec and sets new topup deadline.
"""
self.tokens_remaining = self.tokens_per_sec
self.token_topup_time = time_s() + 1.0
cdef class RateLimiter:
cdef:
RateLimitCounter create_counter
RateLimitCounter amend_counter
RateLimitCounter cancel_counter
RateLimitCounter cancel_all_counter
RateLimitCounter create_batch_counter
RateLimitCounter amend_batch_counter
RateLimitCounter cancel_batch_counter
def __init__(
self,
uint32_t create_per_sec,
uint32_t amend_per_sec,
uint32_t cancel_per_sec,
uint32_t cancel_all_per_sec,
uint32_t create_batch_per_sec=None,
uint32_t amend_batch_per_sec=None,
uint32_t cancel_batch_per_sec=None
):
"""
Initialize a separate RateLimitCounter for each action.
If any of the batch limits is None, default it to the
same as the corresponding single limit.
"""
if create_batch_per_sec is None:
create_batch_per_sec = create_per_sec
if amend_batch_per_sec is None:
amend_batch_per_sec = amend_per_sec
if cancel_batch_per_sec is None:
cancel_batch_per_sec = cancel_per_sec
self.create_counter = RateLimitCounter(create_per_sec)
self.amend_counter = RateLimitCounter(amend_per_sec)
self.cancel_counter = RateLimitCounter(cancel_per_sec)
self.cancel_all_counter = RateLimitCounter(cancel_all_per_sec)
self.create_batch_counter = RateLimitCounter(create_batch_per_sec)
self.amend_batch_counter = RateLimitCounter(amend_batch_per_sec)
self.cancel_batch_counter = RateLimitCounter(cancel_batch_per_sec)
cdef bint consume(self, OrderAction action):
"""
Attempt to consume one token from the rate limit
associated with 'action'. Returns False if tokens are depleted.
As this matches int<>int, Cython automatically compiles
this to a switch-case statement with O(1) lookups.
"""
if action == OrderAction.CREATE:
return self.create_counter.consume()
elif action == OrderAction.AMEND:
return self.amend_counter.consume()
elif action == OrderAction.CANCEL:
return self.cancel_counter.consume()
elif action == OrderAction.CANCEL_ALL:
return self.cancel_all_counter.consume()
elif action == OrderAction.CREATE_BATCH:
return self.create_batch_counter.consume()
elif action == OrderAction.AMEND_BATCH:
return self.amend_batch_counter.consume()
elif action == OrderAction.CANCEL_BATCH:
return self.cancel_batch_counter.consume()
cdef void extern_topup(self, OrderAction action):
"""
Top up the relevant rate limit counter if triggered externally.
As this matches int<>int, Cython automatically compiles
this to a switch-case statement with O(1) lookups.
"""
if action == OrderAction.CREATE:
self.create_counter.extern_topup()
elif action == OrderAction.AMEND:
self.amend_counter.extern_topup()
elif action == OrderAction.CANCEL:
self.cancel_counter.extern_topup()
elif action == OrderAction.CANCEL_ALL:
self.cancel_all_counter.extern_topup()
elif action == OrderAction.CREATE_BATCH:
self.create_batch_counter.extern_topup()
elif action == OrderAction.AMEND_BATCH:
self.amend_batch_counter.extern_topup()
elif action == OrderAction.CANCEL_BATCH:
self.cancel_batch_counter.extern_topup()
@beatzxbt
Copy link
Author

beatzxbt commented Mar 7, 2025

Updated to include ideas discussed here: https://x.com/BeatzXBT/status/1898069608059310135

%%cython

# cython: language_level=3, boundscheck=False, wraparound=False, cdivision=True, nonecheck=False
# distutils: extra_compile_args=['-O3', '-march=native', '-ffast-math']

from collections import deque
from time import time as time_s, time_ns as time_ns
from libc.stdint cimport UINT32_MAX

cdef class RateLimitCounter:
    cdef:
        Py_ssize_t      tokens_per_sec
        Py_ssize_t      tokens_remaining
        double          token_topup_time

        object          warnings
        Py_ssize_t      warning_threshold_count
        Py_ssize_t      warning_threshold_seconds

    def __init__(
        self, 
        Py_ssize_t  tokens_per_sec, 
        Py_ssize_t  warning_threshold_count=5, 
        Py_ssize_t  warning_threshold_seconds=10
    ):
        self.tokens_per_sec = tokens_per_sec
        self.tokens_remaining = tokens_per_sec
        self.token_topup_time = time_s() + 1.0

        self.warnings = deque(maxlen=warning_threshold_seconds)
        self.warning_threshold_count = warning_threshold_count
        self.warning_threshold_seconds = warning_threshold_seconds

    cdef void _maybe_topup(self):
        """
        Internal helper: if enough time has passed, top up tokens.
        This allows 'consume' to trigger the top-up automatically
        once a second has elapsed.
        """
        cdef double current_time = time_s()
        if current_time >= self.token_topup_time:
            self.tokens_remaining = self.tokens_per_sec
            self.token_topup_time = current_time + 1.0

    cdef Py_ssize_t _maybe_critical_warn(self, double current_time):
        """
        Internal helper: If a warning has been issues, add it to the ring buffer.
        If 4 warnings have been issues in the last 10 seconds, return 2.
        """
        self.warnings.append(current_time)

        # If the oldest warning record is more than X seconds old, remove it
        while self.warnings[0] < current_time - self.warning_threshold_seconds:
            _burn = self.warnings.popleft()
        
        # If N warnings have been issues in the last X seconds, return 2
        if len(self.warnings) >= self.warning_threshold_count:
            self.warnings.clear()
            return 2
        else:
            return 1

    cpdef Py_ssize_t consume(self, double warning_threshold=0.7):
        """
        Attempt to consume 1 token from the rate limit.
        Returns:
          0: Success  - token consumed
          1: Warning  - token consumed but approaching limit (>= warning_threshold)
          2: Critical - too many warnings in recent time period, be very careful
         -1: Failure  - no tokens available to consume
        """
        cdef double current_time = time_s()
        
        self._maybe_topup()
        if self.tokens_remaining > 0:
            self.tokens_remaining -= 1
            if self.tokens_remaining <= self.tokens_per_sec * (1.0 - warning_threshold):
                return self._maybe_critical_warn(current_time)
            return 0
        else:
            return -1

    cdef void extern_topup(self):
        """
        Externally trigger a top-up of tokens (e.g. by some external event).
        Always resets tokens to tokens_per_sec and sets new topup deadline.
        """
        self.tokens_remaining = self.tokens_per_sec
        self.token_topup_time = time_s() + 1.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment