Created
February 23, 2022 21:18
-
-
Save yeus/1d83f40e14633532c7cdda7dff99cb33 to your computer and use it in GitHub Desktop.
UniqueDynamicPriorityQueue - dynamically modifiable unique priority task queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class UniqueDynamicPriorityQueue(asyncio.Queue): | |
"""A subclass of Queue; retrieves entries in priority order (lowest first). | |
Entries are typically tuples of the form: (priority number, data). | |
Entries can only be unique and when the same data gets pushed twice, | |
it will replace the old one. | |
Entries can be modified hence this this a "dynamic" priority queue. | |
This class is based on a priority queue implemenation from here: | |
https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes | |
""" | |
def _init(self, maxsize): | |
self._queue = [] # list of entries arranged in a heap | |
self.entry_finder = {} # mapping of tasks to entries | |
self.REMOVED = '<removed-task>' # placeholder for a removed task | |
self.counter = itertools.count() # unique sequence count | |
self.processed_item = None | |
def _put(self, item): | |
self.add_task(priority=item[0], item=item[1]) | |
def _get(self): | |
if self.processed_item: # if we left an item unfinished... | |
# put it back in the queue... | |
self.put(self.processed_item) | |
self.processed_item = None | |
item = self.pop_task() | |
self.processed_item = item | |
return item | |
def get_item_priority(self, item): | |
return self.entry_finder[item][0] | |
def add_task(self, priority, item): | |
'Add a new task or update the priority of an existing task' | |
if item in self.entry_finder: | |
self.remove_task(item) | |
count = next(self.counter) | |
entry = [priority, count, item] | |
self.entry_finder[item] = entry | |
heapq.heappush(self._queue, entry) | |
def remove_task(self, item): | |
'Mark an existing task as REMOVED. Raise KeyError if not found.' | |
entry = self.entry_finder.pop(item) | |
# add removed marker to item data so that we can remove it at some point... | |
entry[-1] = self.REMOVED | |
def pop_task(self): | |
'Remove and return the lowest priority task. Raise KeyError if empty.' | |
while self._queue: | |
priority, count, item = heapq.heappop(self._queue) | |
if item is not self.REMOVED: | |
del self.entry_finder[item] | |
return item | |
raise KeyError('pop from an empty priority queue') | |
def task_done(self): | |
super().task_done() | |
self.remove_task(self.processed_item) | |
self.processed_item = None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment