Created
July 30, 2021 10:15
-
-
Save gmarkall/62308d951d4c1553ccbe608f391de46d to your computer and use it in GitHub Desktop.
PQ implementation modified
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import numba as nb | |
from numba.experimental import jitclass | |
from typing import List, Tuple, Dict | |
from heapq import heappush, heappop | |
# @jitclass | |
class PurePythonPriorityQueue:
    """Min-priority queue with updatable priorities, built on ``heapq``.

    Entries are stored in the heap as ``[priority, count, item]`` lists, so
    the heap orders by priority first and by insertion count on ties.
    Updating or removing an item does not touch the heap directly: the old
    entry is left in place as a tombstone and skipped when it reaches the top.
    """

    def __init__(self):
        self.pq = []  # list of entries arranged in a heap
        self.entry_finder = {}  # mapping of live items to their heap entries
        self.REMOVED = -1  # placeholder written into a removed entry
        self.counter = itertools.count()  # unique sequence count

    def put(self, item: Tuple[int, int], priority: float = 0.0):
        """Add a new item or update the priority of an existing item."""
        if item in self.entry_finder:
            self.remove_item(item)
        count = next(self.counter)
        entry = [priority, count, item]
        self.entry_finder[item] = entry
        heappush(self.pq, entry)

    def remove_item(self, item: Tuple[int, int]):
        """Mark an existing item as REMOVED. Raise KeyError if not found."""
        entry = self.entry_finder.pop(item)
        entry[-1] = self.REMOVED

    def pop(self):
        """Remove and return the lowest priority item. Raise KeyError if empty."""
        while self.pq:
            entry = heappop(self.pq)
            item = entry[-1]
            # An entry is live only while entry_finder still maps its item to
            # this exact list object. Tombstones were popped from entry_finder
            # in remove_item, so they fail this check even when a genuine item
            # happens to equal the REMOVED placeholder.
            if self.entry_finder.get(item) is entry:
                del self.entry_finder[item]
                return item
        raise KeyError("pop from an empty priority queue")
# @jitclass | |
# class Item: | |
# i: int | |
# j: int | |
# | |
# def __init__(self, i, j): | |
# self.i = i | |
# self.j = j | |
# | |
# def __eq__(self, other): | |
# return self.i == other.i and self.j == other.j | |
@jitclass
class Entry:
    """Heap entry pairing an item with its priority and insertion count.

    The annotated fields below serve as the jitclass field specification.
    """

    priority: float  # ordering key; smaller values sort first
    count: int  # insertion sequence number, used to break priority ties
    item: Tuple[int, int]  # the queued payload
    removed: bool  # set to True when the entry has been invalidated

    def __init__(self, p: float, c: int, i: Tuple[int, int]):
        """Create a live (not removed) entry for item ``i``."""
        self.priority = p
        self.count = c
        self.item = i
        self.removed = False

    def __lt__(self, other):
        """Order by priority, then by insertion count on equal priorities.

        The count tie-break mirrors the ``[priority, count, item]`` list
        ordering used by PurePythonPriorityQueue, so entries with equal
        priority compare in insertion order.
        """
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.count < other.count
# Numba type objects for the typed containers created in PriorityQueue.
# typed.List.empty_list / typed.Dict.empty expect Numba *types*, not example
# values, and code compiled by jitclass can only see them as globals.
_PQ_ITEM_TYPE = nb.types.UniTuple(nb.intp, 2)
_PQ_ENTRY_TYPE = Entry.class_type.instance_type


@jitclass
class PriorityQueue:
    """jitclass counterpart of PurePythonPriorityQueue.

    Keeps a heap of Entry objects plus a dict from item to its live Entry.
    Updated or removed items leave a flagged Entry behind in the heap that is
    skipped when popped.

    NOTE(review): this relies on Numba's heapq support accepting a typed List
    of jitclass instances ordered through ``Entry.__lt__`` -- confirm against
    the Numba version in use.
    """

    pq: List[Entry]  # entries arranged in a heap
    entry_finder: Dict[Tuple[int, int], Entry]  # live item -> its entry
    counter: int  # running sequence number handed to new entries

    def __init__(self):
        self.pq = nb.typed.List.empty_list(_PQ_ENTRY_TYPE)
        self.entry_finder = nb.typed.Dict.empty(_PQ_ITEM_TYPE, _PQ_ENTRY_TYPE)
        self.counter = 0

    def put(self, item: Tuple[int, int], priority: float = 0.0):
        """Add a new item or update the priority of an existing item."""
        if item in self.entry_finder:
            self.remove_item(item)
        self.counter += 1
        entry = Entry(priority, self.counter, item)
        self.entry_finder[item] = entry
        heappush(self.pq, entry)

    def remove_item(self, item: Tuple[int, int]):
        """Flag an existing item's entry as removed. Raise KeyError if not found."""
        entry = self.entry_finder.pop(item)
        entry.removed = True

    def pop(self):
        """Remove and return the lowest priority item. Raise KeyError if empty."""
        # An explicit length test is used instead of relying on truthiness of
        # the typed list inside compiled code.
        while len(self.pq) > 0:
            entry = heappop(self.pq)
            if not entry.removed:
                del self.entry_finder[entry.item]
                return entry.item
        raise KeyError("pop from an empty priority queue")
if __name__ == "__main__":
    # Smoke test: both queues receive the same two items; the item with the
    # smaller priority value, (5, 6), is the one expected from pop().
    queue1 = PurePythonPriorityQueue()
    queue1.put((4, 5), 5.4)
    queue1.put((5, 6), 1.0)
    print(queue1.pop())  # Yay this works!

    # The original author annotated the jitclass construction below with
    # "Nope", i.e. it was reported as not working.
    queue2 = PriorityQueue()  # Nope
    queue2.put((4, 5), 5.4)
    queue2.put((5, 6), 1.0)
    print(queue2.pop())

    # NOTE: OptimalQueue is not defined in the visible part of this file, so
    # the experiment below stays disabled.
    # queue3 = OptimalQueue()
    # for i in range(5):
    #     for j in range(5):
    #         queue3.add_index(i, j)
    # # [Index(i, j) for i in range(5) for j in range(5)]
    # print(queue3.indices)
    # queue3.put((4, 5), 5.4)
    # queue3.put((5, 6), 1.0)
    # print(queue3.pop())  # Yay this works!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment