Created
September 13, 2023 11:44
-
-
Save vfreex/bd89622de7a9bd528c3f2c6720415d13 to your computer and use it in GitHub Desktop.
redis_lock_with_auto_extend.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import redis.asyncio as redis | |
from redis.asyncio.lock import Lock | |
from redis.asyncio.retry import Retry | |
from redis.backoff import ExponentialBackoff | |
async def long_running_task(): | |
print("Job is running...") | |
await asyncio.sleep(60) | |
async def auto_extend_lock(lock: Lock, when: float, new: float): | |
try: | |
while await lock.owned(): | |
await asyncio.sleep(when) | |
print("Extending lock...") | |
await lock.extend(new, replace_ttl=True) | |
print("Lock extended") | |
except asyncio.CancelledError: | |
print('cancel_me(): cancel sleep') | |
raise | |
finally: | |
print('cancel_me(): after sleep') | |
async def main(): | |
password = os.environ['REDIS_TOKEN'] | |
retry = Retry(ExponentialBackoff(), 3) | |
try: | |
async with asyncio.timeout(120): | |
conn = redis.Redis( | |
host="master.redis.gwprhd.use1.cache.amazonaws.com", port=6379, | |
password=password, ssl=True, decode_responses=True, protocol=3, | |
retry=retry, retry_on_error=[redis.BusyLoadingError, redis.ConnectionError, redis.TimeoutError]) | |
async with conn: | |
async with Lock(conn, "test-lock", blocking_timeout=3, timeout=10) as lock: | |
print("Lock acquired") | |
auto_extend_task = asyncio.create_task(auto_extend_lock(lock, when=10 * 0.6, new=10)) | |
await long_running_task() | |
auto_extend_task.cancel() | |
except TimeoutError: | |
print("Job timed out") | |
finally: | |
print("Lock released") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment