Skip to content

Instantly share code, notes, and snippets.

@270ajay
Created March 5, 2020 06:48
Show Gist options
  • Save 270ajay/578733e518c78a3d79e20d919d31b9df to your computer and use it in GitHub Desktop.
Save 270ajay/578733e518c78a3d79e20d919d31b9df to your computer and use it in GitHub Desktop.
Priority queue and disjoint sets
'''Course: https://www.coursera.org/learn/data-structures#about'''
class BinaryHeap:
    '''Fixed-capacity max-heap (priority queue) stored in a flat array.

    The element at a parent index is always >= the elements at its child
    indices, so the largest stored value sits at index 0. Only the first
    ``self.size`` slots of ``self.array`` are live; the rest hold None.
    '''

    defaultArraySize = 10

    def __init__(self, size=None):
        '''Create an empty heap.

        size: capacity of the backing array. When None,
            BinaryHeap.defaultArraySize is used.
        '''
        self.maxSize = size if size is not None else BinaryHeap.defaultArraySize
        self.array = [None] * self.maxSize
        self.size = 0

    @staticmethod
    def parent(index):
        '''Return the parent index of ``index``; the root (0) maps to itself.'''
        # Floor division keeps the arithmetic in integers. Going through
        # int((index-1)/2) uses a float and loses precision on huge indices.
        return (index - 1) // 2 if index > 0 else 0

    @staticmethod
    def leftChild(index):
        '''Return the index of the left child of ``index``.'''
        return (2 * index) + 1

    @staticmethod
    def rightChild(index):
        '''Return the index of the right child of ``index``.'''
        return (2 * index) + 2

    def _checkIndex(self, index):
        '''Raise IndexError unless ``index`` addresses a live heap element.'''
        if not 0 <= index < self.size:
            raise IndexError("Heap index out of range")

    def siftUp(self, index):
        '''Restore the heap property upwards from ``index``.

        While the element is greater than its parent, swap the two and
        continue from the parent's position.
        '''
        while index > 0:
            parentIndex = BinaryHeap.parent(index)
            if not self.array[parentIndex] < self.array[index]:
                break
            BinaryHeap.swapElements(self.array, parentIndex, index)
            index = parentIndex

    @staticmethod
    def siftDown(index, size, theList):
        '''Restore the heap property downwards from ``index``.

        index: position of the element that may be smaller than a child.
        size: number of leading elements of ``theList`` that form the heap.
        theList: the backing list, modified in place.
        '''
        while True:
            maxElementIndex = index
            leftChildIndex = BinaryHeap.leftChild(index)
            if leftChildIndex < size and theList[leftChildIndex] > theList[maxElementIndex]:
                maxElementIndex = leftChildIndex
            rightChildIndex = BinaryHeap.rightChild(index)
            if rightChildIndex < size and theList[rightChildIndex] > theList[maxElementIndex]:
                maxElementIndex = rightChildIndex
            if maxElementIndex == index:
                return
            BinaryHeap.swapElements(theList, index, maxElementIndex)
            # Keep going from where the element landed.
            index = maxElementIndex

    def insert(self, priority):
        '''Add ``priority`` to the heap.

        Raises Exception when the backing array is already full.
        '''
        if self.size == self.maxSize:
            raise Exception("No space left in array")
        self.array[self.size] = priority
        self.siftUp(self.size)
        self.size += 1

    def extractMax(self):
        '''Remove and return the largest element.

        Raises IndexError when the heap is empty.
        '''
        if self.size <= 0:
            raise IndexError("extractMax from an empty heap")
        lastIndex = self.size - 1
        result = self.array[0]
        self.array[0] = self.array[lastIndex]
        # Clear the vacated slot so no stale duplicate stays in the array.
        self.array[lastIndex] = None
        self.size = lastIndex
        BinaryHeap.siftDown(0, self.size, self.array)
        return result

    def remove(self, index):
        '''Remove the element stored at ``index``.

        Raises IndexError when ``index`` is not a live heap position.
        '''
        self._checkIndex(index)
        # Walk the element up to the root unconditionally, as if it had
        # infinite priority. No numeric sentinel is needed, so this works
        # for priorities of any magnitude.
        while index > 0:
            parentIndex = BinaryHeap.parent(index)
            BinaryHeap.swapElements(self.array, parentIndex, index)
            index = parentIndex
        self.extractMax()

    def changePriority(self, index, priority):
        '''Replace the element at ``index`` with ``priority`` and re-heapify.

        Raises IndexError when ``index`` is not a live heap position.
        '''
        self._checkIndex(index)
        oldPriority = self.array[index]
        self.array[index] = priority
        if priority > oldPriority:
            self.siftUp(index)
        else:
            BinaryHeap.siftDown(index, self.size, self.array)

    @staticmethod
    def swapElements(theList, index1, index2):
        '''Swap the elements of ``theList`` at ``index1`` and ``index2``.'''
        theList[index1], theList[index2] = theList[index2], theList[index1]

    @staticmethod
    def buildHeap(theList, size):
        '''Rearrange the first ``size`` elements of ``theList`` into a max-heap, in place.'''
        # size // 2 - 1 is the last index that has a child; leaves need no work.
        for index in range(size // 2 - 1, -1, -1):
            BinaryHeap.siftDown(index, size, theList)

    def __str__(self):
        '''Return the string form of the whole backing array.'''
        return str(self.array)
def heapSort(listToSort):
    '''Sort ``listToSort`` in place in ascending order and return it.

    The list is first turned into a max-heap. The current maximum is then
    repeatedly swapped to the end of the shrinking heap region and the new
    root is sifted down.
    '''
    BinaryHeap.buildHeap(listToSort, len(listToSort))
    # lastIndex is both the slot receiving the maximum and the size of
    # the heap that remains afterwards.
    for lastIndex in range(len(listToSort) - 1, 0, -1):
        BinaryHeap.swapElements(listToSort, 0, lastIndex)
        BinaryHeap.siftDown(0, lastIndex, listToSort)
    return listToSort
def partialSort(listToSort, firstKMaxElements):
    '''Return the largest ``firstKMaxElements`` values, biggest first.

    listToSort: list to take the values from; it is rearranged in place
        (heapified, with the extracted values moved to its tail).
    firstKMaxElements: how many values to return. Requests larger than the
        list are capped at the list length; zero or negative requests
        yield an empty list.
    '''
    size = len(listToSort)
    BinaryHeap.buildHeap(listToSort, size)
    # Never extract more elements than the heap holds. Going past the end
    # would read stale values or index an empty list.
    count = min(firstKMaxElements, size)
    largestKElements = []
    for _ in range(count):
        largestKElements.append(listToSort[0])
        size -= 1
        BinaryHeap.swapElements(listToSort, 0, size)
        BinaryHeap.siftDown(0, size, listToSort)
    return largestKElements
class DisjointSets:
    '''Union-find structure for quickly checking whether two ids share a set.

    ``parent[i]`` is the parent of id ``i`` (a root is its own parent) and
    ``rank[i]`` is the rank used to decide which root absorbs the other.
    Every id must be registered with makeSet() before it is used.
    '''

    def __init__(self, totalNumOfSets):
        '''Allocate storage for ids 0 .. totalNumOfSets - 1, all unregistered.'''
        self.parent = [None] * totalNumOfSets
        self.rank = [None] * totalNumOfSets

    def makeSet(self, id):
        '''Register ``id`` as a singleton set with rank 0.'''
        self.rank[id] = 0
        self.parent[id] = id

    def find(self, id):
        '''Return the root of the set containing ``id`` without modifying the structure.'''
        current = id
        while self.parent[current] != current:
            current = self.parent[current]
        return current

    def findFast(self, id):
        '''Return the root of the set containing ``id``, compressing the path.

        Every id visited on the way to the root is re-pointed directly at
        the root, which shortens later lookups.
        '''
        # First pass: locate the root.
        root = id
        while root != self.parent[root]:
            root = self.parent[root]
        # Second pass: hang every id on the path directly under the root.
        node = id
        while node != root:
            nextNode = self.parent[node]
            self.parent[node] = root
            node = nextNode
        return root

    def union(self, id1, id2):
        '''Merge the sets containing ``id1`` and ``id2`` (union by rank).

        The root with the smaller rank is attached under the other one; on
        a tie the first root goes under the second, whose rank grows by 1.
        '''
        firstRoot = self.findFast(id1)
        secondRoot = self.findFast(id2)
        if firstRoot == secondRoot:
            return
        if self.rank[firstRoot] > self.rank[secondRoot]:
            self.parent[secondRoot] = firstRoot
            return
        self.parent[firstRoot] = secondRoot
        if self.rank[firstRoot] == self.rank[secondRoot]:
            self.rank[secondRoot] += 1
class CreateMaze:
    '''Answers reachability queries between maze points using DisjointSets.

    Points are the integers 0 .. totalNumOfPoints - 1. Points joined by a
    chain of neighbour links end up in the same set.
    '''

    def __init__(self, mazeData, totalNumOfPoints):
        '''Build the connectivity structure.

        mazeData: indexable by point; ``mazeData[p]`` is an iterable of the
            neighbours of point ``p``.
        totalNumOfPoints: number of points in the maze.

        Raises IndexError when a neighbour is not a point of the maze.
        '''
        self.totalNumOfPoints = totalNumOfPoints
        self.disjointSets = DisjointSets(self.totalNumOfPoints)
        for point in range(totalNumOfPoints):
            self.disjointSets.makeSet(point)
        for point in range(totalNumOfPoints):
            for neighbor in mazeData[point]:
                # A negative neighbour would otherwise wrap around through
                # negative list indexing and be silently merged with an
                # unrelated point.
                if not self._isInMaze(neighbor):
                    raise IndexError(
                        "Neighbor {!r} of point {!r} is not in maze".format(neighbor, point))
                self.disjointSets.union(point, neighbor)

    def _isInMaze(self, point):
        '''Return True when ``point`` is a valid point id of this maze.'''
        return 0 <= point <= self.totalNumOfPoints - 1

    def isReachable(self, point1, point2):
        '''Return True when ``point1`` and ``point2`` are connected.

        Raises Exception when either point lies outside the maze.
        '''
        if not (self._isInMaze(point1) and self._isInMaze(point2)):
            raise Exception("Points not in maze")
        return self.disjointSets.findFast(point1) == self.disjointSets.findFast(point2)
if __name__ == '__main__':
    # Priority queue demo: insert a few priorities and pull out the largest.
    priorityQueue = BinaryHeap()
    for priority in (10, 5, 30, 25, 50):
        priorityQueue.insert(priority)
    largestValue = priorityQueue.extractMax()
    print("\nMaximum value inserted is:", largestValue)

    # Sorting demos built on the heap helpers.
    sortedValues = heapSort([3, 5, 1, 2, 6, 5, 4])
    print("\nHeap sort a list [3,5,1,2,6,5,4]:", sortedValues)
    topThree = partialSort([3, 5, 1, 2, 6, 5, 4], 3)
    print("\nPartial sort: return top 3 elements from the list [3,5,1,2,6,5,4]:", topThree)

    # Maze demo: entry i of mazeData lists the neighbours of point i.
    print("\nMaze problem:")
    mazeData = [
        [1],     # point 0 touches point 1
        [0, 2],  # point 1 touches points 0 and 2
        [1, 3],  # point 2 touches points 1 and 3
        [2],     # point 3 touches point 2
        [],      # point 4 has no neighbours
    ]
    maze = CreateMaze(mazeData, 5)
    zeroToFour = maze.isReachable(0, 4)
    print("Point 0 and 4 are reachable in maze:", zeroToFour)
    zeroToThree = maze.isReachable(0, 3)
    print("Point 0 and 3 are reachable in maze:", zeroToThree)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment