Skip to content

Instantly share code, notes, and snippets.

@layandreas
Created March 27, 2025 08:49
Show Gist options
  • Save layandreas/7e2fb6a44c17bb61ff45107c856cfb0f to your computer and use it in GitHub Desktop.
import logging
import time
from typing import Any
from google.cloud import storage
logger = logging.getLogger(__name__)
class RemoteLock:
"""
A class to handle distributed locking using Google Cloud Storage. While
the lock is acquired by the calling process other processes will not be
able to acquire the lock and will wait until they acquire it or until
the timeout is reached.
Parameters
----------
bucket_name : str
The name of the cloud storage bucket.
lock_name : str
The name of the lock file.
lock_timeout_seconds : int, optional
The timeout in seconds to wait for acquiring the lock (default is 60).
Methods
-------
__enter__():
Enters the runtime context related to this object.
__exit__(exc_type, exc_val, exc_tb):
Exits the runtime context related to this object.
lock() -> bool:
Attempts to acquire the lock.
unlock() -> None:
Releases the lock.
wait_for_lock(timeout: int) -> None:
Waits until the lock is acquired or the timeout is reached.
Usage
-----
>>> with RemoteLock(bucket_name="my_bucket", lock_name="my_lock") as lock:
>>> # Critical section of code
>>> pass
"""
def __init__(
self, bucket_name: str, lock_name: str, lock_timeout_seconds: int = 60
) -> None:
self.bucket_name = bucket_name
self.lock_name = lock_name
self.lock_timeout_seconds = lock_timeout_seconds
self.client = storage.Client()
self.bucket = self.client.bucket(bucket_name)
self.lock_blob = self.bucket.blob(lock_name)
def __enter__(self) -> "RemoteLock":
self.wait_for_lock(self.lock_timeout_seconds)
return self
def __exit__(
self,
exc_type: BaseException | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
self.unlock()
def lock(self) -> bool:
logger.debug("Acquiring lock: {}".format(self.lock_name))
try:
# Using if_generation_match=0 allows us to use cloud storage as a
# locking mechanism as the upload will fail if the blob already
# exists. We also don't have write race conditions if another
# process simultaneously tries to acquire the lock
# (upload the file).
self.lock_blob.upload_from_string("lock", if_generation_match=0)
logger.info(f"Lock successfully acquired: {self.lock_name}")
return True
except Exception as e:
logger.debug(f"Cannot acquire lock {self.lock_name}: {e}")
return False
def unlock(self) -> None:
logger.info(f"Releasing lock: {self.lock_name}")
self.lock_blob.delete()
logger.info(f"Lock successfully released: {self.lock_name}")
def wait_for_lock(self, timeout: int) -> None:
start_time = time.time()
logger.info(f"Waiting for lock: {self.lock_name}")
while not self.lock():
if time.time() - start_time > timeout:
raise TimeoutError(
f"Could not acquire lock {self.lock_name} on bucket "
f"{self.bucket_name} within the specified timeout. "
"This means another process is holding the lock. "
"If you think the other process failed to release the lock "
"you can manually release it by deleting the lock file "
f"on the storage bucket {self.bucket_name}."
)
time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment