Created
June 16, 2020 08:56
-
-
Save Greyvend/8474814094175559fed215dfd80df603 to your computer and use it in GitHub Desktop.
Addressable Binary Heap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
from operator import lt | |
class Heap:
    """Array-backed binary heap with a pluggable comparison function.

    ``cmp(a, b)`` must return a truthy value when ``a`` has to sit closer to
    the top of the heap than ``b``. The default, ``operator.lt``, produces a
    min-heap; ``operator.gt`` produces a max-heap.

    ``arr`` is the backing list in heap order and ``len`` mirrors its length.
    """

    def __init__(self, cmp=lt):
        self.len = 0
        self.arr = []
        self.cmp = cmp

    def peek(self):
        """Take a peek at the top of the heap, without touching it.

        :return: the top element, or ``None`` when the heap is empty
        """
        if not self.arr:
            return None
        return self.arr[0]

    def insert(self, value):
        """Insert new value in the heap. It might be duplicated.

        :param value: any kind of value that supports comparison operation
            specified during initialization
        """
        self.arr.append(value)
        self.len += 1
        self._pop_up()

    def popitem(self):
        """Remove top element from the heap if it exists, do nothing otherwise.

        :return: the removed top element, or ``None`` when the heap is empty
        """
        # Test for emptiness on the container, not on the value: a falsy top
        # element such as 0 or "" is a legitimate item and must be removed.
        if not self.arr:
            return None
        peak = self.peek()
        # Move the top to the end so it can be dropped in O(1), then repair
        # the heap from the root.
        self._switch(0, self.len - 1)
        self.arr.pop()
        self.len -= 1
        self._push_down()
        return peak

    def __len__(self):
        return self.len

    def __repr__(self):
        return '\n'.join(map(str, self.arr))

    def _switch(self, i, j):
        """Switch two heap elements"""
        self.arr[i], self.arr[j] = self.arr[j], self.arr[i]

    def _pop_up(self, cur=None):
        """One of the two basic heap operations, bring an element up the heap.

        :param cur: index of the element in the heap array, last one if not
            specified
        """
        if self.len < 1:
            return
        # Compare against None explicitly: index 0 is a valid argument and
        # must not be mistaken for "not specified".
        if cur is None:
            cur = self.len - 1
        # Parent of index i is (i - 1) // 2; for the root this yields -1,
        # which terminates the loop.
        parent = (cur - 1) // 2
        # The swap also happens when the parent is not strictly ordered
        # before the element (i.e. on ties).
        while parent >= 0 and not self.cmp(self.arr[parent], self.arr[cur]):
            self._switch(parent, cur)
            cur = parent
            parent = (cur - 1) // 2

    def _push_down(self, cur=None):
        """Another basic heap operation, push the element down the heap.

        :param cur: index of the element in the heap array, top one if not
            specified
        """
        if cur is None:
            cur = 0
        while 2 * cur + 1 < self.len:
            child = 2 * cur + 1
            # Pick whichever child should be closer to the top.
            if child + 1 < self.len and self.cmp(self.arr[child + 1],
                                                 self.arr[child]):
                child += 1
            # Stop once the chosen child is not ordered before the element.
            if not self.cmp(self.arr[child], self.arr[cur]):
                break
            self._switch(child, cur)
            cur = child
class AddressableHeap(Heap):
    """Binary heap whose items can be found and updated through a handle.

    Each item is a tuple of the form ``(h, a1, a2, ..., an)`` whose first
    element ``h`` is the handle. Handles are used as dictionary keys in
    ``addresses``, which maps every handle to the current index of its item
    in ``arr``.

    The default comparison function compares whole tuples and therefore
    prioritizes by handle first, so callers should normally pass a ``cmp``
    that looks at the payload instead.
    """

    def __init__(self, **kwargs):
        self.addresses = {}
        super().__init__(**kwargs)

    def insert(self, value):
        """Change existing handle if it is present, insert otherwise.

        :param value: tuple whose first element is the handle
        """
        handle = value[0]
        if handle in self.addresses:
            self._change(value)
        else:
            # The new item is appended at index ``self.len``; record that
            # position before the base class sifts it up via ``_switch``.
            self.addresses[handle] = self.len
            super().insert(value)

    def popitem(self):
        """Remove the top item and forget its handle.

        :return: the removed item, or ``None`` when the heap is empty
        """
        peak = super().popitem()
        if peak is not None:
            self.addresses.pop(peak[0])
        return peak

    def _switch(self, i, j):
        """Switch two heap elements and keep their addresses in sync."""
        handle_1 = self.arr[i][0]
        handle_2 = self.arr[j][0]
        self.addresses[handle_1], self.addresses[handle_2] = j, i
        super()._switch(i, j)

    def _change(self, new_value):
        """Change existing element to a new value and restore heap property.

        This is basically the main logic of addressable heap. The address
        table locates the existing element with a dictionary lookup, the
        element is overwritten in place, and the heap property is restored by
        moving it up or down depending on how the new value compares to the
        old one.

        :param new_value: tuple whose handle is already present in the heap
        """
        addr = self.addresses[new_value[0]]
        up = self.cmp(new_value, self.arr[addr])
        self.arr[addr] = new_value
        if not up:
            self._push_down(addr)
        elif addr > 0:
            # An element already at the root cannot move up, so the sift-up
            # is skipped entirely for index 0.
            self._pop_up(addr)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from operator import gt | |
from heap import Heap, AddressableHeap | |
def test_heap_min():
    """Default heap: verify array layout and top after inserts and pops."""
    heap = Heap()
    for number in (3, 4, 8, 9, 7, 10, 9, 15, 20, 13):
        heap.insert(number)
    assert heap.arr == [3, 4, 8, 9, 7, 10, 9, 15, 20, 13]
    assert heap.peek() == 3

    # A value smaller than everything present has to travel to the top.
    heap.insert(2)
    assert heap.arr == [2, 3, 8, 9, 4, 10, 9, 15, 20, 13, 7]
    assert heap.peek() == 2

    # Expected backing array after each successive removal of the top.
    layouts_after_pop = (
        [3, 4, 8, 9, 7, 10, 9, 15, 20, 13],
        [4, 7, 8, 9, 13, 10, 9, 15, 20],
        [7, 9, 8, 15, 13, 10, 9, 20],
    )
    for layout in layouts_after_pop:
        heap.popitem()
        assert heap.arr == layout
        assert heap.peek() == layout[0]
def test_heap_max():
    """Heap built with ``gt``: verify array layout and top element."""
    heap = Heap(cmp=gt)
    for number in (3, 4, 8, 9, 7, 10, 9, 15, 20, 13):
        heap.insert(number)
    assert heap.arr == [20, 15, 9, 10, 13, 4, 9, 3, 8, 7]
    assert heap.peek() == 20

    # A value smaller than everything present stays at the end of the array.
    heap.insert(2)
    assert heap.arr == [20, 15, 9, 10, 13, 4, 9, 3, 8, 7, 2]
    assert heap.peek() == 20

    # Removing the top promotes the next largest value.
    heap.popitem()
    assert heap.arr == [15, 13, 9, 10, 7, 4, 9, 3, 8, 2]
    assert heap.peek() == 15
def test_addressable_heap_small():
    """Addressable heap with zero, one and two items."""
    heap = AddressableHeap(cmp=lambda left, right: left[1] < right[1])

    # Empty heap: peek yields None and popitem must not raise.
    assert heap.peek() is None
    heap.popitem()  # check that it doesn't fail

    # Single item: removing it leaves both the array and the address table
    # empty.
    heap.insert(('a', 3))
    heap.popitem()
    assert heap.arr == []
    assert heap.addresses == {}

    # Two items: after removing the top, the survivor sits at index 0.
    for entry in (('a', 3), ('b', 4)):
        heap.insert(entry)
    heap.popitem()
    assert heap.arr == [('b', 4)]
    assert heap.addresses == {'b': 0}
def test_addressable_heap_large():
    """Addressable heap: inserts, in-place priority changes and removals."""
    heap = AddressableHeap(cmp=lambda left, right: left[1] < right[1])
    for entry in (('a', 3), ('b', 4), ('c', 8), ('d', 9), ('e', 7)):
        heap.insert(entry)
    assert heap.arr == [('a', 3), ('b', 4), ('c', 8), ('d', 9), ('e', 7)]
    assert heap.peek() == ('a', 3)

    # insert new element
    heap.insert(('f', 2))
    assert heap.arr == [
        ('f', 2), ('b', 4), ('a', 3), ('d', 9), ('e', 7), ('c', 8),
    ]
    assert heap.peek() == ('f', 2)

    # set smaller value to existing element
    heap.insert(('b', 1))
    assert heap.arr == [
        ('b', 1), ('f', 2), ('a', 3), ('d', 9), ('e', 7), ('c', 8),
    ]
    assert heap.addresses == {
        'b': 0, 'f': 1, 'a': 2, 'd': 3, 'e': 4, 'c': 5,
    }
    assert heap.peek() == ('b', 1)

    # set larger value to existing element
    heap.insert(('f', 10))
    assert heap.arr == [
        ('b', 1), ('e', 7), ('a', 3), ('d', 9), ('f', 10), ('c', 8),
    ]
    assert heap.addresses == {
        'b': 0, 'e': 1, 'a': 2, 'd': 3, 'f': 4, 'c': 5,
    }
    assert heap.peek() == ('b', 1)

    # remove some elements
    heap.popitem()
    assert heap.arr == [('a', 3), ('e', 7), ('c', 8), ('d', 9), ('f', 10)]
    assert heap.addresses == {'a': 0, 'e': 1, 'c': 2, 'd': 3, 'f': 4}
    assert heap.peek() == ('a', 3)

    heap.popitem()
    assert heap.arr == [('e', 7), ('d', 9), ('c', 8), ('f', 10)]
    assert heap.addresses == {'e': 0, 'd': 1, 'c': 2, 'f': 3}
    assert heap.peek() == ('e', 7)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment