Skip to content

Instantly share code, notes, and snippets.

@Adrien-Luxey
Created April 6, 2021 14:35
Show Gist options
  • Save Adrien-Luxey/d8f77104056ce5f208a6933467e805f6 to your computer and use it in GitHub Desktop.
Save Adrien-Luxey/d8f77104056ce5f208a6933467e805f6 to your computer and use it in GitHub Desktop.
WIP of a graph layered layout implementation for NetworkX, following Sugiyama's method.
"""Implementation of the layered layout for DAGs.
By Adrien Luxey, April 2021.
Licence: public domain
TODO:
* Handling disconnected components
* Layer width minimization
* Handle scale and center parameters
* Cycle removal (will allow non-DAG DiGraphs to be layout)
* Accept weighted edges
* Accept MultiDiGraphs (& increase weight of multi-edges)
"""
import networkx as nx
import copy
import collections
import numpy as np
DUMMY_KEY = "_dummy"
def layered_layout(G, dummies=False, align='vertical', scale=1, center=None):
"""Position nodes in layers of straight lines.
Parameters
----------
G : NetworkX directed graph (nx.DiGraph)
A Directed **Acyclic** Graph.
dummies : bool
Should we add dommy nodes positions to returned pos dict?
align : string (default='horizontal')
The alignment of nodes: 'vertical' or 'horizontal'
scale : number (defaul= 1)
Scale factor for positions.
center : array-like or None
Coordinate pair around which to center the layout.
Returns
-------
G : nx.DiGraph
Input G with added dummy nodes.
pos : dict
A dictionary of positions keyed by node.
"""
if not nx.is_directed_acyclic_graph(G):
raise nx.NetworkXNotImplemented(
'layered_layout is only implemented for directed acyclic graphs')
# Algo steps:
# * Cycle removal: Not implemented; to do calling layered_layout
# * Layer Assignment: Vertices are assigned to layers
# * Vertex Ordering: Dummy vertices and edges are added to the graph
# so that every edge connects only vertices from adjacent layers.
# Vertices in a layer are reordered so that edge crossings are minimised
# * Coordinate Assignment: Assign coordinates to vertices
# We will edit G, but do not want to modify the caller's instance
# So we copy G in place
G = G.copy()
# Layer Assignment
nodes_layer = _layer_assignment(G)
# Vertex Ordering
G, layers_order = _vertex_ordering(G, nodes_layer)
# Coordinate Assignment
return G, _coordinate_assignmnent(G, layers_order, dummies, align, scale, center)
def _layer_assignment(G):
"""Assigns each node in G to a layer number, based on the node's ancestors
layer(u) = 1 + max{layer(v) for v, _ in in_edges(u)}
Based off:
* p. 14: https://i11www.iti.kit.edu/_media/teaching/winter2016/graphvis/graphvis-ws16-v7-added-proofs.pdf
* https://networkx.org/documentation/stable/_modules/networkx/algorithms/dag.html#topological_sort
Parameters
----------
G : networkx.DiGraph
A Directed **Acyclic** Graph.
Returns
-------
dict[node]int
Layer number for each node in G
"""
indegree_map = {v: d for v, d in G.in_degree() if d > 0}
# These nodes have zero indegree and ready to be returned.
zero_indegree = [v for v, d in G.in_degree() if d == 0]
# Compute each node's layer
nodes_layer = {}
while zero_indegree:
u = zero_indegree.pop()
# Process u's direct successors
for _, v in G.out_edges(u):
indegree_map[v] -= 1
if indegree_map[v] == 0:
zero_indegree.append(v)
del indegree_map[v]
# Compute u's layer
if G.in_degree(u) == 0:
nodes_layer[u] = 0
else:
nodes_layer[u] = 1 + max(
[nodes_layer[v] for v, _ in G.in_edges(u)])
return nodes_layer
def _split_long_edges(G, nodes_layer):
"""Add dummy vertices such that each edge spans only one layer.
Iteratively checks G's edges,
and adds one dummy node per layer the edge crosses.
The dummy nodes are added to G and the nodes_layer dict.
"""
dummy_id = 0
for e in list(G.edges):
from_pos = nodes_layer[e[0]]
to_pos = nodes_layer[e[1]]
e_length = to_pos - from_pos
if e_length == 1:
continue
# Remove existing edge from G
G.remove_edge(*e)
# Iteratively add one dummy node per layer from e[0] to e[1]
dummy_node, prev_dummy_node = None, None
for i in range(e_length-1):
dummy_node = f"dummy_{dummy_id}"
# Add dummy node to G
G.add_node(dummy_node, **{DUMMY_KEY: True})
# Add dummy node layer's to nodes_layer
nodes_layer[dummy_node] = from_pos + i + 1
# Create path from previous node to dummy node
if i == 0:
G.add_edge(e[0], dummy_node)
else:
G.add_edge(prev_dummy_node, dummy_node)
# Iteration stuff
prev_dummy_node = dummy_node
dummy_id += 1
# Create edge from last dummy node to successor node
G.add_edge(dummy_node, e[1])
return G, nodes_layer
def _nodes_layer_dict_to_layers_order(nodes_layer):
n_layers = max(nodes_layer.values()) + 1
layers = [None] * n_layers
for u, l in nodes_layer.items():
if layers[l] is None:
layers[l] = [u]
else:
layers[l].append(u)
return layers
def _order_by_weighted_median(G, layers_order, top_to_bot):
def _weighted_node_median(node_neighbours, layer):
# Sorted ranks of the ancestors that are in the previous layer
neighbours_rank = sorted([
r for r, u in enumerate(layer) if u in node_neighbours
])
n_neighbours = len(neighbours_rank)
if n_neighbours == 0:
# When a node has no direct ancestors,
# it must keep its current position.
# median == -1 informs _sort_layer of the situation
return -1.
elif n_neighbours == 1:
return neighbours_rank[0]
elif n_neighbours == 2:
return sum(neighbours_rank) / 2
midpoint = n_neighbours // 2
if n_neighbours % 2 == 1:
return neighbours_rank[midpoint]
# n_neighbours > 2 and even
left_range = neighbours_rank[midpoint-1] - neighbours_rank[0]
right_range = neighbours_rank[-1] - neighbours_rank[midpoint]
return (neighbours_rank[midpoint-1] * left_range
+ neighbours_rank[midpoint] * right_range) \
/ (left_range + right_range)
def _sort_layer(layer_order, nodes_median):
# TODO: optimizable
# OrderedDict remembers order of insertion
# => correct inserts in layer_order afterwards
fixed_nodes = collections.OrderedDict()
list_of_tuples = []
for (pos, u), m in zip(enumerate(layer_order), nodes_median):
if m == -1:
fixed_nodes[u] = pos
else:
list_of_tuples.append((u, m))
list_of_tuples = sorted(list_of_tuples, key=lambda t: t[1])
layer_order = [t[0] for t in list_of_tuples]
for u, pos in fixed_nodes.items():
layer_order.insert(pos, u)
return layer_order
if top_to_bot:
layers_iterator = range(len(layers_order)-2, 0, -1)
def node_neighbours(u): return [v for _, v in G.out_edges(u)]
def next_layer_id(l): return l+1
else:
layers_iterator = range(1, len(layers_order))
def node_neighbours(u): return [v for v, _ in G.in_edges(u)]
def next_layer_id(l): return l-1
for l in layers_iterator:
nodes_median = [0] * len(layers_order[l])
for i, u in enumerate(layers_order[l]):
nodes_median[i] = _weighted_node_median(
node_neighbours(u),
layers_order[next_layer_id(l)])
layers_order[l] = _sort_layer(layers_order[l], nodes_median)
return layers_order
def _crossing_local(G, layers_order, u, v, l, top_to_bot):
# We assume that u's rank is lower than v's
# Discard first/last layer depending on search direction
if top_to_bot and l == len(layers_order) - 1 or not top_to_bot and l == 0:
return 0
if top_to_bot:
layer = layers_order[l+1]
def node_neighbours(u): return [v for _, v in G.out_edges(u)]
else:
layer = layers_order[l-1]
def node_neighbours(u): return [v for v, _ in G.in_edges(u)]
# Fetching rank of u (resp. v)'s neighbours in studied layer
u_neighbours, v_neighbours = node_neighbours(u), node_neighbours(v)
u_neigh_ranks, v_neigh_ranks = [], []
for r, w in enumerate(layer):
if w in u_neighbours:
u_neigh_ranks.append(r)
elif w in v_neighbours:
v_neigh_ranks.append(r)
# Computing number of edge crossings
uv_edge_crossings = 0
for u_neigh_rank in u_neigh_ranks:
for v_neigh_rank in v_neigh_ranks:
uv_edge_crossings += int(u_neigh_rank > v_neigh_rank)
return uv_edge_crossings
def _crossing(G, layers_order):
top_to_bot = True
crossings = 0
for l in range(len(layers_order)-1):
for i, u in enumerate(layers_order[l]):
for j in range(i+1, len(layers_order[l])):
v = layers_order[l][j]
crossings += _crossing_local(
G, layers_order, u, v, l, top_to_bot)
return crossings
def _transpose(G, layers_order, top_to_bot):
improved = True
range_direction = 1 if top_to_bot else -1
while improved:
improved = False
for l in range(0, len(layers_order), range_direction):
for i, u in enumerate(layers_order[l][:-1]):
v = layers_order[l][i+1]
if (_crossing_local(G, layers_order, u, v, l, top_to_bot) >
_crossing_local(G, layers_order, u, v, l, top_to_bot)):
improved = True
layers_order[l][i], layers_order[l][i+1] = v, u
return layers_order
def _vertex_ordering(G, nodes_layer):
"""Orders vertices in each layer to minimise edge crossings.
Based off:
* Gansner, Koutsofios, North & Vo,
"A technique for drawing directed graphs," pp. 13-17,
IIEEE Trans. Software Eng., 1993, doi: 10.1109/32.221135.
* pp. 8-11: https://i11www.iti.kit.edu/_media/teaching/winter2016/graphvis/graphvis-ws16-v8.pdf
Parameters
----------
G : networkx.DiGraph
A Directed **Acyclic** Graph.
nodes_layer : dict[node]int
Layer number for each node in G.
Returns
-------
networkx.DiGraph
Input G with dummy nodes added.
layers_order: list[][]node
ordered list of nodes in each layer
"""
G, nodes_layer = _split_long_edges(G, nodes_layer)
layers_order = _nodes_layer_dict_to_layers_order(nodes_layer)
best_layers_order = copy.copy(layers_order)
MAX_ITERATIONS = 24
for it in range(MAX_ITERATIONS):
# Depending on the parity of the current iteration number,
# the ranks are traversed from top to bottom or from bottom to top
if it % 2 == 1:
top_to_bot = True
else:
top_to_bot = False
layers_order = _order_by_weighted_median(G, layers_order, top_to_bot)
layers_order = _transpose(G, layers_order, top_to_bot)
if _crossing(G, layers_order) \
< _crossing(G, best_layers_order):
best_layers_order = layers_order
return G, best_layers_order
def _priority_heuristic(G, layers_order):
def node_priority(G, u, direction):
if G.nodes[u].get(DUMMY_KEY):
return np.inf
if direction == 1:
return G.out_degree(u)
else:
return G.in_degree(u)
def index_in_list_of_tuples(l, t1=None, t2=None):
"""Index of first item in l = [(a, b)] having a == t1 or b == t2
Return -1 if not found."""
for i, (tt1, tt2) in enumerate(l):
if t1 is not None and tt1 == t1:
return i
if t2 is not None and tt2 == t2:
return i
return -1
def neighbours_pos(G, u, direction, prev_layer_pos):
if direction == 1:
neighbours = [v for _, v in G.out_edges(u)]
else:
neighbours = [v for v, _ in G.in_edges(u)]
neighbours_ids = [index_in_list_of_tuples(prev_layer_pos, t2=v)
for v in neighbours]
return [prev_layer_pos[i][0] for i in neighbours_ids if i != -1]
def move_node(G, p, u, target, layer_priorities,
layer_pos, direction='both'):
"""
direction = ['both', 'prev' or 'next']
layer_priorities is sorted desc.
layer_pos is sorted asc.
"""
u_id = index_in_list_of_tuples(layer_pos, t2=u)
prev_node_pos = layer_pos[u_id - 1] if u_id != 0 else None
next_node_pos = layer_pos[u_id +
1] if u_id != len(layer_pos) - 1 else None
if ((direction == 'next' or
(prev_node_pos is None or prev_node_pos[0] < target)) and
(direction == 'prev' or
(next_node_pos is None or next_node_pos[0] > target))):
layer_pos[u_id] = (target, u)
return layer_pos
if (direction != 'next' and prev_node_pos is not None
and prev_node_pos[0] >= target):
prev_node = prev_node_pos[1]
prev_node_priority_id = index_in_list_of_tuples(
layer_priorities, t2=prev_node)
assert prev_node_priority_id != -1
prev_node_priority = layer_priorities[prev_node_priority_id][0]
if p > prev_node_priority:
try:
layer_pos = move_node(G, p, prev_node, target-1,
layer_priorities, layer_pos, 'prev')
except RuntimeError:
pass
else:
layer_pos[u_id] = (target, u)
return layer_pos
if (direction != 'prev' and next_node_pos is not None
and next_node_pos[0] <= target):
next_node = next_node_pos[1]
next_node_priority_id = index_in_list_of_tuples(
layer_priorities, t2=next_node)
assert next_node_priority_id != -1
next_node_priority = layer_priorities[next_node_priority_id][0]
if p > next_node_priority:
try:
layer_pos = move_node(G, p, next_node, target+1,
layer_priorities, layer_pos, 'next')
except RuntimeError:
pass
else:
layer_pos[u_id] = (target, u)
return layer_pos
raise RuntimeError(
f'Node {u} (p={p}) could not move to position {target}.')
# pos: list[layer_id]list[node_id](node_pos_in_layer, node)
# pos <- init_order()
# = place each layer L_i' vertices from 0 to |L_i| - 1 in order
n_layers = len(layers_order)
layers_pos = [None] * n_layers
for l, layer_order in enumerate(layers_order):
layers_pos[l] = [None] * len(layer_order)
for i, u in enumerate(layer_order):
layers_pos[l][i] = (i, u)
# Loop:
# downwards: L_1 to L_n
# upwards: L_(n-1) to L_0
# downwards: L_1 to L_n
r = (list(range(1, n_layers))
+ list(range(n_layers - 2, -1, -1))
+ list(range(1, n_layers)))
directions = ([-1] * (n_layers - 1)
+ [1] * (n_layers - 1)
+ [-1] * (n_layers - 1))
for l, direction in zip(r, directions):
# L_j = L_(i-1) if direction = downwards
# L_(i+1) if direction = upwards
prev_layer_pos = layers_pos[l + direction]
# For each vertex u in L_i: compute u's priority:
# * numpy.inf if u is dummy edge
# * otherwise: u's in degree if direction = downwards
# u's out degree if direction = upwards
layer_priorities = sorted(
[(node_priority(G, u, direction), u)
for _, u in layers_pos[l]],
reverse=True)
# for u in L_i in order of descending priority:
for p, u in layer_priorities:
# place u at median(u's L_j neighbours' positions),
# possibly moving other nodes with lower priority.
neigh_pos = neighbours_pos(G, u, direction, prev_layer_pos)
if len(neigh_pos) == 0:
continue
target_pos = int(round(np.mean(neigh_pos)))
# Skip if already at desired pos
u_id = index_in_list_of_tuples(layers_pos[l], t2=u)
prev_pos = layers_pos[l][u_id][0]
if target_pos == prev_pos:
continue
try:
layers_pos[l] = move_node(
G, p, u, target_pos, layer_priorities, layers_pos[l])
except RuntimeError:
pass
return layers_pos
def _coordinate_assignmnent(G, layers_order, dummies, align, scale, center):
"""Finds aesthetical coordinates respecting the previous constraints.
"""
if align != 'horizontal' and align != 'vertical':
raise ValueError("align must be either 'vertical' or 'horizontal'")
layers_pos = _priority_heuristic(G, layers_order)
# layers_pos: list[layer_id]list[node_id](node_pos_in_layer, node_name)
# to
# pos: dict[node_name](xpos, ypos)
pos = {}
n_layers = len(layers_pos)
for d1 in range(n_layers):
for (d2, u) in layers_pos[d1]:
# Skip dummy vertices
if not dummies and G.nodes[u].get(DUMMY_KEY, False):
continue
if align == 'vertical':
pos[u] = (d2, n_layers - d1 - 1)
elif align == 'horizontal':
pos[u] = (d1, d2)
return pos
@Adrien-Luxey
Copy link
Author

Adrien-Luxey commented Apr 6, 2021

Example using following graph:

G_example = nx.DiGraph()
G_example.add_edges_from([
    (1, 7), (7, 10), (1, 4), (4, 7), (7, 9), (2, 4), (2, 5), (5, 7), (7, 8), (5, 8), (8, 9),
    (2, 6), (6, 8), (3, 5), (3, 6), (3, 9), (9, 10)
])

We display NetworkX's spring and mutipartite layouts for comparison. Note that using the multipartite layout, one has to manually annotate each node with their target layer, that is:

G_example_mult.nodes[1]['layer'] = 0
G_example_mult.nodes[2]['layer'] = 0
G_example_mult.nodes[3]['layer'] = 0
G_example_mult.nodes[4]['layer'] = 1
G_example_mult.nodes[5]['layer'] = 1
G_example_mult.nodes[6]['layer'] = 1
G_example_mult.nodes[7]['layer'] = 2
G_example_mult.nodes[8]['layer'] = 3
G_example_mult.nodes[9]['layer'] = 4
G_example_mult.nodes[10]['layer'] = 5

The whole idea of the layered layout is to do this automatically, and beautifully :)

example_combined

It does look like a spaceship.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment