Skip to content

Instantly share code, notes, and snippets.

@jtorrents
Last active December 11, 2015 10:28
Show Gist options
  • Save jtorrents/4586874 to your computer and use it in GitHub Desktop.
Save jtorrents/4586874 to your computer and use it in GitHub Desktop.
import time
import networkx as nx
from contextlib import contextmanager
@contextmanager
def hidden(G, nodes=None, edges=None):
if nodes is None:
nodes = []
if edges is None:
edges = []
hidden_nodes = [(u,G.node[u]) for u in nodes]
# add edges adjacent to any hidden (to be removed) nodes
extra_edges = G.edges(nodes)
hidden_edges = [(u,v,G[u][v]) for u,v in edges+extra_edges]
try:
G.remove_edges_from(hidden_edges)
# Should remove_nodes_from be modified so we can just use
# G.remove_nodes_from(nodes) ?
G.remove_nodes_from(n for (n,d) in hidden_nodes)
yield G
finally:
G.add_edges_from(hidden_edges)
G.add_nodes_from(hidden_nodes)
def bidirectional_shortest_path(G, source, target, ignore_nodes=None, ignore_edges=None):
"""Return a list of nodes in a shortest path between source and target.
Parameters
----------
G : NetworkX graph
source : node label
starting node for path
target : node label
ending node for path
ignore_nodes : list of nodes
nodes to ignore, optional
ignore_edges : list of edges
edges to ignore, optional
Returns
-------
path: list
List of nodes in a path from source to target.
Raises
------
NetworkXNoPath
If no path exists between source and target.
See Also
--------
shortest_path
Notes
-----
This algorithm is used by shortest_path(G,source,target).
"""
# call helper to do the real work
results=_bidirectional_pred_succ(G,source,target,ignore_nodes,ignore_edges)
pred,succ,w=results
# build path from pred+w+succ
path=[]
# from w to target
while w is not None:
path.append(w)
w=succ[w]
# from source to w
w=pred[path[0]]
while w is not None:
path.insert(0,w)
w=pred[w]
return path
def _bidirectional_pred_succ(G, source, target, ignore_nodes=None, ignore_edges=None):
"""Bidirectional shortest path helper.
Returns (pred,succ,w) where
pred is a dictionary of predecessors from w to the source, and
succ is a dictionary of successors from w to the target.
"""
# does BFS from both source and target and meets in the middle
if target == source:
return ({target:None},{source:None},source)
# handle either directed or undirected
if G.is_directed():
Gpred=G.predecessors_iter
Gsucc=G.successors_iter
else:
Gpred=G.neighbors_iter
Gsucc=G.neighbors_iter
# support optional nodes filter
if ignore_nodes:
def filter_iter(nodes_iter):
def iterate(v):
for w in nodes_iter(v):
if w not in ignore_nodes:
yield w
return iterate
Gpred=filter_iter(Gpred)
Gsucc=filter_iter(Gsucc)
# support optional edges filter
if ignore_edges:
if G.is_directed():
def filter_pred_iter(pred_iter):
def iterate(v):
for w in pred_iter(v):
if (w, v) not in ignore_edges:
yield w
return iterate
def filter_succ_iter(succ_iter):
def iterate(v):
for w in succ_iter(v):
if (v, w) not in ignore_edges:
yield w
return iterate
Gpred=filter_pred_iter(Gpred)
Gsucc=filter_succ_iter(Gsucc)
else:
def filter_iter(nodes_iter):
def iterate(v):
for w in nodes_iter(v):
if (v, w) not in ignore_edges \
and (w, v) not in ignore_edges:
yield w
return iterate
Gpred=filter_iter(Gpred)
Gsucc=filter_iter(Gsucc)
# predecesssor and successors in search
pred={source:None}
succ={target:None}
# initialize fringes, start with forward
forward_fringe=[source]
reverse_fringe=[target]
while forward_fringe and reverse_fringe:
if len(forward_fringe) <= len(reverse_fringe):
this_level=forward_fringe
forward_fringe=[]
for v in this_level:
for w in Gsucc(v):
if w not in pred:
forward_fringe.append(w)
pred[w]=v
if w in succ:
# found path
return pred,succ,w
else:
this_level=reverse_fringe
reverse_fringe=[]
for v in this_level:
for w in Gpred(v):
if w not in succ:
succ[w]=v
reverse_fringe.append(w)
if w in pred:
# found path
return pred,succ,w
raise nx.NetworkXNoPath("No path between %s and %s." % (source, target))
def node_connectivity_filter(G, source, target,strict=False,max_paths=None):
# Maximum possible node independent paths
if G.is_directed():
possible = min(G.out_degree(source), G.in_degree(target))
else:
possible = min(G.degree(source), G.degree(target))
if max_paths is None:
max_paths = float('Inf')
K = 0
if target == source:
return None
elif possible == 0:
return 0
elif strict and target in G[source]:
return float('nan')
exclude = set()
for i in range(min(possible, max_paths)):
try:
path = bidirectional_shortest_path(G,source,target,ignore_nodes=exclude)
exclude.update(set(path)-set([source, target]))
K += 1
except nx.NetworkXNoPath:
break
return K
def node_connectivity_hide(G, source, target,strict=False,max_paths=None):
# Maximum possible node independent paths
if G.is_directed():
possible = min(G.out_degree(source), G.in_degree(target))
else:
possible = min(G.degree(source), G.degree(target))
if max_paths is None:
max_paths = float('Inf')
K = 0
if target == source:
return None
elif possible == 0:
return 0
elif strict and target in G[source]:
return float('nan')
exclude = set()
for i in range(min(possible, max_paths)):
try:
with hidden(G, nodes=exclude) as H:
path = nx.shortest_path(H,source,target)
exclude.update(set(path)-set([source, target]))
K += 1
except nx.NetworkXNoPath:
break
return K
if __name__ == '__main__':
for p in [0.05, 0.1, 0.15, 0.2]:
G = nx.gnp_random_graph(1001,p)
print("Testing filters with a G_np random graph of order {0} and size {1}".format(G.order(), G.size()))
start = time.time()
k = node_connectivity_hide(G, 1, 1000)
print("\tContext class: found {0} node independent paths in {1:.4f} seconds".format(k, time.time()-start))
start = time.time()
k = node_connectivity_filter(G, 1, 1000)
print("\tFilter function: found {0} node independent paths in {1:.4f} seconds".format(k, time.time()-start))
@jtorrents
Copy link
Author

Speed tests for shortest paths with restrictions using White and Newman approximation algorithm for node independent paths

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment