Skip to content

Instantly share code, notes, and snippets.

@jhorman
Last active September 19, 2024 11:57
Show Gist options
  • Save jhorman/8062862 to your computer and use it in GitHub Desktop.
Save jhorman/8062862 to your computer and use it in GitHub Desktop.
Redis semaphore implemented in Python via zsets. Lock expiration is implemented by only scanning the zset for items within a time range.
from __future__ import absolute_import
from time import time, sleep
import uuid
class RedisSemaphore(object):
"""
Redis base semaphore. Supports timeouts of semaphore locks.
"""
def __init__(self, redis, name, limit, timeout_seconds=10):
"""
Timeout specifies how long taken semaphores are valid. Expired semaphores don't count
toward the total taken count. Crashed clients therefore will release their locks after
timeout seconds.
@name Redis key name for zset
@limit Number of locks to allow
@timeout_seconds How long to allow old locks to persist.
"""
super(RedisSemaphore, self).__init__()
self.__redis = redis
self.__name = 'semaphore:%s' % name
self.__limit = limit
self.__timeout = timeout_seconds
self.__lock_id = uuid.uuid4().hex
def __enter__(self, blocking=True, wait_for_seconds=5):
"""
Take out a semaphore. Must be released later. Returns false
if a lock couldn't be acquired.
@block If true keeps trying to get the lock for timeout seconds.
@timeout How long to wait for the lock. Returns false if lock not avail.
"""
def acquire_lock(transaction):
# how many locks are already taken in the set. ignores locks that have timed out.
now = time()
count = transaction.zcount(self.__name, now-self.__timeout, now+1)
# set the pipline back to buffered mode
transaction.multi()
# if there is space in the set for an additional lock append it to the list
if count < self.__limit:
# the score of the lock is current time so that locks can expire
transaction.zadd(self.__name, self.__lock_id, time())
return True
# no space available, return False
return False
# keep trying to get the lock for wait_for_seconds seconds
start = time()
while (time() - start) < wait_for_seconds:
if self.__redis.transaction(acquire_lock, self.__name, value_from_callable=True):
now = time()
return True
elif blocking:
self.cleanup()
sleep(.1)
return False
acquire = __enter__
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
""" Release lock by removing from redis sorted set. """
self.__redis.zrem(self.__name, self.__lock_id)
release = __exit__
def cleanup(self):
""" Removes all locks after timeout expiration from the sorted set. """
return self.__redis.zremrangebyscore(self.__name, 0, time()-self.__timeout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment