@carlosgmartin
Last active June 20, 2019 06:39
Q learning with multiple simultaneous agents
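
# Tabular Q-learning run in parallel for many gridworld agents, vectorized over the agent axis.
# Even-indexed ("sparse") agents receive -1 per step until they reach the goal; odd-indexed
# ("oracle") agents receive a noisy directional reward whose reliability (true_positive_rate /
# true_negative_rate) is swept from 0.5 to 1 across agents. Average time to goal versus oracle
# reliability is plotted every 200 steps.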
import numpy as np
from time import sleep
from itertools import count
from pdb import set_trace
import matplotlib.pyplot as plt
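
# numerically stable softmax over the last axis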
def softmax(x):
    y = np.exp(x - x.max(-1, keepdims=True))
    return y / y.sum(-1, keepdims=True)
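
# sample one index per row from a batch of categorical distributions with probabilities p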
def categorical(p):
    c = p.cumsum(-1)
    u = np.random.uniform(size=c.shape[0])[:, None]
    return (u < c).argmax(-1)
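
# Boltzmann (softmax) exploration with per-agent inverse temperature `greed`;
# agents with greed == np.inf act greedily on their Q-values instead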
def softmax_policy(q_actions, greed):
    return np.where(greed < np.inf, categorical(softmax(greed[:, None] * q_actions)), q_actions.argmax(-1))

if __name__ == '__main__':
    np.set_printoptions(precision=2, suppress=True)

    size = 12  # side length of the square grid
    agents = 1000  # number of independent gridworld environments run in parallel
    learn_rate = np.full(agents, 1)  # 1 if environment is deterministic
    discount = np.full(agents, 1)
    greed = np.full(agents, 1)  # inverse temperature of the softmax policy
    true_positive_rate = np.linspace(.5, 1, agents)  # oracle reliability, swept across agents
    true_negative_rate = np.linspace(.5, 1, agents)
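    # actions move one cell: down, right, up, left (row index increases downward in the printed grid)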
    actions = np.array([[1, 0], [0, 1], [-1, 0], [0, -1]])
    goal = np.tile(np.random.randint(size, size=2), (agents, 1))  # one goal cell, shared by all agents
    episodes = np.zeros(agents, dtype=int)  # number of times each agent has reached the goal
    obstacles = np.tile(np.random.binomial(1, .2, size=(size, size)).astype(bool), (agents, 1, 1))  # ~20% of cells, shared map
    obstacles[np.arange(agents), goal[:, 0], goal[:, 1]] = False  # goal should not be covered by an obstacle
    state = np.random.randint(size, size=(agents, 2))  # random start cell per agent
    # agents that receive reward on goal vs reward from oracle
    sparse_reward = np.full(agents, False)
    sparse_reward[::2] = True

    # find distance from each state to each goal, accounting for obstacles
    dists = np.empty((agents, size, size), dtype=float)
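    # (repeated relaxation sweeps over the grid, Bellman-Ford style, until the distance map stops changing)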
    for index, (o, g) in enumerate(zip(obstacles, goal)):
        d = np.full(o.shape, np.inf)
        d[g[0], g[1]] = 0
        d_new = np.empty_like(d)
        while True:
            for i in range(o.shape[0]):
                for j in range(o.shape[1]):
                    if o[i, j]:
                        d_new[i, j] = np.inf
                    else:
                        m = d[i, j]
                        if i > 0:
                            m = min(m, d[i-1, j] + 1)
                        if j > 0:
                            m = min(m, d[i, j-1] + 1)
                        if i < d.shape[0] - 1:
                            m = min(m, d[i+1, j] + 1)
                        if j < d.shape[1] - 1:
                            m = min(m, d[i, j+1] + 1)
                        d_new[i, j] = m
            if np.array_equal(d_new, d):
                break
            d = d_new.copy()
        dists[index] = d
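
    # tabular action values: one (size × size × len(actions)) table per agent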
    q = np.zeros((agents, size, size, len(actions)))

    for steps in count():
        if steps % 200 == 0 and steps > 0:  # skip step 0, where episodes is all zero and steps/episodes is undefined
            plt.scatter(true_positive_rate[sparse_reward], steps/episodes[sparse_reward], s=2, c='red', label='sparse')
            plt.scatter(true_positive_rate[~sparse_reward], steps/episodes[~sparse_reward], s=2, c='green', label='oracle')
            plt.xlabel('true positive and true negative rate for oracle')
            plt.ylabel('time to goal')
            plt.minorticks_on()
            plt.legend(loc='best')
            plt.grid(True, which='both', alpha=.2)
            plt.ylim((0, None))
            plt.title('{} × {} grid, {} steps'.format(size, size, steps))
            plt.tight_layout()
            plt.pause(.01)
            plt.clf()
        goal_reached = (state == goal).all(-1)
        episodes += goal_reached
        reset = goal_reached
        # reset all environments after a large number of steps
        # in case some of the agents are stuck and cannot reach the goal
        if steps % 50 == 0:
            reset = np.full(agents, True)
        action = softmax_policy(q[np.arange(agents), state[:, 0], state[:, 1]], greed)
        next_state = np.where(reset[:, None], np.random.randint(size, size=(agents, 2)), (state + actions[action]).clip(0, size - 1))
        # handle obstacle collisions
        collisions = obstacles[np.arange(obstacles.shape[0]), next_state[:, 0], next_state[:, 1]]
        next_state[collisions] = state[collisions]
        # measure whether agent moved toward goal (ignoring obstacles)
        # correct_direction = (actions[action] * (goal - state)).sum(-1) > 0
        # measure whether agent moved closer to the goal (accounting for obstacles)
        correct_direction = dists[np.arange(agents), next_state[:, 0], next_state[:, 1]] < dists[np.arange(agents), state[:, 0], state[:, 1]]
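        # sparse agents: -1 per step, 0 when already at the goal (that step's update is skipped by the reset mask)
        # oracle agents: 0 for a move the oracle judges as closer to the goal, -1 otherwise, with the
        # judgement correct only with probability true_positive_rate / true_negative_rate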
        reward = np.where(
            sparse_reward,
            goal_reached - 1,
            np.where(
                correct_direction,
                np.where(
                    np.random.binomial(1, true_positive_rate),
                    0,
                    -1
                ),
                np.where(
                    np.random.binomial(1, true_negative_rate),
                    -1,
                    0
                )
            )
        )
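        # console diagnostics: a row for a few evenly spaced agents
        # (columns: oracle tp rate, learning rate α, discount γ, greed β, state, action, next state,
        #  reward, and average steps per episode so far)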
        info = 'tp\tα\tγ\tβ\ts\ta\ts\'\tr\tt\n' + '\n'.join(
            '\t'.join([
                '{:.1f}'.format(true_positive_rate[m]),
                '{:.1f}'.format(learn_rate[m]),
                '{:.1f}'.format(discount[m]),
                '{:.1f}'.format(greed[m]),
                str(state[m]),
                '↓→↑←'[action[m]],
                str(next_state[m]),
                '{:.1f}'.format(reward[m]),
                '{:.1f}'.format(steps/episodes[m])
            ])
            for m in np.arange(agents)[::agents//10]
        )
        agent = 0
        grid = '\n'.join(
            ' '.join(
                'A' if np.array_equal([i, j], state[agent]) else
                'G' if np.array_equal([i, j], goal[agent]) else
                '█' if obstacles[agent, i, j] else
                '↓→↑←'[q[agent, i, j].argmax(-1)]
                for j in range(size)
            )
            for i in range(size)
        )
        # policy = '\n'.join(map(' '.join, np.choose(q[agent].argmax(-1), '↓→↑←')))
        state_values = str(q[agent].max(-1))
        print('\033c' + '\n\n'.join([
            str(steps) + ' steps',
            info,
            grid,
            state_values
        ]))
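        # Q-learning update, vectorized over agents; masked to 0 on steps where the environment was reset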
        q[np.arange(agents), state[:, 0], state[:, 1], action] += np.where(
            reset,
            0,
            learn_rate * (reward + discount * q[np.arange(agents), next_state[:, 0], next_state[:, 1]].max(-1) - q[np.arange(agents), state[:, 0], state[:, 1], action])
        )
        state = next_state
        # sleep(2)