Skip to content

Instantly share code, notes, and snippets.

@270ajay
Created April 2, 2020 09:54
Show Gist options
  • Save 270ajay/23e36ebf48f951853c97367c122a56ca to your computer and use it in GitHub Desktop.
Save 270ajay/23e36ebf48f951853c97367c122a56ca to your computer and use it in GitHub Desktop.
finding fastest route (Dijkstra algorithm, etc.)
import heapq
'''course: https://www.coursera.org/learn/algorithms-on-graphs#about'''
class PriorityQueue():
    '''Min-priority queue of nodes whose priorities can be updated.

    Built on heapq with lazy deletion: updating or removing a node marks its
    old heap entry as REMOVED instead of searching the heap, and such stale
    entries are discarded when they reach the top.

    Each heap entry is [priority, insertionOrder, node]. The insertion
    counter breaks priority ties, so nodes (and the REMOVED marker) are
    never compared with one another; comparing them would raise TypeError
    for nodes that do not support ordering. Among equal priorities the
    node inserted (or updated) first is extracted first.

    Nodes must be hashable and must not be equal to PriorityQueue.REMOVED.

    credits: Eugene Yarmash
    https://stackoverflow.com/questions/46636656/python-heapq-replace-priority'''

    REMOVED = "REMOVED"

    def __init__(self):
        self.priorityQueue = []   # heap of [priority, insertionOrder, node]
        self.nodeInfo = {}        # node -> its live heap entry
        self.insertionCount = 0   # monotonically increasing tie-breaker

    def addNodeWithPriority(self, node, priority):
        '''adds or updates the node in priority queue'''
        if node in self.nodeInfo:
            self.removeNode(node)
        entry = [priority, self.insertionCount, node]
        self.insertionCount += 1
        self.nodeInfo[node] = entry
        heapq.heappush(self.priorityQueue, entry)

    def removeNode(self, node):
        '''marks the node's heap entry as removed.
        Raises KeyError if the node is not in the queue'''
        removedEntry = self.nodeInfo.pop(node)
        # The entry stays in the heap; only its node slot is overwritten.
        removedEntry[-1] = PriorityQueue.REMOVED

    def extractMin(self):
        '''pops and returns the live node with the smallest priority,
        or None if no live node is left'''
        while self.priorityQueue:
            node = heapq.heappop(self.priorityQueue)[-1]
            if node != PriorityQueue.REMOVED:
                del self.nodeInfo[node]
                return node
        return None

    def isEmpty(self):
        '''returns True when no live node is queued
        (stale REMOVED entries do not count)'''
        return not self.nodeInfo
#############################################################################
class Node:
    '''A single cell of a linked list: holds a key and a reference to the
    following cell (None by default, i.e. no successor).'''

    def __init__(self, key, next=None):
        self.key, self.next = key, next
class SinglyLinkedList:
    '''Singly linked list of Node objects that tracks both ends, so keys
    can be pushed at the front or the back and popped from the front.'''

    def __init__(self):
        self.head = None
        self.tail = None

    def pushFront(self, key):
        '''inserts key before the current head'''
        node = Node(key=key, next=self.head)
        self.head = node
        if self.tail is None:
            # First element: it is both head and tail.
            self.tail = node

    def topFront(self):
        '''returns the key at the head without removing it.
        Raises Exception("Empty list") if the list is empty'''
        if self.head is None:
            raise Exception("Empty list")
        return self.head.key

    def popFront(self):
        '''removes the head and returns its key.
        Raises Exception("Empty list") if the list is empty'''
        if self.head is None:
            raise Exception("Empty list")
        poppedKey = self.head.key
        self.head = self.head.next
        if self.head is None:
            # The list just became empty; drop the stale tail reference.
            self.tail = None
        return poppedKey

    def pushBack(self, key):
        '''appends key after the current tail'''
        node = Node(key=key)
        if self.tail is None:
            self.head = node
        else:
            self.tail.next = node
        self.tail = node

    def empty(self):
        '''returns True if the list holds no keys'''
        return self.head is None

    def __str__(self):
        '''returns the keys from head to tail as "[k1, k2, ...]";
        an empty list gives "[]"'''
        keys = []
        node = self.head
        while node is not None:
            keys.append(str(node.key))
            node = node.next
        return "[" + ", ".join(keys) + "]"
class Queue:
    '''First-in, first-out queue that stores its keys in a SinglyLinkedList.'''

    def __init__(self):
        self.collection = SinglyLinkedList()

    def enqueue(self, key):
        '''puts key at the back of the queue'''
        backing = self.collection
        backing.pushBack(key)

    def dequeue(self):
        '''removes the key at the front of the queue and returns it'''
        frontKey = self.collection.popFront()
        return frontKey

    def empty(self):
        '''tells whether the queue currently holds no keys'''
        isEmpty = self.collection.empty()
        return isEmpty
#############################################################################
def relax(fromNode, toNode, distances, previous, edgeWeights, hasChanged):
    '''Tries to shorten the known distance to toNode by going through the
    edge fromNode -> toNode.

    On improvement, distances and previous are updated in place and the
    returned flag is True; otherwise the incoming hasChanged is passed
    through untouched. Returns (distances, previous, hasChanged).'''
    currentBest = distances[toNode]
    viaFromNode = distances[fromNode] + edgeWeights[fromNode][toNode]
    if not (currentBest > viaFromNode):
        return distances, previous, hasChanged
    distances[toNode] = viaFromNode
    previous[toNode] = fromNode
    return distances, previous, True
def naiveAlgorithmToGetFastestRoute(adjacencyList, edgeWeights, origin):
    '''Finds the fastest path between origin and other nodes.
    Can go into infinite loop if a negative weight cycle is present

    adjacencyList maps every node to the nodes it has an edge to; each node
    that appears as a target must also be a key. edgeWeights[u][v] is the
    weight of edge u -> v.

    Returns (distances, previous): distances[node] is the cost of the best
    route from origin (float('inf') if the node is unreachable) and
    previous[node] is the node before it on that route (None for origin
    and for unreachable nodes).'''
    # A true infinity is required: a finite stand-in breaks as soon as a
    # real route costs at least that much, and lets unreachable nodes
    # "improve" their neighbours over negative edges.
    INFINITY = float('inf')
    distances = {node: INFINITY for node in adjacencyList}
    previous = {node: None for node in adjacencyList}
    distances[origin] = 0

    # Keep sweeping over every edge until a full sweep changes nothing.
    hasChanged = True
    while hasChanged:
        hasChanged = False
        for fromNode in adjacencyList:
            for toNode in adjacencyList[fromNode]:
                distances, previous, hasChanged = relax(fromNode, toNode, distances, previous, edgeWeights, hasChanged)
    return distances, previous
def reconstructShortestPath(origin, toNode, previous):
    '''returns a list of nodes between origin and toNode
    such that it is the shortest path

    previous maps each node to its predecessor on the shortest path from
    origin (None when it has none). The list starts with origin and ends
    with toNode. If the predecessor chain of toNode ends before reaching
    origin, i.e. toNode is not reachable, an empty list is returned.
    Raises KeyError if a node on the chain is missing from previous.'''
    result = []
    node = toNode
    while node != origin:
        if node is None:
            # Ran off the start of the chain without meeting origin.
            return []
        result.append(node)
        node = previous[node]
    result.append(origin)
    # Nodes were collected walking backwards from toNode.
    result.reverse()
    return result
def dijkstra(adjacencyList, edgeWeights, origin):
    '''Finds the shortest path between origin and other nodes. Faster than naive approach.
    Edges should have non-negative weights

    adjacencyList maps every node to the nodes it has an edge to; each node
    that appears as a target must also be a key. edgeWeights[u][v] is the
    weight of edge u -> v.

    Returns (distances, previous): distances[node] is the cost of the best
    route from origin (float('inf') if the node is unreachable) and
    previous[node] is the node before it on that route (None for origin
    and for unreachable nodes).'''
    # A true infinity is required: with a finite stand-in, any node whose
    # real distance is at least that value would never be updated.
    INFINITY = float('inf')
    distances = {node: INFINITY for node in adjacencyList}
    previous = {node: None for node in adjacencyList}
    distances[origin] = 0

    priorityQueue = PriorityQueue()
    for node in distances:
        priorityQueue.addNodeWithPriority(node, distances[node])

    while not priorityQueue.isEmpty():
        fromNode = priorityQueue.extractMin()
        if fromNode is None:
            # Only stale (removed) entries were left in the queue.
            break
        for toNode in adjacencyList[fromNode]:
            candidate = distances[fromNode] + edgeWeights[fromNode][toNode]
            if distances[toNode] > candidate:
                distances[toNode] = candidate
                previous[toNode] = fromNode
                # Re-adding the node replaces its old priority.
                priorityQueue.addNodeWithPriority(toNode, candidate)
    return distances, previous
def relaxBellmanFord(fromNode, toNode, distances, previous, edgeWeights, nodesRelaxed, hasChanged):
    '''Tries to shorten the known distance to toNode through the edge
    fromNode -> toNode, optionally recording which nodes were involved.

    On improvement, distances and previous are updated in place, the
    returned flag is True and, when nodesRelaxed is not None, both
    endpoints of the edge are added to it. Without improvement everything
    is returned as received.
    Returns (distances, previous, hasChanged, nodesRelaxed).'''
    currentBest = distances[toNode]
    viaFromNode = distances[fromNode] + edgeWeights[fromNode][toNode]
    if not (currentBest > viaFromNode):
        return distances, previous, hasChanged, nodesRelaxed

    distances[toNode] = viaFromNode
    previous[toNode] = fromNode
    if nodesRelaxed is not None:
        for endpoint in (fromNode, toNode):
            nodesRelaxed.add(endpoint)
    return distances, previous, True, nodesRelaxed
def getInfiniteArbitrageNodes(nodesRelaxed, adjacencyList):
    '''We can minimize the distance of the path going through these nodes infinitely

    Breadth-first search that starts from every node in nodesRelaxed and
    follows the edges in adjacencyList.

    Returns (infiniteArbitrageNodes, parent): the set holding nodesRelaxed
    plus every node discovered by the search, and a dict mapping each
    discovered node to the node it was first discovered from. The starting
    nodes are not pre-marked in parent, so a starting node that is the
    target of a traversed edge is recorded (and visited once more) too.'''
    reachable = set(nodesRelaxed)
    parent = {}

    pending = Queue()
    for startNode in nodesRelaxed:
        pending.enqueue(startNode)

    while not pending.empty():
        current = pending.dequeue()
        for neighbour in adjacencyList[current]:
            if neighbour not in parent:
                reachable.add(neighbour)
                pending.enqueue(neighbour)
                parent[neighbour] = current
    return reachable, parent
def bellmanFord(adjacencyList, edgeWeights, origin):
    '''Gives correct distances from origin to other nodes if no negative weight cycles in graph
    Works with edges with negative weights
    Very similar to naive approach

    adjacencyList maps every node to the nodes it has an edge to; each node
    that appears as a target must also be a key. edgeWeights[u][v] is the
    weight of edge u -> v.

    Performs at most len(adjacencyList) sweeps over all edges. If the last
    of those sweeps still relaxes an edge, the nodes found by
    getInfiniteArbitrageNodes and their parents are printed.

    Returns (distances, previous): distances[node] is the cost found from
    origin (float('inf') if the node is unreachable) and previous[node] is
    the node it was last relaxed from (None if it never was).'''
    # A true infinity is required: with a finite stand-in, a negative edge
    # leaving an unreachable node would look like an improvement and give
    # its target a bogus finite distance.
    INFINITY = float('inf')
    distances = {node: INFINITY for node in adjacencyList}
    previous = {node: None for node in adjacencyList}
    distances[origin] = 0

    nodesRelaxed = None
    numOfNodes = len(adjacencyList)
    for i in range(numOfNodes):
        if i == numOfNodes - 1:
            # Only relaxations in the final sweep are recorded; they are
            # what the negative-cycle report below is based on.
            nodesRelaxed = set()
        hasChanged = False
        for fromNode in adjacencyList:
            for toNode in adjacencyList[fromNode]:
                distances, previous, hasChanged, nodesRelaxed = relaxBellmanFord(fromNode, toNode, distances, previous, edgeWeights, nodesRelaxed, hasChanged)
        if not hasChanged:
            break

    if nodesRelaxed:
        infiniteArbitrageNodes, parent = getInfiniteArbitrageNodes(nodesRelaxed, adjacencyList)
        print('\nInfinite arbitrage nodes:', infiniteArbitrageNodes)
        print('Parents of infinite arbitrage nodes:', parent)
    return distances, previous
if __name__ == '__main__':
    # Demo 1: directed graph with positive weights. E and F only link to
    # each other, so no edge leads to them from A, B, C or D.
    adjacencyList = {
        'A': {'B', 'C'},
        'B': {'A', 'C'},
        'C': {'A', 'D'},
        'D': {'C'},
        'E': {'F'},
        'F': {'E'},
    }
    edgeWeights = {
        'A': {'B': 8, 'C': 10},
        'B': {'A': 5, 'C': 1},
        'C': {'A': 12, 'D': 14},
        'D': {'C': 12},
        'E': {'F': 10},
        'F': {'E': 12},
    }

    distances, previous = naiveAlgorithmToGetFastestRoute(adjacencyList, edgeWeights, 'A')
    print('\nFastest routes from A to other nodes:', distances)
    routeToD = reconstructShortestPath('A', 'D', previous)
    print("Fastest route between A to D is:", routeToD)

    distancesDijkstra, previousDijkstra = dijkstra(adjacencyList, edgeWeights, 'A')
    print('\nDijkstra: Fastest routes from A to other nodes:', distancesDijkstra)
    routeToDDijkstra = reconstructShortestPath('A', 'D', previousDijkstra)
    print("Dijkstra: Fastest route between A to D is:", routeToDDijkstra)

    # Demo 2: currency graph with some negative weights, run through
    # Bellman-Ford. NOTE(review): the name suggests the weights are
    # negative logarithms of exchange rates -- confirm.
    currencyAdjacencyList = {
        'RUR': {'EUR'},
        'USD': {'EUR'},
        'EUR': {'GBP'},
        'GBP': {'NOK'},
        'NOK': {'EUR'},
    }
    edgeWeightsInNegLog = {
        'RUR': {'EUR': 1.88},
        'USD': {'EUR': -0.06},
        'EUR': {'GBP': 0.09},
        'GBP': {'NOK': -1.09},
        'NOK': {'EUR': 0.95},
    }

    distancesBellman, previousBellman = bellmanFord(currencyAdjacencyList, edgeWeightsInNegLog, 'RUR')
    print('Bellman: cheapest currency exchange from RUR to other currencies:', distancesBellman)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment