Skip to content

Instantly share code, notes, and snippets.

@Lincoln-LM
Last active March 9, 2025 22:13
Show Gist options
  • Save Lincoln-LM/d8985c2074861adc4f27357dfcbe21ae to your computer and use it in GitHub Desktop.
Save Lincoln-LM/d8985c2074861adc4f27357dfcbe21ae to your computer and use it in GitHub Desktop.
Exact All Portals solver via formulating the problem as the ring star problem and solving that with a MILP solver.
"""Exact All Portals solver via formulating the problem as the ring star problem and solving that
with a MILP solver.
AP paths and this solver differ slightly from ring star as they require only a path and not a cycle.
AP paths additionally can make use of the world spawn point (origin) which technically makes most
edge weights asymmetrical.
The cycle issue is resolved by using a dummy root as the root that has a cost of 0 but must always
connect to the real root.
The edge from the last real node to the dummy root is then just not drawn.
To solve the origin issue the assertion is made that in an optimal solution the origin will only be
used to connect to the first ring of strongholds.
The first ring of strongholds are guaranteed to be closer to the origin than to any other
stronghold.
This means that the fastest path to a first ring stronghold is always via the origin.
It also means that the first ring of strongholds will never be assigned to another node because a
path that assigns them to any other node will always be worse than an identical path that returns
to the origin at the end and picks up the first ring of strongholds then.
This means that an optimal solution will always access the first ring via the origin which will cost
a constant value.
Since this constant amount does not impact the comparison between solutions it can be treated as 0.
Because the only time the origin is used is to access the first ring, dummy nodes can be created for
each first ring stronghold that have a cost of 0 but must always connect to the real stronghold.
"""
import threading
import time
import itertools
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import networkx as nx
import numpy as np
import pulp
seed = np.random.randint(2**30)
print(f"{seed=:08X}")
np.random.seed(seed)
STRONGHOLD_DATA = (
(3, 1280, 2816),
(6, 4352, 5888),
(10, 7424, 8960),
(15, 10496, 12032),
(21, 13568, 15104),
(28, 16640, 18176),
(36, 19712, 21248),
(10, 22784, 24320),
)
# how much to scale coordinates down by
SCALE_FACTOR = 16
# verbose MILP solver output
VERBOSE = False
# simulate AP measurement and prediction of strongholds
sh_rings = []
for count, minimum, maximum in STRONGHOLD_DATA:
first_angle = np.random.rand() * 2 * np.pi
ring = []
for sh in range(count):
angle = first_angle + 2 * np.pi * sh / count
# scale down to make things easier
distance = (maximum + minimum) / 2 / SCALE_FACTOR
x = np.cos(angle) * distance
y = np.sin(angle) * distance
ring.append((x, y))
sh_rings.append(ring)
# select the closest stronghold from the next ring to measure
measured_shs = [sh_rings[0].pop(0)]
for ring in sh_rings[1:]:
closest = 1 << 60, None
for sh in ring:
distance_squared = (measured_shs[-1][0] - sh[0]) ** 2 + (
measured_shs[-1][1] - sh[1]
) ** 2
if distance_squared < closest[0]:
closest = distance_squared, sh
ring.remove(closest[1])
measured_shs.append(closest[1])
# start from the 7th ring measured sh
all_shs = sum(sh_rings, [measured_shs.pop(-2)])
# useful constant indexes
# dummy nodes
DUMMY_ROOT = 0
DUMMY_RING_1_1 = DUMMY_ROOT + 1
DUMMY_RING_1_2 = DUMMY_RING_1_1 + 1
# 7th ring measured stronghold
REAL_ROOT = DUMMY_RING_1_2 + 1
# first ring strongholds
REAL_RING_1_1 = REAL_ROOT + 1
REAL_RING_1_2 = REAL_RING_1_1 + 1
graph = nx.Graph()
for idx, sh in enumerate(all_shs, start=REAL_ROOT):
graph.add_node(
idx, pos=np.array(sh), color="red" if idx == REAL_ROOT else "#1f78b4"
)
NODE_COUNT = len(graph.nodes) + REAL_ROOT
POINTS_IDX = list(range(NODE_COUNT))
ALL_ROUTES = [(p1, p2) for p1 in POINTS_IDX for p2 in POINTS_IDX if p1 != p2]
for i, measured_sh in enumerate(measured_shs, start=1):
graph.add_node(-i, pos=np.array(measured_sh), color="red")
class ComputeThread(threading.Thread):
"""Thread that actually runs the MILP solver and updates the graph"""
def __init__(self, graph: nx.Graph):
super().__init__(daemon=True)
self.graph = graph
self.elapsed_time = 0
self.iteration = -1
self.problem = pulp.LpProblem("RingStarProblem", pulp.LpMinimize)
# define binary variables x_i_j that equal 1
# if and only if the edge i <-> j is part of the cycle
self.cycle_vars = {
point1: {
point2: pulp.LpVariable(f"x_{point1}_{point2}", cat=pulp.LpBinary)
for point2 in POINTS_IDX[point1 + 1 :]
}
for point1 in POINTS_IDX
}
# define binary variables y_i_j that equal 1
# if and only if the node v_i is assigned to the node v_j as an arm
# for only nodes v_i on the cycle y_i_i == 1 i.e. the node is assigned to itself
self.assignment_vars = {
point1: {
point2: pulp.LpVariable(f"y_{point1}_{point2}", cat=pulp.LpBinary)
for point2 in POINTS_IDX
}
for point1 in POINTS_IDX
}
# enforce that each node has exactly 2 cycle connections
# if and only if that node is on the cycle
# and otherwise has 0 cycle connections
for point1 in POINTS_IDX:
self.problem += (
pulp.lpSum(
self.cycle_vars[min(point1, point2)][max(point1, point2)]
for point2 in POINTS_IDX
if point2 != point1
)
== 2 * self.assignment_vars[point1][point1]
)
# enforce that every real node is assigned to exactly 1 other node
# this means nodes on the cycle cannot be assigned to any other nodes
# (they are assigned to themselves)
# and that nodes external to the cycle are only assigned to 1 other node
for point1 in POINTS_IDX[REAL_ROOT:]:
self.problem += (
pulp.lpSum(
self.assignment_vars[point1][point2] for point2 in POINTS_IDX
)
== 1
)
# enforce that the first node and the dummy nodes are only assigned to themselves
# (root is always on the cycle)
self.problem += self.assignment_vars[DUMMY_ROOT][DUMMY_ROOT] == 1
self.problem += self.assignment_vars[DUMMY_RING_1_1][DUMMY_RING_1_1] == 1
self.problem += self.assignment_vars[DUMMY_RING_1_2][DUMMY_RING_1_2] == 1
# dummy nodes & the root are implicitly never going to be assigned to other nodes
# but this can be an added condition here if it helps solving
# enforce that nothing is ever assigned to the dummy nodes
for point in POINTS_IDX[REAL_ROOT:]:
for dummy_point in (DUMMY_ROOT, DUMMY_RING_1_1, DUMMY_RING_1_2):
self.problem += self.assignment_vars[point][dummy_point] == 0
# enforce that the first ring nodes cannot connect to each other
self.problem += self.cycle_vars[REAL_RING_1_1][REAL_RING_1_2] == 0
self.problem += self.cycle_vars[DUMMY_RING_1_1][REAL_RING_1_2] == 0
self.problem += self.cycle_vars[DUMMY_RING_1_2][REAL_RING_1_1] == 0
# enforce that the fake nodes always connect to their real counterpart
self.problem += self.cycle_vars[DUMMY_ROOT][REAL_ROOT] == 1
self.problem += self.cycle_vars[DUMMY_RING_1_1][REAL_RING_1_1] == 1
self.problem += self.cycle_vars[DUMMY_RING_1_2][REAL_RING_1_2] == 1
# define the objective function
# minimize the sum of the distances between all edges on the cycle (cycle length)
# and the sum of the distances between all edges assigned to those on the cycle
# from their assigned cycle node (total arm length)
self.problem += pulp.lpSum(
self.euclidean_distance(point1, point2) * self.cycle_vars[point1][point2]
for point1 in POINTS_IDX
for point2 in POINTS_IDX[point1 + 1 :]
) + pulp.lpSum(
self.euclidean_distance(point1, point2)
* self.assignment_vars[point1][point2]
for point1 in POINTS_IDX
for point2 in POINTS_IDX
if point2 != point1
)
self.running = True
def delta_edges(self, set_of_points):
"""Defines the set of all undirected edges between all points within the set
and all points outside of the set"""
for point1 in set_of_points:
for point2 in POINTS_IDX:
if point2 not in set_of_points:
yield point1, point2
def euclidean_distance(self, point1_index, point2_index):
"""Euclidean distance between two points"""
# dummy points are always free
if point1_index < REAL_ROOT or point2_index < REAL_ROOT:
return 0
return np.sqrt(
np.sum(
(
self.graph.nodes[point1_index]["pos"]
- self.graph.nodes[point2_index]["pos"]
)
** 2
)
)
def run(self):
start_time = time.time()
last_solution = None
while self.running:
self.problem.solve(pulp.PULP_CBC_CMD(msg=VERBOSE))
self.graph.clear_edges()
cycle_edges = {}
assignments = {}
for point1 in POINTS_IDX:
for point2 in POINTS_IDX[point1 + 1 :]:
if self.cycle_vars[point1][point2].value() == 1:
cycle_edges[point1] = cycle_edges.get(point1, []) + [point2]
cycle_edges[point2] = cycle_edges.get(point2, []) + [point1]
p1 = point1
p2 = point2
# if it is real draw it thick
# if it is not a real edge then draw it lightly
thickness = 5
if p1 == DUMMY_ROOT:
p1 = REAL_ROOT
thickness = 0.5
elif p1 == DUMMY_RING_1_1:
p1 = REAL_RING_1_1
thickness = 0.5
elif p1 == DUMMY_RING_1_2:
p1 = REAL_RING_1_2
thickness = 0.5
if p2 == DUMMY_ROOT:
p2 = REAL_ROOT
thickness = 0.5
elif p2 == DUMMY_RING_1_1:
p2 = REAL_RING_1_1
thickness = 0.5
elif p2 == DUMMY_RING_1_2:
p2 = REAL_RING_1_2
thickness = 0.5
if p1 != p2:
self.graph.add_edge(p1, p2, thickness=thickness)
for point1 in POINTS_IDX:
for point2 in POINTS_IDX:
if self.assignment_vars[point1][point2].value() == 1:
if point1 != point2:
self.graph.add_edge(point1, point2)
assignments[point1] = point2
# find all subcycles including the nodes assigned to them
unvisited_nodes = set(POINTS_IDX)
cycles = []
while cycle_edges:
start = list(cycle_edges.keys())[0]
cycle = [start]
cycle_assignments = []
while True:
unvisited_nodes.discard(cycle[-1])
for assigned_node, assigned_to_node in assignments.items():
if assigned_node == assigned_to_node:
continue
if assigned_to_node == cycle[-1]:
unvisited_nodes.discard(assigned_node)
cycle_assignments.append(assigned_node)
connected_edges = cycle_edges.pop(cycle[-1])
next_point = next(
(p for p in connected_edges if p not in cycle), start
)
cycle.append(next_point)
if next_point == start:
break
cycles.append((cycle, cycle_assignments))
# nodes that arent assigned to a cycle node can be considered on their own
for node in unvisited_nodes:
cycles.append(([], [node, assignments[node]]))
# validate directionality of dummy & real nodes
invalid_paths = []
all_paths_valid = True
for cycle_, _ in cycles:
for start, end in (
((REAL_RING_1_1, DUMMY_RING_1_1), (DUMMY_RING_1_2, REAL_RING_1_2)),
((REAL_RING_1_1, DUMMY_RING_1_1), (DUMMY_ROOT, REAL_ROOT)),
((REAL_RING_1_2, DUMMY_RING_1_2), (DUMMY_ROOT, REAL_ROOT)),
):
if start[0] not in cycle_ or end[0] not in cycle_:
continue
path_is_valid = False
cycle = itertools.cycle(cycle_)
invalid_path = [start[0]]
for point in cycle:
if point == start[0]:
break
search_point = end[0]
invalid_point = end[1]
# dummy and real nodes must alternate throughout the cycle
# D1->R1->...->D2->R2 is valid
# D1->R1->...->R2->D2 is invalid
# R1->D1->...->D2->R2 is invalid
# R1->D1->...->R2->D2 is valid
for point in cycle:
invalid_path.append(point)
if point == start[1]:
invalid_path = [start[1]]
search_point = end[1]
invalid_point = end[0]
elif point == invalid_point:
path_is_valid = False
break
elif point == search_point:
path_is_valid = True
break
if not path_is_valid:
all_paths_valid = False
invalid_paths.append(invalid_path)
# lazily apply invalidation constraint
# TODO: is this constraint optimal/always correct
# it is essentially trying to invalidate the specific group of nodes from
# forming a contiguous path by requiring > 2 delta edges
self.problem += (
pulp.lpSum(
self.cycle_vars[min(point1, point2)][
max(point1, point2)
]
for point1, point2 in self.delta_edges(
set(invalid_path)
)
)
>= 3
)
# if there is only one cycle & it is valid then an optimal solution has been found
# and we can stop
if len(cycles) == 1:
print("Found single cycle solution")
if all_paths_valid:
print("Found valid single cycle solution")
self.running = False
else:
print(f"Invalid due to {invalid_paths=}")
# lazily apply the subcycle elimination constraints on every subcycle found
for cycle, assignments in cycles:
if DUMMY_ROOT in cycle:
continue
subcycle_set = set(cycle) | set(assignments)
for v_i in subcycle_set:
self.problem += pulp.lpSum(
self.cycle_vars[min(point1, point2)][max(point1, point2)]
for point1, point2 in self.delta_edges(subcycle_set)
) >= 2 * pulp.lpSum(
self.assignment_vars[v_i][v_j] for v_j in subcycle_set
)
self.elapsed_time = time.time() - start_time
self.iteration += 1
print(
f"Iteration: {self.iteration}"
f" | {self.elapsed_time:.2f} seconds elapsed"
f" | State: {'Solving' if self.running else 'Solved'}"
)
if self.running and cycles == last_solution:
print("ERROR: No new solution despite new constraints")
self.running = False
last_solution = cycles
print("->".join(map(str, cycles[0][0])))
compute_thread = ComputeThread(graph)
compute_thread.start()
def draw_graph(_):
"""FuncAnimation function to constantly redraw the graph"""
plt.gca().clear()
edge_thickness = [graph[u][v].get("thickness", 1) for u, v in graph.edges]
node_colors = [graph.nodes[node].get("color", "#1f78b4") for node in graph.nodes]
nx.draw(
graph,
pos=nx.get_node_attributes(graph, "pos"),
node_color=node_colors,
width=edge_thickness,
with_labels=True,
)
plt.title(
f"Iteration: {compute_thread.iteration}"
f" | {compute_thread.elapsed_time:.2f} seconds elapsed"
f" | State: {'Solving' if compute_thread.running else 'Solved'}"
)
plt.axis("equal")
func_anim = FuncAnimation(
plt.gcf(), draw_graph, interval=100, repeat=True, cache_frame_data=False
)
plt.show()
compute_thread.running = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment