Created
November 18, 2017 01:15
-
-
Save IshitaTakeshi/9a0558d868a2662811b5eae85bc91fcd to your computer and use it in GitHub Desktop.
1D Monte Carlo Localization https://en.wikipedia.org/wiki/Monte_Carlo_localization
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from matplotlib.animation import FuncAnimation | |
from matplotlib import pyplot as plt | |
from mcl import Environment, MCL, Agent | |
def particle_weight(particle_observation, agent_observation): | |
return 1 if particle_observation == agent_observation else 0 | |
class Drawer(object): | |
def __init__(self, ax, | |
range_, doors, initial_location, controls, | |
uncertaintity, covariance_ratio): | |
environment = Environment(range_, doors, uncertaintity) | |
self.mcl = MCL(range_, n_particles, | |
environment.observe, particle_weight, | |
covariance_ratio) | |
self.agent = Agent(range_, initial_location, environment.observe) | |
self.agent_scatter = ax.scatter([], []) | |
self.particle_vlines = [] | |
for i in range(n_particles): | |
vline = ax.axvline(ymin=0.2, ymax=0.8, color="cyan") | |
self.particle_vlines.append(vline) | |
ax.set_xlim(environment.range) | |
ax.set_ylim([-0.2, 1.2]) | |
self.controls = controls | |
xs = np.linspace(*range_, 200) | |
doors = [1 if environment.is_door(x) else None for x in xs] | |
self.door_lines = ax.plot(xs, doors, linewidth=6, color="r") | |
def init(self): | |
self.agent_scatter.set_offsets(np.c_[self.agent.location, 0]) | |
for x, vline in zip(self.mcl.particles, self.particle_vlines): | |
vline.set_xdata(x) | |
return self.particle_vlines + [self.agent_scatter] | |
def __call__(self, i): | |
control = self.controls[i] | |
noise = np.random.normal(0, 0.3) | |
self.agent.move(control + noise) | |
particles = self.mcl.update_particles(control, self.agent.observation()) | |
self.agent_scatter.set_offsets(np.c_[self.agent.location, 0]) | |
for x, vline in zip(particles, self.particle_vlines): | |
vline.set_xdata(x) | |
return self.particle_vlines + [self.agent_scatter] | |
n_particles = 400 | |
range_ = [-4, 24] | |
doors = [[-2, -1], [3, 5], [8, 12], [18, 22]] | |
initial_location = 2 | |
uncertaintity = 0.05 | |
covariance_ratio = 0.08 | |
controls = [2, 5, -2, 1, 0, -3, -5, -2, 4, 2, 7, -3, -3, -4] | |
controls = np.vstack((controls, controls)).flatten() | |
fig, ax = plt.subplots() | |
drawer = Drawer(ax, range_, doors, initial_location, controls, | |
uncertaintity, covariance_ratio) | |
animation = FuncAnimation( | |
fig, drawer, | |
init_func=drawer.init, | |
frames=np.arange(len(controls)), | |
interval=1000, | |
) | |
animation.save("mcl.mp4", dpi=400) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
class Environment(object): | |
"""The goal is to estimate the precise location of the agent""" | |
def __init__(self, range_, doors, uncertaintity): | |
assert(0 <= uncertaintity < 1) | |
self.range = range_ | |
self.doors = doors | |
self.uncertaintity = uncertaintity | |
def observe(self, location): | |
observation = self.is_door(location) | |
if np.random.random() < self.uncertaintity: | |
# sometimes returns incorrect observation because of uncertaintity | |
return 1 - observation | |
return observation | |
def is_door(self, location): | |
for begin, end in self.doors: | |
if begin <= location <= end: | |
return 1 | |
return 0 | |
class Agent(object): | |
"""Agent holding the true state""" | |
def __init__(self, range_, initial_location, observer): | |
self.location_ = initial_location | |
self.range = range_ | |
self.observe = observer | |
@property | |
def location(self): | |
return self.location_ | |
def observation(self): | |
return self.observe(self.location_) | |
def move(self, control): | |
"""Update the agent location""" | |
self.location_ = keep_in_range(self.location_ + control, self.range) | |
def keep_in_range(value, range_): | |
"""Keep particles or agent within the map""" | |
return np.clip(value, *range_) | |
class MCL(object): | |
def __init__(self, range_, n_particles, observer, particle_weight, | |
covariance_ratio=0.2): | |
self.range = range_ | |
self.observe = observer | |
self.particles = np.random.uniform(*self.range, n_particles) | |
self.particle_weight = particle_weight | |
self.covariance_ratio = covariance_ratio | |
def motion_update(self, particles, control): | |
covariance = np.abs(control) * self.covariance_ratio | |
particles += np.random.normal(control, covariance, len(particles)) | |
return keep_in_range(particles, self.range) | |
def update_particles(self, control, agent_observation): | |
particles = self.particles | |
n_particles = len(particles) | |
particles = self.motion_update(particles, control) | |
weights = np.empty(n_particles) | |
for i, x in enumerate(particles): | |
weights[i] = self.particle_weight( | |
self.observe(x), # particle observation | |
agent_observation | |
) | |
if weights.sum() == 0: | |
weights = np.ones(n_particles) | |
weights = weights / weights.sum() | |
particles = np.random.choice(particles, n_particles, p=weights) | |
self.particles = particles | |
return particles |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from numpy.testing import assert_array_equal | |
import unittest | |
from mcl import Environment, Agent, MCL | |
class TestEnvironment(unittest.TestCase): | |
def test_observe(self): | |
environment = Environment( | |
range_=[-1, 5], | |
doors=[[0, 2], [4, 5]], | |
uncertaintity=0.0 | |
) | |
self.assertEqual(environment.observe(-1), 0) | |
self.assertEqual(environment.observe(0), 1) | |
self.assertEqual(environment.observe(3), 0) | |
self.assertEqual(environment.observe(5), 1) | |
class TestAgent(unittest.TestCase): | |
def test_move(self): | |
agent = Agent( | |
range_=[-1, 5], | |
initial_location=3, | |
observer=None | |
) | |
agent.move(1) | |
self.assertEqual(agent.location, 4) | |
# clipped to be kept within the range | |
agent.move(2) | |
self.assertEqual(agent.location, 5) | |
agent.move(-4) | |
self.assertEqual(agent.location, 1) | |
# clipped to be kept within the range | |
agent.move(-4) | |
self.assertEqual(agent.location, -1) | |
def test_observation(self): | |
environment = Environment( | |
range_=[-1, 5], | |
doors=[[0, 2], [4, 5]], | |
uncertaintity=0.0 | |
) | |
agent = Agent( | |
range_=[-1, 5], | |
initial_location=3, | |
observer=environment.observe | |
) | |
agent.move(1) # agent is at 4 where a wall stands | |
self.assertEqual(agent.observation(), 1) | |
agent.move(-1) # agent is at 4 where no wall stands | |
self.assertEqual(agent.observation(), 0) | |
class TestMCL(unittest.TestCase): | |
def test_motion_update(self): | |
environment = Environment( | |
range_=[-1, 5], | |
doors=[[0, 2], [4, 5]], | |
uncertaintity=0.0 | |
) | |
mcl = MCL( | |
range_=[-1, 5], | |
n_particles=0, | |
observer=environment.observe, | |
particle_weight=None, | |
covariance_ratio=0 | |
) | |
particles = np.array([-1, 0, 1, 2, 3, 4, 5], dtype=np.float64) | |
particles = mcl.motion_update(particles, -2) | |
assert_array_equal(particles, [-1, -1, -1, 0, 1, 2, 3]) | |
particles = np.array([-1, 0, 1, 2, 3, 4, 5], dtype=np.float64) | |
particles = mcl.motion_update(particles, 2) | |
assert_array_equal(particles, [1, 2, 3, 4, 5, 5, 5]) | |
def test_update_particles(self): | |
particle_weight = lambda a, b: 1 if a == b else 0 | |
if __name__ == "__main__": | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment