@GastonMazzei
Created February 11, 2022 23:50
Code for the Q-Learning presentation at Paris-Saclay @ Dr. Abdel Lisser, Game theory
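# The script below simulates a 4x5 grid "soccer" game between an agent and an
# opponent. Depending on the experiment TYPE, the agent is trained with tabular
# Q-learning (one Q table per action, duplicated in the "ballAware" variants to
# also encode ball possession) while the opponent plays randomly or follows a
# scripted ("ad-hoc") policy. Soccer.learn() applies the standard update
#     Q(s,a) <- (1 - alpha) * Q(s,a) + alpha * (r + gamma * max_a' Q(s,a'))
# with gamma = self.factor and, as the inline comments note, the cumulative
# agent_cost used as r in place of the per-step reward.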
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
fig = plt.figure()
im = Image.open('ball.jpg')
L = 15
im = np.asarray(im.resize((im.size[0]//L, im.size[1]//L))).astype(float) / 255  # np.float is removed in recent NumPy
CENTER = fig.bbox.xmax//2, fig.bbox.ymax//4
DX,DY = -190, 280  # fig.bbox.xmax//10, fig.bbox.ymax//5
BALLX = {
    0: CENTER[0] + 0 * DX,
    1: CENTER[0] + 1 * DX,
    2: CENTER[0] + 2 * DX,
    3: CENTER[0] + 3 * DX,
}
BALLY = {
    0: CENTER[1] + 0 * DY,
    1: CENTER[1] + 1 * DY,
    2: CENTER[1] + 2 * DY,
    3: CENTER[1] + 3 * DY,
    4: CENTER[1] + 4 * DY,
}
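# BALLX/BALLY map grid coordinates (row, column) to figure-pixel offsets used by
# Soccer.display() to overlay the ball image on the rendered field via fig.figimage().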
class Soccer:
    def __init__(self) -> None:
        return
    def build(self) -> None:
        """
        Build the field and the players; kept separate from
        __init__ so that a match can be (re)built explicitly.
        """
        self.counter = 1
        self.build_field()
        self.build_players()
        return
    def build_field(self) -> None:
        self.LR = 0
        self.factor = 0.9995  # TIME COST
        self.current_cost = self.factor
        self.X = 4
        self.Y = 5
        self.field = np.zeros((self.X,self.Y))
        self.agent_initial_position = (1,1)
        self.opponent_initial_position = (2,3)
        self.agent_goal = ((1,0),(2,0))
        self.opponent_goal = ((1,4),(2,4))
        self.agent_goal_value = -1
        self.opponent_goal_value = 1
        self.history = [[],[],{}]
        self.laps = 0
        self.DRAW_LIMIT = 40
        for x in self.agent_goal:
            self.field[x[0],x[1]] = self.agent_goal_value
        for x in self.opponent_goal:
            self.field[x[0],x[1]] = self.opponent_goal_value
        return None
    def update_field(self) -> None:
        self.field = np.zeros((self.X,self.Y))
        for x in self.agent_goal:
            self.field[x[0],x[1]] = self.agent_goal_value
        for x in self.opponent_goal:
            self.field[x[0],x[1]] = self.opponent_goal_value
        self.field[self.agent[0],self.agent[1]] = self.agent_value
        self.field[self.opponent[0],self.opponent[1]] = self.opponent_value
    def reset_initial_positions(self, has_agent=True, soft=False) -> None:
        if soft:
            if has_agent:
                self.future_agent = [self.agent_initial_position[0],self.agent_initial_position[1],True]
                self.future_opponent = [self.opponent_initial_position[0],self.opponent_initial_position[1],False]
            else:
                self.future_agent = [self.agent_initial_position[0],self.agent_initial_position[1],False]
                self.future_opponent = [self.opponent_initial_position[0],self.opponent_initial_position[1],True]
            self.opponent_local_reward = self.LR
            self.agent_local_reward = self.LR
            return
        if has_agent:
            self.agent = [self.agent_initial_position[0],self.agent_initial_position[1],True]
            self.opponent = [self.opponent_initial_position[0],self.opponent_initial_position[1],False]
        else:
            self.agent = [self.agent_initial_position[0],self.agent_initial_position[1],False]
            self.opponent = [self.opponent_initial_position[0],self.opponent_initial_position[1],True]
        self.opponent_local_reward = self.LR
        self.agent_local_reward = self.LR
        return
    def build_players(self) -> None:
        # Actions
        self.actions = ["E","W","N","S"]
        self.agent_action = "E"
        self.opponent_action = "E"
        # Rewards/Costs
        self.agent_cost = 0
        self.opponent_cost = 0
        self.agent_local_reward = self.LR
        self.opponent_local_reward = self.LR
        self.history[2]["wins"] = [0]
        self.history[2]["losses"] = [0]
        self.history[2]["draws"] = [0]
        # Position and ball possession
        self.agent = [self.agent_initial_position[0],self.agent_initial_position[1],True]
        self.ball = [self.agent[0], self.agent[1]]
        self.opponent = [self.opponent_initial_position[0],self.opponent_initial_position[1],False]
        # Rendering values
        self.agent_value = 2
        self.opponent_value = -2
        self.field[self.agent[0],self.agent[1]] = self.agent_value
        self.field[self.opponent[0],self.opponent[1]] = self.opponent_value
        # Q-learning
        self.alpha = 1
        self.alpha_diminishment_factor = 1  # 0.999999999999 # ALPHA DIMINISHER
        self.history[2]["std_Q"] = []
        self.deterministic = False
        self.Qagent = np.asarray([np.random.rand(*self.field.shape)*0.1 for _ in self.actions] + [np.random.rand(*self.field.shape)*0.01 for _ in self.actions])
        self.agent_policies = [np.random.choice(self.actions) for _ in range(self.X * self.Y * 2)]
    def apply_environment(self) -> None:
        """
        Is a movement accepted or not?
        """
        self.laps += 1
        self.future_agent = self.agent.copy()
        self.future_opponent = self.opponent.copy()
        # Do we fall out of the grid?
        answer_agent,new_pos_agent = check_movement(self.agent_action, self.agent[:2], (self.X,self.Y))
        answer_opponent,new_pos_opponent = check_movement(self.opponent_action, self.opponent[:2], (self.X,self.Y))
        # Do we bump into the other player?
        if answer_agent:
            if answer_opponent:
                if tuple(new_pos_agent) != tuple(new_pos_opponent):
                    self.future_agent[:2] = new_pos_agent.copy()
            elif tuple(new_pos_agent) != tuple(self.opponent[:2]):
                self.future_agent[:2] = new_pos_agent.copy()
        if answer_opponent:
            if answer_agent:
                if tuple(new_pos_opponent) != tuple(new_pos_agent):
                    self.future_opponent[:2] = new_pos_opponent.copy()
            elif tuple(new_pos_opponent) != tuple(self.agent[:2]):
                self.future_opponent[:2] = new_pos_opponent.copy()
        # Does the ball change possession?
        self.ball_posession_constraint()
    def display(self, save=False, name=''):
        global im
        fig = plt.figure(figsize=(6.4, 4.8), dpi=300)
        self.update_field()
        plt.imshow(self.field)
        plt.title(f'Agent Reward: {round(self.agent_cost,3)} | Opponent Reward: {round(self.opponent_cost,3)} | iteration: {self.counter}')
        if self.agent[2]:
            fig.figimage(im, BALLY[self.agent[1]], BALLX[self.agent[0]])
        else:
            fig.figimage(im, BALLY[self.opponent[1]], BALLX[self.opponent[0]])
        if save:
            if name:
                fig.savefig(name)
            else:
                fig.savefig('result.png')
        else:
            plt.show()
        plt.close('all')
    def time_evolver(self) -> None:
        self.current_cost *= self.factor
        self.counter += 1
        self.opponent = self.future_opponent.copy()
        self.agent = self.future_agent.copy()
        if self.laps == self.DRAW_LIMIT:
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 1]
            self.reset_initial_positions(np.random.choice([True,False]))
            self.laps = 0
            return
        return
    def record_reward(self) -> None:
        self.history[0] += [self.agent_cost]
        self.history[1] += [self.opponent_cost]
        return
    def update_reward(self) -> None:
        reward_agent = self.LR
        reward_opponent = self.LR
        if self.future_agent[2] and tuple(self.future_agent[:2]) in self.opponent_goal:
            reward_agent += self.current_cost
            reward_opponent -= self.current_cost
            self.opponent_local_reward = -self.current_cost
            self.agent_local_reward = self.current_cost
            self.reset_initial_positions(has_agent=False, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 1]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.agent_goal:
            reward_opponent += self.current_cost
            reward_agent -= self.current_cost
            self.opponent_local_reward = self.current_cost
            self.agent_local_reward = -self.current_cost
            self.reset_initial_positions(has_agent=True, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 1]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_agent[2] and tuple(self.future_agent[:2]) in self.agent_goal:
            reward_agent -= self.current_cost
            reward_opponent += self.current_cost
            self.opponent_local_reward = self.current_cost
            self.agent_local_reward = -self.current_cost
            self.reset_initial_positions(has_agent=True, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 1]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.opponent_goal:
            reward_agent += self.current_cost
            reward_opponent -= self.current_cost
            self.opponent_local_reward = -self.current_cost
            self.agent_local_reward = self.current_cost
            self.reset_initial_positions(has_agent=False, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 1]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        else:
            pass
            #self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            #self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            #self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        self.agent_cost += reward_agent
        self.opponent_cost += reward_opponent
    def ball_posession_constraint(self) -> None:
        """
        If the players are directly in front of each other, ball possession changes.
        """
        # If the carrier is already inside a goal it is a goal; it cannot be prevented
        if ((self.future_agent[2] and tuple(self.future_agent[:2]) in self.opponent_goal) or
                (self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.agent_goal)):
            return
        if abs(self.future_agent[0]-self.future_opponent[0]) + abs(self.future_agent[1] - self.future_opponent[1]) == 1:
            self.future_agent[2] = not self.future_agent[2]
            self.future_opponent[2] = not self.future_opponent[2]
        return
    def generate_random_actions(self, T="single") -> None:
        if T=="single":  # Agent: random | Opponent: still
            self.agent_action = np.random.choice(self.actions)
        elif T=="both":  # Agent: random | Opponent: random
            self.opponent_action = np.random.choice(self.actions)
            self.agent_action = np.random.choice(self.actions)
        elif T=="singleVSsmart":  # Agent: smart ad-hoc (w/ randomness) | Opponent: random
            self.agent_action = adhoc_random_opponent(self.agent[:2],self.agent[2], self.Y)
            self.opponent_action = np.random.choice(self.actions)
        elif T=="singleVSQlearning" or T=="singleVSballAwareQlearning":  # Agent: Q-learning | Opponent: random
            if T=="singleVSballAwareQlearning":
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            else:
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1]]
            self.opponent_action = np.random.choice(self.actions)
        elif T=="smartVSQlearning" or T=="smartVSballAwareQlearning":  # Agent: Q-learning | Opponent: smart (deterministic)
            if T=="smartVSballAwareQlearning":
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            else:
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1]]
            self.opponent_action = adhoc_deterministic_opponent(self.opponent[:2],self.opponent[2], self.Y)
        elif T=="noisysmartVSballAwareQlearning":  # Agent: Q-learning | Opponent: smart (w/ randomness)
            self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            self.opponent_action = adhoc_random_opponent(self.opponent[:2],self.opponent[2], self.Y)
        return
    def learn(self, T="single", verbose=False) -> None:
        if T=="singleVSQlearning" or T=="smartVSQlearning":
            if verbose: print(f'Action was: {self.agent_policies[self.agent[0] * self.agent[1]]}')
            maxQ = max([self.Qagent[i][self.agent[0],self.agent[1]] for i in range(len(self.actions))])
            self.Qagent[self.actions.index(self.agent_action)][self.agent[0],self.agent[1]] = (
                (1-self.alpha) * self.Qagent[self.actions.index(self.agent_action)][self.agent[0],self.agent[1]] +
                self.alpha * ( self.agent_cost + self.factor * maxQ )  # agent_cost instead of agent_local_reward
            )
            localActionCost = [self.Qagent[i][self.agent[0],self.agent[1]] for i in range(len(self.actions))]
            self.history[2]["std_Q"] += [np.diff(np.sort(localActionCost)[-2:])[0]]
            if self.history[2]["std_Q"][-1] == 0:
                maxQ = max([self.Qagent[i][self.agent[0],self.agent[1]] for i in range(len(self.actions))])
                aux = [self.actions[i] for i,x in enumerate(localActionCost) if x==maxQ]
                self.agent_policies[self.agent[0] * self.agent[1]] = np.random.choice(aux)
            else:
                self.agent_policies[self.agent[0] * self.agent[1]] = self.actions[np.argmax(localActionCost)]
            self.alpha *= self.alpha_diminishment_factor
        if T=="singleVSballAwareQlearning" or T=="smartVSballAwareQlearning" or T=="noisysmartVSballAwareQlearning":
            maxQ = max([self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0],self.agent[1]] for i in range(len(self.actions))])
            self.Qagent[self.actions.index(self.agent_action) + int(self.agent[2]) * len(self.actions)][self.agent[0],self.agent[1]] = (
                (1-self.alpha) * self.Qagent[self.actions.index(self.agent_action) + int(self.agent[2]) * len(self.actions)][self.agent[0],self.agent[1]] +
                self.alpha * ( self.agent_cost + self.factor * maxQ )  # agent_cost instead of agent_local_reward
            )
            localActionCost = [self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0],self.agent[1]] for i in range(len(self.actions))]
            self.history[2]["std_Q"] += [np.diff(np.sort(localActionCost)[-2:])[0]]
            if self.history[2]["std_Q"][-1] == 0:
                maxQ = max([self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0],self.agent[1]] for i in range(len(self.actions))])
                aux = [self.actions[i] for i,x in enumerate(localActionCost) if x==maxQ]
                #print(f'maxQ is {maxQ} and aux is {aux} and localactioncost is {localActionCost}')
                self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X*self.Y] = np.random.choice(aux)
            else:
                self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X*self.Y] = self.actions[np.argmax(localActionCost)]
            #print(f'Action is: {self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X*self.Y]}\n\n')
            self.alpha *= self.alpha_diminishment_factor
        return
def adhoc_random_opponent(XY, ballPossession, Lim) -> str:
    if np.random.rand() < 0.05 or (XY[1]==0 or XY[1]==Lim):
        if np.random.rand() < 0.5:
            return "E"
        else:
            return "W"
    if ballPossession:
        return "N"
    else:
        return "S"
def adhoc_deterministic_opponent(XY, ballPossession, Lim) -> str:
    """
    The opponent moves towards the left of the screen if it has the ball,
    otherwise towards the right.
    """
    if not ballPossession:
        return "N"
    return "S"
def can_move_down(pos, XY):
    """
    Decide if the movement is allowed
    """
    Y = -1
    if pos[1] + Y >= XY[1] or pos[1] + Y < 0:
        return False
    return True
def can_move_up(pos, XY):
    """
    Decide if the movement is allowed
    """
    Y = 1
    if pos[1] + Y >= XY[1] or pos[1] + Y < 0:
        return False
    return True
def can_move_right(pos, XY):
    """
    Decide if the movement is allowed
    """
    X = 1
    if pos[0] + X >= XY[0] or pos[0] + X < 0:
        return False
    return True
def can_move_left(pos, XY):
    """
    Decide if the movement is allowed
    """
    X = -1
    if pos[0] + X >= XY[0] or pos[0] + X < 0:
        return False
    return True
def check_movement(action, pos, XY):
    if action == "E":
        return can_move_right(pos,XY), [pos[0]+1,pos[1]]
    elif action == "W":
        return can_move_left(pos,XY), [pos[0]-1,pos[1]]
    elif action == "N":
        return can_move_up(pos,XY), [pos[0],pos[1]+1]
    else:
        return can_move_down(pos,XY), [pos[0],pos[1]-1]
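# Experiment driver: build a match, pick a TYPE, then iterate
# generate_random_actions -> apply_environment -> update_reward -> record_reward
# -> learn -> time_evolver for LIMIT steps, optionally saving frames for a video.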
MATCH = Soccer()
MATCH.build()
LIMIT = 100000
# Experiment type: index into the list below (here [5] -> "smartVSballAwareQlearning")
TYPE = ["single", "both", "singleVSsmart", "singleVSQlearning", "smartVSQlearning", "smartVSballAwareQlearning", "singleVSballAwareQlearning",
        "noisysmartVSballAwareQlearning"][5]
counter = 0
VIDEO = [False,True][0]  #V
VIDEO_ONLY_AFTER = [False,True][0]  #V
FRAMES = 15
AFTER = LIMIT - 2 * FRAMES
DISPLAY = [False,True][0]  #D
if VIDEO:  #V
    import os  #V
    os.system('rm soccer*.png')  #V
if VIDEO:  #V
    if not VIDEO_ONLY_AFTER:
        MATCH.display(True, 'soccer'+str(counter).zfill(3)+'.png')  #V
    counter += 1  #V
elif DISPLAY:
    MATCH.display()  #D
    counter += 1  #D
while True:
    if counter==AFTER and VIDEO_ONLY_AFTER: MATCH.reset_initial_positions(True)
    if counter==AFTER+FRAMES and VIDEO_ONLY_AFTER: MATCH.reset_initial_positions(False)
    MATCH.generate_random_actions(T=TYPE)
    MATCH.apply_environment()
    MATCH.update_reward()
    MATCH.record_reward()
    VERB = counter >= AFTER
    MATCH.learn(T=TYPE, verbose=VERB)
    MATCH.time_evolver()
    # Display Module
    if VIDEO:  #V
        if not VIDEO_ONLY_AFTER:
            MATCH.display(True, 'soccer'+str(counter).zfill(3)+'.png')  #V
        elif counter >= AFTER:
            MATCH.display(True, 'soccer'+str(counter-AFTER).zfill(3)+'.png')  #V
    elif DISPLAY:  #D
        MATCH.display()  #D
    counter += 1
    if counter == LIMIT:
        break
if (not VIDEO and not DISPLAY) or VIDEO_ONLY_AFTER:
    fig, ax = plt.subplots(figsize=(15,5))
    ax.plot(range(1,LIMIT+1),MATCH.history[0],c='r',label='Reward Agent (Q-learning)', lw=5, alpha=0.8)
    ax.plot(range(1,LIMIT+1),MATCH.history[1],c='b',label='Reward Opponent (random)', lw=5, alpha=0.8)
    ax.grid()
    ax.set_xscale('log')
    ax.legend()
    plt.show()
try:
    fig, ax = plt.subplots(figsize=(15,5))
    SUM = np.asarray([MATCH.history[2]["wins"][i] + MATCH.history[2]["losses"][i] + MATCH.history[2]["draws"][i] for i in range(len(MATCH.history[2]["wins"]))])
    SUM = np.where(SUM>0,SUM,1)
    ax.plot(np.asarray(MATCH.history[2]["wins"]) / SUM,c='k',label='wins', lw=3, alpha=0.8)
    ax.plot(np.asarray(MATCH.history[2]["losses"]) / SUM,c='g',label='losses', lw=3, alpha=0.8)
    ax.plot(np.asarray(MATCH.history[2]["draws"]) / SUM,c='b',label='draws', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)),np.asarray(MATCH.history[2]["wins"]) / SUM,c='k', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)),np.asarray(MATCH.history[2]["losses"]) / SUM,c='g', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)),np.asarray(MATCH.history[2]["draws"]) / SUM,c='b', lw=3, alpha=0.8)
    ax.grid()
    ax.set_xscale('log')
    ax.legend()
    plt.show()
except:
    pass
try:
    if (not VIDEO and not DISPLAY) or VIDEO_ONLY_AFTER:
        fig, ax = plt.subplots(figsize=(15,5))
        ax.plot(range(1,LIMIT+1),MATCH.history[2]['std_Q'],c='g',label='max(Q(s,a))-2nd_max(Q(s,a))', lw=3, alpha=0.8)
        #ax.plot(range(1,LIMIT+1),[MATCH.alpha_diminishment_factor**i for i in range(1,LIMIT+1)],c='k',label='Alpha (Q-learning)', lw=3, alpha=0.8)
        ax.grid()
        ax.set_xscale('log')
        ax.legend()
        plt.show()
except:
    pass
if VIDEO:  #V
    try:
        os.remove("out.mp4")
    except:
        pass
    os.system("ffmpeg -framerate 3 -pattern_type glob -i 'soccer*.png' -c:v libx264 -pix_fmt yuv420p out.mp4")
if TYPE=="singleVSQlearning":
    # dtype=object keeps the full ragged history (two reward lists plus the wins/losses/draws dict)
    np.save(f'{TYPE}-{LIMIT}-alphadim:{MATCH.alpha_diminishment_factor}.npy', np.asarray(MATCH.history, dtype=object))
else:
    np.save(f'{TYPE}-{LIMIT}.npy', np.asarray(MATCH.history[:2]))
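# A minimal sketch for reloading the saved histories afterwards (filenames as
# produced above; allow_pickle is only needed for the object-dtype full history):
#     hist = np.load(f'{TYPE}-{LIMIT}.npy')
#     full = np.load(f'{TYPE}-{LIMIT}-alphadim:{MATCH.alpha_diminishment_factor}.npy', allow_pickle=True)
# Running the script assumes a 'ball.jpg' image in the working directory and, when
# VIDEO is enabled, an ffmpeg binary on the PATH.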