@gregburek
Created December 7, 2011 01:51
Rate limiting function calls with Python Decorators

import time


def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)

    def decorate(func):
        # One-element list so the closure can update it without `nonlocal` (Python 2).
        lastTimeCalled = [0.0]

        def rateLimitedFunction(*args, **kargs):
            # time.time() replaces time.clock(), which was removed in Python 3.8
            # and measured CPU time (not wall-clock time) on Unix.
            elapsed = time.time() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.time()
            return ret

        return rateLimitedFunction

    return decorate


@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print(num)


if __name__ == "__main__":
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)

@zcmyth

zcmyth commented Jul 23, 2015

Because it is not a local variable of the inner function, and we can't rebind it there unless nonlocal is used.
However, nonlocal was only introduced in Python 3. To update last_time_called in Python 2.x, the workaround is to use a one-element list and modify its element.
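
To make the point above concrete, here is a small sketch of the three cases. The function names (`make_counter_broken`, `make_counter_py2`, `make_counter_py3`) are made up for this illustration and are not part of the gist; a plain counter stands in for `last_time_called`.

```python
def make_counter_broken():
    count = 0

    def increment():
        # Assigning to `count` makes it local to increment(), so reading it
        # first raises UnboundLocalError.
        count += 1
        return count

    return increment


def make_counter_py2():
    # Works on Python 2 and 3: the name `count` is never rebound,
    # only the list it refers to is mutated.
    count = [0]

    def increment():
        count[0] += 1
        return count[0]

    return increment


def make_counter_py3():
    # Python 3 only: `nonlocal` lets the inner function rebind the name.
    count = 0

    def increment():
        nonlocal count
        count += 1
        return count

    return increment
```

Calling the function returned by `make_counter_broken()` fails with `UnboundLocalError`, while the other two count up as expected.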

@sintezcs

sintezcs commented Aug 12, 2016

Multi-threading issues occur in both of the lock-based versions posted above: last_time_called is updated after the lock has been released. That leads to unpredictable fluctuations in the rate limit when the decorated function is called from multiple threads. The final fixed version that is being used in production:

import threading
import time
from functools import wraps


def rate_limited(max_per_second):
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second

    def decorate(func):
        last_time_called = time.perf_counter()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            lock.acquire()
            nonlocal last_time_called
            elapsed = time.perf_counter() - last_time_called
            left_to_wait = min_interval - elapsed

            if left_to_wait > 0:
                time.sleep(left_to_wait)

            ret = func(*args, **kwargs)
            last_time_called = time.perf_counter()
            lock.release()
            return ret

        return rate_limited_function

    return decorate

Note: the nonlocal keyword is used, so the code is only compatible with Python 3.

@perklet

perklet commented Mar 22, 2017

@sintezcs great work

@manurueda

love it

@mwek

mwek commented May 25, 2017

The version posted by @sintezcs will not release the lock if func raises an exception. This version releases it in a finally block:

import threading
import time
from functools import wraps


def rate_limited(max_per_second: int):
    """Rate-limits the decorated function locally, for one process."""
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second

    def decorate(func):
        last_time_called = time.perf_counter()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            lock.acquire()
            nonlocal last_time_called
            try:
                elapsed = time.perf_counter() - last_time_called
                left_to_wait = min_interval - elapsed
                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                return func(*args, **kwargs)
            finally:
                last_time_called = time.perf_counter()
                lock.release()

        return rate_limited_function

    return decorate
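
One way to check the exception behaviour described above is to call a function that fails on every other call: if the lock were leaked, the call after the first failure would hang. The sketch below is a variant that uses `with lock:` instead of explicit acquire/release, which should be equivalent; the `flaky` function and the rate of 50 per second are made up for the demonstration.

```python
import threading
import time
from functools import wraps


def rate_limited(max_per_second):
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second

    def decorate(func):
        last_time_called = time.perf_counter()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            nonlocal last_time_called
            with lock:  # released on normal return and on exception
                try:
                    elapsed = time.perf_counter() - last_time_called
                    left_to_wait = min_interval - elapsed
                    if left_to_wait > 0:
                        time.sleep(left_to_wait)
                    return func(*args, **kwargs)
                finally:
                    last_time_called = time.perf_counter()

        return rate_limited_function

    return decorate


@rate_limited(50)
def flaky(n):
    # Hypothetical function that raises on odd input.
    if n % 2:
        raise ValueError(n)
    return n


results = []
for n in range(4):
    try:
        results.append(flaky(n))
    except ValueError:
        results.append("error")
```

The loop finishes instead of deadlocking, and `results` records a value for the even inputs and `"error"` for the odd ones.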

@tim-seoss

tim-seoss commented Sep 20, 2017

This is great. I can't see any mention of a license tho, which means:

Code without an explicit license is protected by copyright and is by default All Rights Reserved. The person or people who wrote the code are protected as such. Any time you're using software you didn't write, licensing should be considered and abided. - Brian Doll, GitHub's VP of Marketing

https://www.infoworld.com/article/2615869/open-source-software/github-needs-to-take-open-source-seriously.html

i.e. using the code will be copyright infringement in most parts of the world, and so is risky... Any chance we could make it open source so that it can be used as well?

https://choosealicense.com/no-license/

At this stage, this would require the original poster, and anyone who has modified a given version (including intermediate versions) to declare licensing. Can I suggest the original poster makes a choice following this guidance here:

https://choosealicense.com/

maybe MIT? MIT licenced code can of course be re-licenced under other terms by modifiers if they want to, so that their modifications fall under copyleft (e.g. MIT -> GPL, but not the other way around).

Thanks!

Tim.

@oPromessa

oPromessa commented Mar 4, 2018

Thanks team for sharing this. I've built a class for use in multiprocessing mode. Hope it helps others.

  • I'm using time.time()
  • Using multiprocessing instead of threading
  • it's compatible with python 2.7 and 3.6
"""
    by oPromessa, 2017
    Published on https://github.com/oPromessa/flickr-uploader/

    Inspired by: https://gist.github.com/gregburek/1441055

    Helper class and functions for rate limiting function calls with Python Decorators

MIT License

Copyright (c) 2017 oPromessa

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

"""

# ----------------------------------------------------------------------------
# Import section for Python 2 and 3 compatible code
# from __future__ import absolute_import, division, print_function, unicode_literals
from __future__ import division    # This way: 3 / 2 == 1.5; 3 // 2 == 1

# ----------------------------------------------------------------------------
# Import section
#
import sys
import logging
import multiprocessing
import time
from functools import wraps


# -----------------------------------------------------------------------------
# class LastTime to be used with rate_limited
#
class LastTime:
    """
        >>> import rate_limited as rt
        >>> a = rt.LastTime()
        >>> a.add_cnt()
        >>> a.get_cnt()
        1
        >>> a.add_cnt()
        >>> a.get_cnt()
        2
    """

    def __init__(self, name='LT'):
        # Init variables to None
        self.name = name
        self.ratelock = None
        self.cnt = None
        self.last_time_called = None

        # Instantiate control variables
        self.ratelock = multiprocessing.Lock()
        self.cnt = multiprocessing.Value('i', 0)
        self.last_time_called = multiprocessing.Value('d', 0.0)

        logging.debug('\t__init__: name=[{!s}]'.format(self.name))

    def acquire(self):
        self.ratelock.acquire()

    def release(self):
        self.ratelock.release()

    def set_last_time_called(self):
        self.last_time_called.value = time.time()
        # self.debug('set_last_time_called')

    def get_last_time_called(self):
        return self.last_time_called.value

    def add_cnt(self):
        self.cnt.value += 1

    def get_cnt(self):
        return self.cnt.value

    def debug(self, debugname='LT'):
        now=time.time()
        logging.debug('___Rate name:[{!s}] '
                      'debug=[{!s}] '
                      '\n\t        cnt:[{!s}] '
                      '\n\tlast_called:{!s} '
                      '\n\t  timenow():{!s} '
                      .format(self.name,
                              debugname,
                              self.cnt.value,
                              time.strftime(
                                '%T.{}'
                                .format(str(self.last_time_called.value -
                                            int(self.last_time_called.value))
                                            .split('.')[1][:3]),
                                time.localtime(self.last_time_called.value)),
                              time.strftime(
                                '%T.{}'
                                .format(str(now -
                                            int(now))
                                            .split('.')[1][:3]),
                                time.localtime(now))))


# -----------------------------------------------------------------------------
# rate_limited
#
# rate-limits execution of a function
def rate_limited(max_per_second):

    min_interval = 1.0 / max_per_second
    LT = LastTime('rate_limited')

    def decorate(func):
        LT.acquire()
        if LT.get_last_time_called() == 0:
            LT.set_last_time_called()
        LT.debug('DECORATE')
        LT.release()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):

            logging.warning('___Rate_limited f():[{!s}]: '
                            'Max_per_Second:[{!s}]'
                            .format(func.__name__, max_per_second))

            try:
                LT.acquire()
                LT.add_cnt()
                xfrom = time.time()

                elapsed = xfrom - LT.get_last_time_called()
                left_to_wait = min_interval - elapsed
                logging.debug('___Rate f():[{!s}] '
                              'cnt:[{!s}] '
                              '\n\tlast_called:{!s} '
                              '\n\t time now():{!s} '
                              'elapsed:{:6.2f} '
                              'min:{!s} '
                              'to_wait:{:6.2f}'
                              .format(func.__name__,
                                      LT.get_cnt(),
                                      time.strftime(
                                            '%T',
                                            time.localtime(
                                                LT.get_last_time_called())),
                                      time.strftime('%T',
                                                    time.localtime(xfrom)),
                                      elapsed,
                                      min_interval,
                                      left_to_wait))
                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                ret = func(*args, **kwargs)

                LT.debug('OVER')
                LT.set_last_time_called()
                LT.debug('NEXT')

            except Exception as ex:
                # CODING: To be changed once reportError is on a module
                sys.stderr.write('+++000 '
                                 'Exception on rate_limited_function: [{!s}]\n'
                                 .format(ex))
                sys.stderr.flush()
                # reportError(Caught=True,
                #              CaughtPrefix='+++',
                #              CaughtCode='000',
                #              CaughtMsg='Exception on rate_limited_function',
                #              exceptUse=True,
                #              # exceptCode=ex.code,
                #              exceptMsg=ex,
                #              NicePrint=False,
                #              exceptSysInfo=True)
                raise
            finally:
                LT.release()
            return ret

        return rate_limited_function

    return decorate
# -----------------------------------------------------------------------------
# Samples
#@rate_limited(5) # 5 calls per second
# def print_num(num):
#     print (num )


# -----------------------------------------------------------------------------
# If called directly run doctests
#
if __name__ == "__main__":

    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s]:[%(processName)-11s]' +
                               '[%(levelname)-8s]:[%(name)s] %(message)s')

    import doctest
    doctest.testmod()

    # Comment following line to allow further debugging/testing
    # sys.exit(0)

    # n for n calls per second  (ex. 3 means 3 calls per second)
    # 1/n for n seconds per call (ex. 0.5 means 2 seconds in between calls)
    @rate_limited(1)
    def print_num(prc, num):
        """
        """
        print('\t\t***prc:[{!s}] num:[{!s}] '
              'rate_limit timestamp:[{!s}]'
              .format(prc, num, time.strftime('%T')))

    print('-------------------------------------------------Single Processing')
    for process in range(1, 3):
        for j in range(1, 2):
            print_num(process, j)

    print('-------------------------------------------------Multi Processing')
    def fmulti(x, prc):
        import random

        for i in range(1,x):
            r = random.randrange(6)
            print('\t\t[prc:{!s}] [{!s}]'
                  '->- WORKing {!s}s----[{!s}]'
                  .format(prc, i, r, time.strftime('%T')))
            time.sleep(r)
            print('\t\t[prc:{!s}] [{!s}]--> Before:---[{!s}]'
                  .format(prc, i, time.strftime('%T')))
            print_num(prc, i)
            print('\t\t[prc:{!s}] [{!s}]<-- After----[{!s}]'
                  .format(prc, i, time.strftime('%T')))

    TaskPool = []

    for j in range(1,4):
        Task = multiprocessing.Process(target=fmulti, args=(5,j))
        TaskPool.append(Task)
        Task.start()

    for j in TaskPool:
        print('{!s}.is_alive = {!s}'.format(j.name, j.is_alive()))

    while (True):
        if not (any(multiprocessing.active_children())):
            print('===No active children Processes.')
            break
        for p in multiprocessing.active_children():
            print('==={!s}.is_alive = {!s}'.format(p.name, p.is_alive()))
            uploadTaskActive = p
        print('===Will wait for 60 on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))
        uploadTaskActive.join(timeout=60)
        print('===Waited for 60s on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))

    # Wait for join all jobs/tasks in the Process Pool
    # All should be done by now!
    for j in TaskPool:
        j.join()
        print('==={!s} (is alive: {!s}).exitcode = {!s}'
              .format(j.name, j.is_alive(), j.exitcode))

@3lixy

3lixy commented May 19, 2018

@oPromessa thanks for sharing and nice job on the logging and example.

@slidenerd

Why don't you use a counter instead of a list? Each time the function is called, increment the counter by 1, and reset the counter to 0 when the second or minute has changed.
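
A rough sketch of what that counter idea could look like, assuming a fixed window that starts when the function is decorated rather than on wall-clock second boundaries. The name `fixed_window_limited` and its parameters are hypothetical, not from the gist.

```python
import threading
import time
from functools import wraps


def fixed_window_limited(max_calls, period=1.0):
    """Allow at most `max_calls` calls per `period` seconds (fixed window)."""
    lock = threading.Lock()

    def decorate(func):
        window_start = time.monotonic()
        calls_in_window = 0

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal window_start, calls_in_window
            with lock:
                now = time.monotonic()
                if now - window_start >= period:
                    # The window has rolled over: reset the counter.
                    window_start, calls_in_window = now, 0
                if calls_in_window >= max_calls:
                    # Window is full: wait for it to end, then start a new one.
                    time.sleep(window_start + period - now)
                    window_start, calls_in_window = time.monotonic(), 0
                calls_in_window += 1
            return func(*args, **kwargs)

        return wrapper

    return decorate
```

Unlike the minimum-interval decorators above, this lets calls inside a window run back to back, so up to twice `max_calls` calls can land close together around a window boundary.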

@luizfzs

luizfzs commented Mar 19, 2020

Any updates regarding licensing of these pieces of code?

@codekiln

Note, there are other libraries on PyPI that do this, such as ratelimit

@hugokernel

A simpler version, but with async compatibility: https://gist.github.com/hugokernel/c478d639849e1e772b1395a546100031
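
For readers who only want the general shape of an async-compatible limiter, here is a minimal sketch. It is not the code from the linked gist; `async_rate_limited` is a hypothetical name, and the sketch assumes the decorated coroutine is only ever used from a single event loop.

```python
import asyncio
import time
from functools import wraps


def async_rate_limited(max_per_second):
    min_interval = 1.0 / max_per_second

    def decorate(func):
        lock = None              # created lazily, inside the running event loop
        last_time_called = None  # None until the first call has gone through

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal lock, last_time_called
            if lock is None:
                lock = asyncio.Lock()
            async with lock:
                if last_time_called is not None:
                    elapsed = time.monotonic() - last_time_called
                    left_to_wait = min_interval - elapsed
                    if left_to_wait > 0:
                        # Yields to the event loop instead of blocking the thread.
                        await asyncio.sleep(left_to_wait)
                last_time_called = time.monotonic()
            # The lock only spaces out call starts; calls may overlap while running.
            return await func(*args, **kwargs)

        return wrapper

    return decorate
```

Using `asyncio.sleep` instead of `time.sleep` means other coroutines keep running while a call waits for its slot.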

@yeus

yeus commented Dec 2, 2021

I created a similar asyncio version, but using the token bucket algorithm to cap the number of calls per time interval, which results in a maximum burst rate. It can be combined with the above rate limiter, which has no burst rate.

https://gist.github.com/yeus/dff02dce88c6da9073425b5309f524dd
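
For context, the token bucket idea can be sketched in a few lines. This is a synchronous illustration and not the code from the linked gist; the class name `TokenBucket`, its parameters, and the injectable `clock` are assumptions made for the example.

```python
import threading
import time


class TokenBucket:
    """Allow bursts of up to `capacity` calls, refilled at `rate` tokens per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self._clock = clock              # injectable so the bucket can be tested
        self._tokens = float(capacity)   # start with a full bucket
        self._updated = clock()
        self._lock = threading.Lock()

    def _refill(self):
        # Add the tokens earned since the last update, capped at capacity.
        now = self._clock()
        earned = (now - self._updated) * self.rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._updated = now

    def try_acquire(self):
        """Take one token if available; return True on success, False otherwise."""
        with self._lock:
            self._refill()
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True
            return False

    def acquire(self):
        """Block until a token is available, then take it."""
        while True:
            with self._lock:
                self._refill()
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return
                wait = (1.0 - self._tokens) / self.rate
            time.sleep(wait)  # sleep outside the lock so other threads can proceed
```

With `rate=2` and `capacity=3`, three calls can go through immediately as a burst, after which calls are admitted at two per second until the bucket has had time to refill.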

@teocns

teocns commented Sep 28, 2023

An extended version of @oPromessa 's utility. Thank you for sharing @oPromessa !

@martinandrovich

I added Python 3.12 generic type-hinting and support for recursive calls using an RLock:

from typing import Callable, cast
from functools import wraps
import threading
import time


def rate_limited[T, **P](max_per_second: int) -> Callable[[Callable[P, T]], Callable[P, T]]:

    def decorator(fn: Callable[P, T]) -> Callable[P, T]:

        lock = threading.RLock()
        min_interval = 1.0 / max_per_second
        last_time_called = 0.0

        @wraps(fn)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            with lock:
                nonlocal last_time_called
                elapsed = time.perf_counter() - last_time_called
                left_to_wait = min_interval - elapsed
                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                last_time_called = time.perf_counter()
                return fn(*args, **kwargs)

        return cast(Callable[P, T], wrapper)

    return decorator
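
The reason an RLock matters for recursive calls can be shown in isolation: a plain `threading.Lock` cannot be taken again by the thread that already holds it, so a decorated function calling itself would block forever, while an `RLock` allows re-entry. The helper name `can_reacquire` below is made up for this sketch.

```python
import threading


def can_reacquire(lock):
    """Return True if the current thread can take `lock` while already holding it."""
    with lock:
        # Non-blocking attempt, so a plain Lock reports failure instead of deadlocking.
        got_it = lock.acquire(blocking=False)
        if got_it:
            lock.release()
        return got_it


reentrant = can_reacquire(threading.RLock())
plain = can_reacquire(threading.Lock())
```

Here `reentrant` is `True` and `plain` is `False`. Note that with the RLock version each level of recursion still waits out the minimum interval before running.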
