Created
January 7, 2019 17:09
-
-
Save mbillingr/afc092af26dc72530176a53ef638a696 to your computer and use it in GitHub Desktop.
Resolve multiple agents' weighted movement choices to avoid collisions (=assignment problem) or to allow collisions (=something else, seems solvable with simulated annealing...)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import Counter | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from scipy import ndimage | |
from heapq import * | |
from scipy.optimize import linear_sum_assignment | |
MOVE_FACTOR = 10 | |
NEG_DIR = {'n': 's', 's': 'n', 'e': 'w', 'w': 'e'} | |
class Map: | |
def __init__(self, data): | |
self.data = data | |
def new(w, h): | |
data = np.zeros((h, w)) | |
for s in range(1, 5): | |
data += ndimage.gaussian_filter(np.random.rand(h, w), sigma=(s, s), order=0, mode='wrap') * s**2 | |
data -= np.min(data) | |
data *= 1000 / np.max(data) | |
return Map(data) | |
def clone(self): | |
return Map(self.data) | |
def move(self, pos, d): | |
x, y = pos | |
if d == 'n' or d == 1: | |
return x, (y - 1) % self.data.shape[0] | |
if d == 's' or d == 3: | |
return x, (y + 1) % self.data.shape[0] | |
if d == 'w' or d == 2: | |
return (x - 1) % self.data.shape[1], y | |
if d == 'e' or d == 4: | |
return (x + 1) % self.data.shape[1], y | |
return pos | |
def move_costs(self): | |
return self.data / MOVE_FACTOR | |
#n = np.roll(self.data, -1, 0) | |
#s = np.roll(self.data, 1, 0) | |
#w = np.roll(self.data, -1, 1) | |
#e = np.roll(self.data, 1, 1) | |
def cumulative_target_cost(self, targets): | |
queue = [(0, 0, tuple(target), None, ' ') for target in np.atleast_2d(targets)] | |
cost_map = np.empty_like(self.data) + np.nan | |
dist_map = np.empty_like(self.data) | |
dir_map = np.empty_like(self.data, dtype='<U1') | |
route = {} | |
while len(queue) > 0: | |
c, dist, pos, parent, d = heappop(queue) | |
if c >= cost_map[pos[::-1]]: continue | |
cost_map[pos[::-1]] = c | |
dist_map[pos[::-1]] = dist | |
dir_map[pos[::-1]] = d | |
if parent is not None: | |
route[pos[::-1]] = parent | |
for d in 'news': | |
p = self.move(pos, d) | |
mc = np.floor(self.data[p[::-1]] / MOVE_FACTOR) | |
heappush(queue, (c+1+mc, dist+1, p, pos, NEG_DIR[d])) | |
return cost_map, dir_map | |
def plot(self, ax=None): | |
if ax is None: | |
fig = plt.figure() | |
ax = fig.add_subplot(1, 1, 1) | |
plt.imshow(self.data, vmin=0, vmax=1000) | |
def simulated_annealing(state, temperature=None): | |
#if temperature is None: temperature = iter(np.logspace(2, -2, 20)) | |
if temperature is None: temperature = iter(1 / np.linspace(0, 10, 201)[1:]) | |
e = state.energy() | |
for i, t in enumerate(temperature): | |
candidate_states = [state] + state.candidate_moves() | |
energies = np.array([s.energy() for s in candidate_states]) | |
m = np.argmin(energies) | |
if energies[m] >= e: | |
ps = np.exp(-(energies - e) / t) | |
m = np.random.choice(len(candidate_states), p=ps / np.sum(ps)) | |
state = candidate_states[m] | |
e = energies[m] | |
#print(i, energies) | |
return state | |
class MovementState: | |
def __init__(self, target_positions, costs, cargo, assignment=None): | |
self.positions = target_positions | |
self.costs = costs | |
self.cargo = cargo | |
self.assignment = assignment | |
if assignment is None: | |
self.assignment = [np.argmin(cost) for cost in costs] | |
def child_state(self, assignment): | |
return MovementState(self.positions, self.costs, self.cargo, assignment) | |
def energy(self): | |
final_positions = {} | |
total_cost = 0 | |
for a, p, c in zip(self.assignment, self.positions, self.costs): | |
pos = p[a] | |
cost = c[a] | |
final_positions[pos] = final_positions.get(pos, 0) + 1 | |
total_cost += cost | |
if len(final_positions) < len(self.positions): | |
for a, p, c in zip(self.assignment, self.positions, self.cargo): | |
if final_positions[p[a]] > 1: | |
total_cost += 1000 + c | |
return total_cost | |
def candidate_moves(self): | |
children = [] | |
for i in range(len(self.assignment)): | |
a = self.assignment.copy() | |
a[i] = (a[i] + 1) % len(self.costs[i]) | |
children.append(self.child_state(a)) | |
a = self.assignment.copy() | |
a[i] = (a[i] - 1) % len(self.costs[i]) | |
children.append(self.child_state(a)) | |
return children | |
def print_assignment(self): | |
for a, p in zip(self.assignment, self.positions): | |
print(' ->', p[a]) | |
def solve_hungarian(self): | |
all_pos = set() | |
for pos in self.positions: | |
for p in pos: | |
all_pos.add(p) | |
posmap = {} | |
for i, p in enumerate(all_pos): | |
posmap[i] = p | |
posmap[p] = i | |
weights = np.zeros((len(self.positions), len(all_pos))) + 9e9 | |
for r, (pos, costs) in enumerate(zip(self.positions, self.costs)): | |
for c, (p, cost) in enumerate(zip(pos, costs)): | |
weights[r, posmap[p]] = cost | |
rows, cols = linear_sum_assignment(weights) | |
r = np.argsort(rows) | |
rows, cols = rows[r], cols[r] | |
assignment = [] | |
for r, c in zip(rows, cols): | |
a = self.positions[r].index(posmap[c]) | |
assignment.append(a) | |
return self.child_state(assignment) | |
#Hungarian(weights).run() | |
class Hungarian: | |
"""http://csclab.murraystate.edu/~bob.pilgrim/445/munkres.html""" | |
"""maybe just use one of these: https://crates.io/search?q=munkres""" | |
def __init__(self, weight_matrix): | |
self.weight_matrix = weight_matrix | |
self.mask_matrix = np.zeros_like(weight_matrix, dtype=int) | |
def run(self): | |
step = self.step_one | |
while step is not None: | |
step = step() | |
def step_one(self): | |
self.weight_matrix -= np.min(self.weight_matrix, axis=1, keepdims=True) | |
return self.step_two | |
def step_two(self): | |
row_covered = np.zeros(self.weight_matrix.shape[0], dtype=bool) | |
col_covered = np.zeros(self.weight_matrix.shape[1], dtype=bool) | |
for r, c in zip(*np.where(self.weight_matrix == 0)): | |
if not row_covered[r] and not col_covered[c]: | |
self.weight_matrix[r, c] = 1 | |
row_covered[r] = 1 | |
col_covered[c] = 1 | |
return self.step_three | |
def step_three(self): | |
if np.all(np.any(self.mask_matrix == 1, axis=1)): | |
return self.step_seven | |
else: | |
return self.step_four | |
def step_four(self): | |
row_covered = np.zeros(self.weight_matrix.shape[0], dtype=bool) | |
col_covered = np.any(self.mask_matrix == 1, axis=0) | |
while True: | |
z = self.find_a_zero(row_covered, col_covered) | |
if z is None: | |
return self.step_six | |
row, col = z | |
self.mask_matrix[row, col] = 2 | |
c = np.flatnonzero(self.mask_matrix[row, :] == 1) | |
if len(c) > 0: | |
col = c[0] | |
row_covered[row] = True | |
col_covered[col] = False | |
else: | |
return self.step_five | |
def step_five(self): | |
raise NotImplementedError() | |
def step_six(self): | |
raise NotImplementedError() | |
def step_seven(self): | |
raise NotImplementedError() | |
def find_a_zero(self, row_covered, col_covered): | |
for r, c in zip(*np.where(self.weight_matrix == 0)): | |
if not row_covered[r] and not col_covered[c]: | |
return r, c | |
return None | |
def solve_moves(positions, move_costs, cargo, map): | |
candidate_solution = np.argmin(move_costs, axis=1) | |
new_positions = map.move(p, d) | |
counts = Counter(new) | |
conflicts = [i for i, p in enumerate(new_positions) if counts[p] > 1] | |
if len(conflicts) == 0: | |
return candidate_solution | |
m = Map.new(8, 8) | |
c, d = m.cumulative_target_cost((4, 4)) | |
mc = m.move_costs() | |
#state = MovementState([[(0, 0), (1, 0)], [(1, 0), (2, 0)]], | |
# [[10000, 1], [1, 10000]], | |
# [100, 200]) | |
# | |
#res = simulated_annealing(state) | |
#print(res.assignment) | |
#print(res.energy()) | |
state = MovementState([[(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)], | |
[(1, 0), (2, 0), (0, 0), (1, 1), (1, -1)], | |
[(-1, 0), (0, 0), (-2, 0), (-1, 1), (-1, -1)], | |
[(0, 1), (1, 1), (-1, 1), (0, 2), (0, 0)], | |
[(0, -1), (1, -1), (-1, -1), (0, 0), (0, -2)]], | |
[[1, 1, 100, 1, 1], | |
[2, 3, 1, 3, 3], | |
[2, 1, 3, 3, 3], | |
[2, 3, 3, 3, 1], | |
[2, 3, 3, 1, 3]], | |
[0, 1000, 1000, 1000, 1000]) | |
res = simulated_annealing(state) | |
print(res.assignment) | |
print(res.energy()) | |
res.print_assignment() | |
resh = res.solve_hungarian() | |
print(resh.energy()) | |
resh.print_assignment() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment