Skip to content

Instantly share code, notes, and snippets.

@mbillingr
Created January 7, 2019 17:09
Show Gist options
  • Save mbillingr/afc092af26dc72530176a53ef638a696 to your computer and use it in GitHub Desktop.
Save mbillingr/afc092af26dc72530176a53ef638a696 to your computer and use it in GitHub Desktop.
Resolve multiple agents' weighted movement choices to avoid collisions (=assignment problem) or to allow collisions (=something else, seems solvable with simulated annealing...)
from collections import Counter
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from heapq import *
from scipy.optimize import linear_sum_assignment
MOVE_FACTOR = 10
NEG_DIR = {'n': 's', 's': 'n', 'e': 'w', 'w': 'e'}
class Map:
def __init__(self, data):
self.data = data
def new(w, h):
data = np.zeros((h, w))
for s in range(1, 5):
data += ndimage.gaussian_filter(np.random.rand(h, w), sigma=(s, s), order=0, mode='wrap') * s**2
data -= np.min(data)
data *= 1000 / np.max(data)
return Map(data)
def clone(self):
return Map(self.data)
def move(self, pos, d):
x, y = pos
if d == 'n' or d == 1:
return x, (y - 1) % self.data.shape[0]
if d == 's' or d == 3:
return x, (y + 1) % self.data.shape[0]
if d == 'w' or d == 2:
return (x - 1) % self.data.shape[1], y
if d == 'e' or d == 4:
return (x + 1) % self.data.shape[1], y
return pos
def move_costs(self):
return self.data / MOVE_FACTOR
#n = np.roll(self.data, -1, 0)
#s = np.roll(self.data, 1, 0)
#w = np.roll(self.data, -1, 1)
#e = np.roll(self.data, 1, 1)
def cumulative_target_cost(self, targets):
queue = [(0, 0, tuple(target), None, ' ') for target in np.atleast_2d(targets)]
cost_map = np.empty_like(self.data) + np.nan
dist_map = np.empty_like(self.data)
dir_map = np.empty_like(self.data, dtype='<U1')
route = {}
while len(queue) > 0:
c, dist, pos, parent, d = heappop(queue)
if c >= cost_map[pos[::-1]]: continue
cost_map[pos[::-1]] = c
dist_map[pos[::-1]] = dist
dir_map[pos[::-1]] = d
if parent is not None:
route[pos[::-1]] = parent
for d in 'news':
p = self.move(pos, d)
mc = np.floor(self.data[p[::-1]] / MOVE_FACTOR)
heappush(queue, (c+1+mc, dist+1, p, pos, NEG_DIR[d]))
return cost_map, dir_map
def plot(self, ax=None):
if ax is None:
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(self.data, vmin=0, vmax=1000)
def simulated_annealing(state, temperature=None):
#if temperature is None: temperature = iter(np.logspace(2, -2, 20))
if temperature is None: temperature = iter(1 / np.linspace(0, 10, 201)[1:])
e = state.energy()
for i, t in enumerate(temperature):
candidate_states = [state] + state.candidate_moves()
energies = np.array([s.energy() for s in candidate_states])
m = np.argmin(energies)
if energies[m] >= e:
ps = np.exp(-(energies - e) / t)
m = np.random.choice(len(candidate_states), p=ps / np.sum(ps))
state = candidate_states[m]
e = energies[m]
#print(i, energies)
return state
class MovementState:
def __init__(self, target_positions, costs, cargo, assignment=None):
self.positions = target_positions
self.costs = costs
self.cargo = cargo
self.assignment = assignment
if assignment is None:
self.assignment = [np.argmin(cost) for cost in costs]
def child_state(self, assignment):
return MovementState(self.positions, self.costs, self.cargo, assignment)
def energy(self):
final_positions = {}
total_cost = 0
for a, p, c in zip(self.assignment, self.positions, self.costs):
pos = p[a]
cost = c[a]
final_positions[pos] = final_positions.get(pos, 0) + 1
total_cost += cost
if len(final_positions) < len(self.positions):
for a, p, c in zip(self.assignment, self.positions, self.cargo):
if final_positions[p[a]] > 1:
total_cost += 1000 + c
return total_cost
def candidate_moves(self):
children = []
for i in range(len(self.assignment)):
a = self.assignment.copy()
a[i] = (a[i] + 1) % len(self.costs[i])
children.append(self.child_state(a))
a = self.assignment.copy()
a[i] = (a[i] - 1) % len(self.costs[i])
children.append(self.child_state(a))
return children
def print_assignment(self):
for a, p in zip(self.assignment, self.positions):
print(' ->', p[a])
def solve_hungarian(self):
all_pos = set()
for pos in self.positions:
for p in pos:
all_pos.add(p)
posmap = {}
for i, p in enumerate(all_pos):
posmap[i] = p
posmap[p] = i
weights = np.zeros((len(self.positions), len(all_pos))) + 9e9
for r, (pos, costs) in enumerate(zip(self.positions, self.costs)):
for c, (p, cost) in enumerate(zip(pos, costs)):
weights[r, posmap[p]] = cost
rows, cols = linear_sum_assignment(weights)
r = np.argsort(rows)
rows, cols = rows[r], cols[r]
assignment = []
for r, c in zip(rows, cols):
a = self.positions[r].index(posmap[c])
assignment.append(a)
return self.child_state(assignment)
#Hungarian(weights).run()
class Hungarian:
"""http://csclab.murraystate.edu/~bob.pilgrim/445/munkres.html"""
"""maybe just use one of these: https://crates.io/search?q=munkres"""
def __init__(self, weight_matrix):
self.weight_matrix = weight_matrix
self.mask_matrix = np.zeros_like(weight_matrix, dtype=int)
def run(self):
step = self.step_one
while step is not None:
step = step()
def step_one(self):
self.weight_matrix -= np.min(self.weight_matrix, axis=1, keepdims=True)
return self.step_two
def step_two(self):
row_covered = np.zeros(self.weight_matrix.shape[0], dtype=bool)
col_covered = np.zeros(self.weight_matrix.shape[1], dtype=bool)
for r, c in zip(*np.where(self.weight_matrix == 0)):
if not row_covered[r] and not col_covered[c]:
self.weight_matrix[r, c] = 1
row_covered[r] = 1
col_covered[c] = 1
return self.step_three
def step_three(self):
if np.all(np.any(self.mask_matrix == 1, axis=1)):
return self.step_seven
else:
return self.step_four
def step_four(self):
row_covered = np.zeros(self.weight_matrix.shape[0], dtype=bool)
col_covered = np.any(self.mask_matrix == 1, axis=0)
while True:
z = self.find_a_zero(row_covered, col_covered)
if z is None:
return self.step_six
row, col = z
self.mask_matrix[row, col] = 2
c = np.flatnonzero(self.mask_matrix[row, :] == 1)
if len(c) > 0:
col = c[0]
row_covered[row] = True
col_covered[col] = False
else:
return self.step_five
def step_five(self):
raise NotImplementedError()
def step_six(self):
raise NotImplementedError()
def step_seven(self):
raise NotImplementedError()
def find_a_zero(self, row_covered, col_covered):
for r, c in zip(*np.where(self.weight_matrix == 0)):
if not row_covered[r] and not col_covered[c]:
return r, c
return None
def solve_moves(positions, move_costs, cargo, map):
candidate_solution = np.argmin(move_costs, axis=1)
new_positions = map.move(p, d)
counts = Counter(new)
conflicts = [i for i, p in enumerate(new_positions) if counts[p] > 1]
if len(conflicts) == 0:
return candidate_solution
m = Map.new(8, 8)
c, d = m.cumulative_target_cost((4, 4))
mc = m.move_costs()
#state = MovementState([[(0, 0), (1, 0)], [(1, 0), (2, 0)]],
# [[10000, 1], [1, 10000]],
# [100, 200])
#
#res = simulated_annealing(state)
#print(res.assignment)
#print(res.energy())
state = MovementState([[(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)],
[(1, 0), (2, 0), (0, 0), (1, 1), (1, -1)],
[(-1, 0), (0, 0), (-2, 0), (-1, 1), (-1, -1)],
[(0, 1), (1, 1), (-1, 1), (0, 2), (0, 0)],
[(0, -1), (1, -1), (-1, -1), (0, 0), (0, -2)]],
[[1, 1, 100, 1, 1],
[2, 3, 1, 3, 3],
[2, 1, 3, 3, 3],
[2, 3, 3, 3, 1],
[2, 3, 3, 1, 3]],
[0, 1000, 1000, 1000, 1000])
res = simulated_annealing(state)
print(res.assignment)
print(res.energy())
res.print_assignment()
resh = res.solve_hungarian()
print(resh.energy())
resh.print_assignment()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment