Created
June 28, 2019 12:09
-
-
Save timofurrer/db44ad05ffffd74f73384e2eb0bfb682 to your computer and use it in GitHub Desktop.
Python 3 Implementation of prioritized Lock objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module implements a set of prioritized locks. | |
Implemented: | |
* PriorityLock | |
* FIFOPriorityLock | |
License: MIT <[email protected]> | |
""" | |
import time | |
import queue | |
import threading | |
class PriorityLock:
    """Lock object which prioritizes each acquire.

    While the lock is held, every further ``acquire`` is queued. On
    ``release`` ownership is handed directly to the queued waiter with the
    smallest ``priority`` value. Waiters that share the same priority are
    served in the order in which they were queued.

    Priorities must be mutually comparable (e.g. all numbers); mixing
    incomparable priorities raises ``TypeError`` from the underlying
    ``queue.PriorityQueue``.

    >>> import random
    >>> thread_exec_order = []
    >>> lock = PriorityLock()
    >>> def worker(priority):
    ...     with lock(priority):
    ...         time.sleep(0.2)
    ...         thread_exec_order.append(priority)
    >>> threads = [
    ...     threading.Thread(target=worker, args=(p,))
    ...     for p in range(10)
    ... ]
    >>> random.shuffle(threads)
    >>> for thread in threads:
    ...     thread.start()
    >>> for thread in threads:
    ...     thread.join()
    >>> # the first thread to be executed is non-deterministic
    >>> assert thread_exec_order[1:] == list(sorted(thread_exec_order[1:]))
    """

    class _Context:
        """Context manager that acquires ``lock`` with a fixed priority."""

        def __init__(self, lock, priority):
            self._lock = lock
            self._priority = priority

        def __enter__(self):
            self._lock.acquire(self._priority)

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._lock.release()

    def __init__(self):
        # Guards ``_need_to_wait``, ``_sequence`` and the queue hand-off.
        self._lock = threading.Lock()
        # Entries are ``(priority, sequence, event)`` tuples.
        self._acquire_queue = queue.PriorityQueue()
        # True while some thread owns the prioritized lock.
        self._need_to_wait = False
        # Monotonically increasing tie-breaker for equal priorities.
        self._sequence = 0

    def acquire(self, priority):
        """Block until the lock is owned by the caller.

        :param priority: ordering key; smaller values are served first.
        :returns: always ``True`` once the lock has been obtained.
        """
        with self._lock:
            if not self._need_to_wait:
                # Lock is free: take it immediately without queueing.
                self._need_to_wait = True
                return True

            event = threading.Event()
            # The sequence number guarantees that tuple comparison never
            # reaches the (unorderable) Event when priorities are equal,
            # and keeps equal priorities in queueing order.
            self._sequence += 1
            self._acquire_queue.put((priority, self._sequence, event))

        # Wait outside the internal lock so ``release`` can run.
        event.wait()
        return True

    def release(self):
        """Hand the lock to the best queued waiter, or mark it free."""
        with self._lock:
            try:
                _, _, event = self._acquire_queue.get_nowait()
            except queue.Empty:
                # Nobody is waiting: the lock becomes free again.
                self._need_to_wait = False
            else:
                # Ownership passes straight to the waiter, so
                # ``_need_to_wait`` intentionally stays True.
                event.set()

    def __call__(self, priority):
        """Return a context manager acquiring this lock with ``priority``."""
        return self._Context(self, priority)
class FIFOPriorityLock(PriorityLock):
    """Lock object which prioritizes acquires by first-comes-first-serves.

    Each ``acquire`` is prioritized by the moment it was requested, so
    queued waiters are woken in arrival order. Instances are used directly
    as context managers (``with lock:``).

    >>> import random
    >>> thread_exec_order = []
    >>> lock = FIFOPriorityLock()
    >>> def worker():
    ...     with lock:
    ...         time.sleep(0.2)
    ...         thread_exec_order.append(threading.current_thread())
    >>> threads = [
    ...     threading.Thread(target=worker, name=str(x))
    ...     for x in range(10)
    ... ]
    >>> random.shuffle(threads)
    >>> for thread in threads:
    ...     thread.start()
    >>> for thread in threads:
    ...     thread.join()
    >>> assert thread_exec_order == threads
    """

    def acquire(self):
        """Block until the lock is owned by the caller.

        :returns: ``True`` once the lock has been obtained.
        """
        # Use the monotonic clock: the wall clock may be adjusted backwards,
        # which would let a later request overtake an earlier one.
        # The thread identifier breaks ties between identical timestamps so
        # that two queued entries never compare equal on priority (distinct
        # live threads have distinct identifiers).
        arrival = (time.monotonic(), threading.get_ident())
        return super().acquire(priority=arrival)

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment