Skip to content

Instantly share code, notes, and snippets.

@dyerrington
Last active September 18, 2024 17:39
Show Gist options
  • Save dyerrington/2cd5b91f9223074996dd074ad3296934 to your computer and use it in GitHub Desktop.
Save dyerrington/2cd5b91f9223074996dd074ad3296934 to your computer and use it in GitHub Desktop.
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class ApiClient:
    """Demo API client whose requests are throttled to 5 calls per second.

    The ``RateLimiter`` decorator is instantiated once, when the class body
    is executed, so the limit is shared by every ``ApiClient`` instance and
    by every thread calling ``make_request``.
    """

    @RateLimiter(rate_limit=5, rate_unit="seconds")
    def make_request(self, data):
        """Simulate one API call and return its result string.

        Args:
            data: Payload identifying the request; only interpolated into
                the log line and the returned string.

        Returns:
            The string ``"Result for {data}"``.
        """
        print(f"[{threading.current_thread().name}] Processing data: {data}")
        # Simulate API call
        time.sleep(0.1)
        return f"Result for {data}"
def main(num_requests=20, max_workers=10):
    """Fire rate-limited requests from a thread pool and print each result.

    Args:
        num_requests: Total number of requests to submit (default 20).
        max_workers: Size of the worker thread pool (default 10).
    """
    client = ApiClient()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(client.make_request, i) for i in range(num_requests)
        ]
        # Print results in completion order rather than submission order.
        for future in as_completed(futures):
            print(future.result())


if __name__ == "__main__":
    main()
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class ApiClient:
    """Demo API client whose requests are throttled to 55 calls per minute.

    The ``RateLimiter`` decorator is instantiated once, when the class body
    is executed, so the limit is shared by every ``ApiClient`` instance and
    by every thread calling ``make_request``.
    """

    @RateLimiter(rate_limit=55, rate_unit="minutes")
    def make_request(self, data):
        """Simulate one API call, logging a timestamped line, and return its result.

        Args:
            data: Payload identifying the request; only interpolated into
                the log line and the returned string.

        Returns:
            The string ``"Result for {data}"``.
        """
        current_time = time.strftime("%H:%M:%S", time.localtime())
        thread_name = threading.current_thread().name
        print(f"[{current_time}] [{thread_name}] Processing data: {data}")
        # Simulate API call
        time.sleep(0.1)  # Simulate network latency
        return f"Result for {data}"
def main(num_requests=60, max_workers=10):
    """Fire rate-limited requests from a thread pool and print each result.

    Args:
        num_requests: Total number of requests to make (default 60).
        max_workers: Number of worker threads (default 10).
    """
    client = ApiClient()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(client.make_request, i) for i in range(num_requests)
        ]
        # Print results in completion order rather than submission order.
        for future in as_completed(futures):
            result = future.result()
            print(result)


if __name__ == "__main__":
    main()
import functools
import threading
import time
class RateLimiter:
    """Decorator that throttles calls with a sliding-window rate limit.

    At most ``rate_limit`` calls are admitted within any window of
    ``interval`` seconds. Admission bookkeeping is guarded by a lock, so one
    instance can be shared by several threads; a caller that would exceed
    the limit blocks until the oldest recorded call has left the window.

    Attributes:
        rate_limit: Maximum number of calls admitted per window.
        interval: Window length in seconds (1 or 60).
        lock: Lock guarding ``timestamps``.
        timestamps: Admission times (``time.monotonic()`` readings) of the
            calls still inside the current window, oldest first.
    """

    def __init__(self, rate_limit, rate_unit="seconds"):
        """Configure the limiter.

        Args:
            rate_limit: Maximum number of calls per window; must be positive.
            rate_unit: ``"seconds"`` (1-second window) or ``"minutes"``
                (60-second window).

        Raises:
            ValueError: If ``rate_unit`` is not recognised or ``rate_limit``
                is not positive.
        """
        if rate_unit == "minutes":
            self.interval = 60  # Total time window in seconds
        elif rate_unit == "seconds":
            self.interval = 1  # Total time window in seconds
        else:
            raise ValueError("Invalid rate_unit value. Must be either 'seconds' or 'minutes'.")
        # A non-positive limit could never admit a call (and used to surface
        # as an IndexError on the first call), so reject it up front.
        if rate_limit <= 0:
            raise ValueError("rate_limit must be a positive number.")
        self.rate_limit = rate_limit
        self.lock = threading.Lock()
        self.timestamps = []

    def _prune(self, now):
        """Drop recorded timestamps that are no longer inside the window."""
        self.timestamps = [
            stamp for stamp in self.timestamps if now - stamp < self.interval
        ]

    def __call__(self, func):
        """Wrap ``func`` so every call first waits for a free slot.

        Args:
            func: The callable to throttle.

        Returns:
            A wrapper with the same signature and metadata as ``func`` that
            returns whatever ``func`` returns.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The lock is held while sleeping on purpose: waiters are admitted
            # one at a time, so nobody can take the slot being waited for.
            with self.lock:
                # Monotonic clock: immune to wall-clock adjustments.
                now = time.monotonic()
                self._prune(now)
                # Re-check after every sleep; timer granularity can leave the
                # oldest timestamp marginally inside the window.
                while len(self.timestamps) >= self.rate_limit:
                    # Positive by construction: _prune kept timestamps[0].
                    sleep_time = self.interval - (now - self.timestamps[0])
                    print(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds.")
                    time.sleep(sleep_time)
                    now = time.monotonic()
                    self._prune(now)
                self.timestamps.append(now)
            # Run the wrapped call outside the lock so admitted calls overlap.
            return func(*args, **kwargs)

        return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment