Last active
April 12, 2022 13:48
-
-
Save ALenfant/6766560 to your computer and use it in GitHub Desktop.
An implementation of the KSPA algorithm (for K-Shortest Paths based on A* found at http://en.cnki.com.cn/Article_en/CJFDTOTAL-JSJY200804041.htm), using the igraph library, in Python 3.
This algorithm has the particularity of being able to find the k shortest paths in a multigraph (a graph, directed or not, with parallel edges).
Implemented accor…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def heuristic_cost_estimate(graph, source, target): | |
#Possible implementation here https://gist.github.com/delijati/1629405 | |
return 0 #Heuristic = 0 : same as Dijkstra | |
def reconstruct_path(came_from, current_node): | |
if current_node in came_from: | |
p = reconstruct_path(came_from, came_from[current_node]) | |
return p + [current_node] | |
else: | |
return [current_node] | |
def Astar(graph, source, target, weights=None): | |
closedset = set() #The set of nodes already evaluated. | |
openset = set([source]) #The set of tentative nodes to be evaluated, initially containing the start node | |
came_from = {} #The map of navigated nodes | |
g_score = {} #Cost from start along the best known path | |
f_score = {} # Estimated total cost from start to goal through y | |
g_score[source] = 0 | |
f_score[source] = g_score[source] + heuristic_cost_estimate(graph, source, target) | |
while len(openset) > 0: #while openset is not empty | |
current = min(openset, key=f_score.get) #the node in openset having the lowest f_score[] value | |
if current == target: | |
return reconstruct_path(came_from, target) | |
openset.remove(current) | |
closedset.add(current) | |
for neighbor_edge in graph.es.select(_source=current): #graph.neighborhood(current, mode=igraph.OUT): | |
neighbor = neighbor_edge.target | |
if weights == None: | |
tentative_g_score = g_score[current] + 1 | |
else: | |
tentative_g_score = g_score[current] + neighbor_edge[weights] #dist_between(current,neighbor) | |
if neighbor in closedset and tentative_g_score >= g_score[neighbor]: | |
continue | |
if neighbor not in openset or tentative_g_score < g_score[neighbor]: | |
came_from[neighbor] = current | |
g_score[neighbor] = tentative_g_score | |
f_score[neighbor] = g_score[neighbor] + heuristic_cost_estimate(graph, neighbor, target) | |
if neighbor not in openset: | |
openset.add(neighbor) #add neighbor to openset | |
return None #Failure to find a path | |
#Tranforms a vertex sequence to the corresponding sequence of edges' "ids" | |
# | |
def VertexListToEdgeList(graph, nodelist, id_attribute, weights=None): | |
edge_list = [] | |
for i in range(len(nodelist)-1): | |
if weights == None: | |
#No weights | |
edge = graph.es.find(_source=nodelist[i], _target=nodelist[i+1]) | |
else: | |
possible_edges = graph.es.select(_source=nodelist[i], _target=nodelist[i+1]) | |
edge = possible_edges[0] | |
if len(possible_edges) > 1: | |
for possible_edge in possible_edges: | |
if possible_edge[weights] < edge[weights]: | |
edge = possible_edge | |
edge_list.append(edge[id_attribute]) | |
return edge_list | |
def contains(small, big): | |
for i in range(len(big)-len(small)+1): | |
for j in range(len(small)): | |
if big[i+j] != small[j]: | |
break | |
else: | |
return i, i+len(small) | |
return False | |
def EdgeListCost(graph, edgelist, weights): | |
if weights == None: | |
#No weights, each edge is worth 1 | |
potential_current_path_cost = len(edgelist) | |
else: | |
potential_current_path_cost = sum([graph.es[edge][weights] for edge in edgelist]) | |
return potential_current_path_cost | |
def KSPA(graph, source, target, num_paths, weights=None): | |
#Initialization: | |
# Copy the graph because we will modify it | |
import copy | |
initial_graph = graph #Kept for cost calculations since sometimes edges are deleted | |
graph = copy.deepcopy(graph) | |
# Give another set of ids to the edges that will not change | |
for edge in graph.es: | |
edge["abs_id"] = edge.index | |
# Copy the graph because we will modify it | |
import copy | |
graph = copy.deepcopy(graph) | |
##Step 1 | |
import queue | |
optional_path = set() #Possible next paths to consider | |
optional_path_queue = queue.PriorityQueue() #Possible next paths to consider, with cost as priority (a get() will return the lowest-cost path inside) | |
optional_path_vertices = {} #Dict with each optional_path as keys containing paths in therms of vertices | |
current_path_vertices = Astar(graph, source, target, weights=weights) #Shortest path | |
if current_path_vertices == []: | |
return [] | |
current_path = VertexListToEdgeList(graph, current_path_vertices, "abs_id", weights=weights) #to a list of edge "ids" | |
current_path = tuple(current_path) #To a tuple (to add in a set if necessary) #Current path to consider | |
result_path = [current_path] #Contains the results as paths of edges (identified by their id in the initial graph) by order of priority | |
result_path_vertices = [current_path_vertices] #Contains the results as path of vertices (identified by their id) (same order) | |
result_path_costs = [] #Contains the cost of each result | |
k = 1 #Number of shortest paths already computed | |
#Main loop | |
while True: | |
edges_to_delete = [] #Contains the edges to delete before trying to find the next shortest path | |
deleted_edges = [] #Contains a list of tuples (source, target, attributes) of the temporarily deleted edges | |
##Step 2 | |
#If we have no more possible path or enough paths | |
if current_path == None or k >= num_paths: | |
break #Leave the main loop | |
##Step 3 | |
#loop for every node until the end node | |
for i in range(len(current_path)): | |
edge_id = current_path[i] | |
node_j = current_path_vertices[i] #graph.es.find(abs_id = edge_id).source | |
path_before_node_j = current_path[:i] #Path until node j | |
path_before_node_j_vertices = current_path_vertices[:i] | |
##Step 4 | |
#For each reachable node... | |
next_edges = [] #Will contain the list of outgoing edges that respect condition R | |
for next_edge in graph.es.select(_source=node_j): | |
#Condition R checks | |
if next_edge.target != node_j and next_edge.target not in path_before_node_j_vertices: | |
next_edges.append(next_edge["abs_id"]) #Condition R respected, we keep this edg | |
else: | |
edge_to_delete = next_edge | |
for next_edge in next_edges: | |
#In order to avoid current_path from adding to optional_path again: | |
checked_path = path_before_node_j + (next_edge,) #New path tuple | |
set_of_paths_to_check = set() | |
set_of_paths_to_check = set_of_paths_to_check.union(result_path) | |
set_of_paths_to_check = set_of_paths_to_check.union(optional_path) | |
for path_to_check in set_of_paths_to_check: | |
if contains(checked_path, path_to_check) != False: | |
edges_to_delete.append(next_edge) | |
edge_to_delete = graph.es.find(abs_id=next_edge) | |
#We'll delete later to avoid id conflicts/edge shuffling | |
break | |
##Step 5 | |
#We will look for the next shortest path, let's delete the edges to delete | |
edges_to_delete = graph.es.select(abs_id_in=edges_to_delete) | |
for edge_to_delete in edges_to_delete: | |
if edge_to_delete.source != node_j: | |
raise Exception("Wrong edge to delete!") #References to edges shifted | |
deleted_edges.append((edge_to_delete.source, edge_to_delete.target, edge_to_delete.attributes())) | |
#edge_to_delete.delete() | |
edges_to_delete.delete() | |
edges_to_delete = [] #TODO placer correctement dans loop | |
if len(graph.es.select(abs_id_in=edges_to_delete)) > 0: | |
raise Exception("Wrong edge deleted!") #References to edges shifted | |
#Then look for the next shortest path | |
new_shortest_path_vertices = Astar(graph, node_j, target, weights=weights) #Shortest path | |
#If there is one | |
if new_shortest_path_vertices != None: | |
#Find the used edge's "ids" | |
new_shortest_path = VertexListToEdgeList(graph, new_shortest_path_vertices, "abs_id", weights=weights) #to a list of edge "ids" | |
new_shortest_path = tuple(new_shortest_path) #To a tuple (to add in a set if necessary) | |
#We concatenate the shortest path between the node j and the end vertex to the left part of the path until j | |
new_optional_path = path_before_node_j + new_shortest_path | |
new_optional_path_vertices = path_before_node_j_vertices + new_shortest_path_vertices | |
new_optional_path_cost = EdgeListCost(initial_graph, new_optional_path, weights) | |
if graph.es.find(abs_id=new_optional_path[-1]).target != target: | |
raise Exception("Wrong selected path") #Safeguard | |
#We add the path to the list of possible ones | |
optional_path.add(new_optional_path) | |
optional_path_vertices[new_optional_path] = new_optional_path_vertices #[graph.es.find(abs_id = edge_id).source for edge_id in new_optional_path] #we convert it to a list of vertex ids | |
optional_path_queue.put_nowait((new_optional_path_cost, new_optional_path)) | |
#Delete all following edges | |
edges_to_delete = graph.es.select(abs_id_in=next_edges) | |
for edge_to_delete in edges_to_delete: | |
if edge_to_delete.source != node_j: | |
raise Exception("Wrong edge to delete!") #References to edges shifted | |
deleted_edges.append((edge_to_delete.source, edge_to_delete.target, edge_to_delete.attributes())) | |
edges_to_delete.delete() | |
edges_to_delete = [] | |
#Next iteration (next node j) | |
#We tried all edges so we are at the end node | |
##Step 7 | |
#We select the path with the minimal cost and consider it as current_path | |
if optional_path == None: | |
raise Exception("Error") #This should NOT happen, optional_path can be empty though | |
break #this will exit the loop since current_path == None | |
try: | |
current_path_cost, current_path = optional_path_queue.get_nowait() | |
except queue.Empty: | |
print("No more possible paths") | |
break | |
if current_path == None: | |
raise Exception("error to manage") | |
current_path_vertices = optional_path_vertices[current_path] #We set the path containing vertices ids for the next iteration | |
#We have the path with the least cost | |
result_path.append(current_path) | |
result_path_vertices.append(current_path_vertices) | |
result_path_costs.append(current_path_cost) | |
#We remove it from optional_path since it has been selected | |
optional_path.remove(current_path) | |
#We've got a new path! | |
k = k + 1 | |
##Step 8? | |
#TODO:Is it needed to add all edges back???? | |
for deleted_edge in deleted_edges: | |
path_source, path_target, path_attributes = deleted_edge | |
graph.add_edge(path_source, path_target) | |
graph.es[len(graph.es) - 1].update_attributes(path_attributes) | |
deleted_edges = [] | |
return result_path, result_path_vertices, result_path_costs |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi ALenfant,
I need to determine k-shortest paths in a multigraph, and find your above code to be the right fit to solve my problem.
I am new to complex Python programming, and find it hard as to how to make the code work.
Would you be able to share any input files and high-level instructions on how to execute this code.
I'll really appreciate your help on this.
Best
Neeraj