Skip to content

Instantly share code, notes, and snippets.

@IshitaTakeshi
Created November 18, 2017 01:15
Show Gist options
  • Save IshitaTakeshi/67e7b5f4035063935fe2bc302f3fbc15 to your computer and use it in GitHub Desktop.
Save IshitaTakeshi/67e7b5f4035063935fe2bc302f3fbc15 to your computer and use it in GitHub Desktop.
import numpy as np
from matplotlib.animation import FuncAnimation
from matplotlib import pyplot as plt
from mcl import Environment, MCL, Agent
def particle_weight(particle_observation, agent_observation):
return 1 if particle_observation == agent_observation else 0
class Drawer(object):
def __init__(self, ax,
range_, doors, initial_location, controls,
uncertaintity, covariance_ratio):
environment = Environment(range_, doors, uncertaintity)
self.mcl = MCL(range_, n_particles,
environment.observe, particle_weight,
covariance_ratio)
self.agent = Agent(range_, initial_location, environment.observe)
self.agent_scatter = ax.scatter([], [])
self.particle_vlines = []
for i in range(n_particles):
vline = ax.axvline(ymin=0.2, ymax=0.8, color="cyan")
self.particle_vlines.append(vline)
ax.set_xlim(environment.range)
ax.set_ylim([-0.2, 1.2])
self.controls = controls
xs = np.linspace(*range_, 200)
doors = [1 if environment.is_door(x) else None for x in xs]
self.door_lines = ax.plot(xs, doors, linewidth=6, color="r")
def init(self):
self.agent_scatter.set_offsets(np.c_[self.agent.location, 0])
for x, vline in zip(self.mcl.particles, self.particle_vlines):
vline.set_xdata(x)
return self.particle_vlines + [self.agent_scatter]
def __call__(self, i):
control = self.controls[i]
noise = np.random.normal(0, 0.3)
self.agent.move(control + noise)
particles = self.mcl.update_particles(control, self.agent.observation())
self.agent_scatter.set_offsets(np.c_[self.agent.location, 0])
for x, vline in zip(particles, self.particle_vlines):
vline.set_xdata(x)
return self.particle_vlines + [self.agent_scatter]
n_particles = 400
range_ = [-4, 24]
doors = [[-2, -1], [3, 5], [8, 12], [18, 22]]
initial_location = 2
uncertaintity = 0.05
covariance_ratio = 0.08
controls = [2, 5, -2, 1, 0, -3, -5, -2, 4, 2, 7, -3, -3, -4]
controls = np.vstack((controls, controls)).flatten()
fig, ax = plt.subplots()
drawer = Drawer(ax, range_, doors, initial_location, controls,
uncertaintity, covariance_ratio)
animation = FuncAnimation(
fig, drawer,
init_func=drawer.init,
frames=np.arange(len(controls)),
interval=1000,
)
animation.save("mcl.mp4", dpi=400)
import numpy as np
class Environment(object):
"""The goal is to estimate the precise location of the agent"""
def __init__(self, range_, doors, uncertaintity):
assert(0 <= uncertaintity < 1)
self.range = range_
self.doors = doors
self.uncertaintity = uncertaintity
def observe(self, location):
observation = self.is_door(location)
if np.random.random() < self.uncertaintity:
# sometimes returns incorrect observation because of uncertaintity
return 1 - observation
return observation
def is_door(self, location):
for begin, end in self.doors:
if begin <= location <= end:
return 1
return 0
class Agent(object):
"""Agent holding the true state"""
def __init__(self, range_, initial_location, observer):
self.location_ = initial_location
self.range = range_
self.observe = observer
@property
def location(self):
return self.location_
def observation(self):
return self.observe(self.location_)
def move(self, control):
"""Update the agent location"""
self.location_ = keep_in_range(self.location_ + control, self.range)
def keep_in_range(value, range_):
"""Keep particles or agent within the map"""
return np.clip(value, *range_)
class MCL(object):
def __init__(self, range_, n_particles, observer, particle_weight,
covariance_ratio=0.2):
self.range = range_
self.observe = observer
self.particles = np.random.uniform(*self.range, n_particles)
self.particle_weight = particle_weight
self.covariance_ratio = covariance_ratio
def motion_update(self, particles, control):
covariance = np.abs(control) * self.covariance_ratio
particles += np.random.normal(control, covariance, len(particles))
return keep_in_range(particles, self.range)
def update_particles(self, control, agent_observation):
particles = self.particles
n_particles = len(particles)
particles = self.motion_update(particles, control)
weights = np.empty(n_particles)
for i, x in enumerate(particles):
weights[i] = self.particle_weight(
self.observe(x), # particle observation
agent_observation
)
if weights.sum() == 0:
weights = np.ones(n_particles)
weights = weights / weights.sum()
particles = np.random.choice(particles, n_particles, p=weights)
self.particles = particles
return particles
import numpy as np
from numpy.testing import assert_array_equal
import unittest
from mcl import Environment, Agent, MCL
class TestEnvironment(unittest.TestCase):
def test_observe(self):
environment = Environment(
range_=[-1, 5],
doors=[[0, 2], [4, 5]],
uncertaintity=0.0
)
self.assertEqual(environment.observe(-1), 0)
self.assertEqual(environment.observe(0), 1)
self.assertEqual(environment.observe(3), 0)
self.assertEqual(environment.observe(5), 1)
class TestAgent(unittest.TestCase):
def test_move(self):
agent = Agent(
range_=[-1, 5],
initial_location=3,
observer=None
)
agent.move(1)
self.assertEqual(agent.location, 4)
# clipped to be kept within the range
agent.move(2)
self.assertEqual(agent.location, 5)
agent.move(-4)
self.assertEqual(agent.location, 1)
# clipped to be kept within the range
agent.move(-4)
self.assertEqual(agent.location, -1)
def test_observation(self):
environment = Environment(
range_=[-1, 5],
doors=[[0, 2], [4, 5]],
uncertaintity=0.0
)
agent = Agent(
range_=[-1, 5],
initial_location=3,
observer=environment.observe
)
agent.move(1) # agent is at 4 where a wall stands
self.assertEqual(agent.observation(), 1)
agent.move(-1) # agent is at 4 where no wall stands
self.assertEqual(agent.observation(), 0)
class TestMCL(unittest.TestCase):
def test_motion_update(self):
environment = Environment(
range_=[-1, 5],
doors=[[0, 2], [4, 5]],
uncertaintity=0.0
)
mcl = MCL(
range_=[-1, 5],
n_particles=0,
observer=environment.observe,
particle_weight=None,
covariance_ratio=0
)
particles = np.array([-1, 0, 1, 2, 3, 4, 5], dtype=np.float64)
particles = mcl.motion_update(particles, -2)
assert_array_equal(particles, [-1, -1, -1, 0, 1, 2, 3])
particles = np.array([-1, 0, 1, 2, 3, 4, 5], dtype=np.float64)
particles = mcl.motion_update(particles, 2)
assert_array_equal(particles, [1, 2, 3, 4, 5, 5, 5])
def test_update_particles(self):
particle_weight = lambda a, b: 1 if a == b else 0
if __name__ == "__main__":
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment