Code for the Q-learning presentation at Paris-Saclay (Game Theory course, Dr. Abdel Lisser).
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

fig = plt.figure()
im = Image.open('ball.jpg')
L = 15  # downscaling factor for the ball sprite
im = np.asarray(im.resize((im.size[0] // L, im.size[1] // L))).astype(float) / 255
CENTER = fig.bbox.xmax // 2, fig.bbox.ymax // 4
DX, DY = -190, 280  # previously: fig.bbox.xmax//10, fig.bbox.ymax//5
# Pixel offsets used to draw the ball sprite on the rendered field
BALLX = {
    0: CENTER[0] + 0 * DX,
    1: CENTER[0] + 1 * DX,
    2: CENTER[0] + 2 * DX,
    3: CENTER[0] + 3 * DX,
}
BALLY = {
    0: CENTER[1] + 0 * DY,
    1: CENTER[1] + 1 * DY,
    2: CENTER[1] + 2 * DY,
    3: CENTER[1] + 3 * DY,
    4: CENTER[1] + 4 * DY,
}

class Soccer:
    def __init__(self) -> None:
        return

    def build(self) -> None:
        """
        Explicit setup of the field and the players;
        called once from the main script below.
        """
        self.counter = 1
        self.build_field()
        self.build_players()
        return

    def build_field(self) -> None:
        self.LR = 0                  # baseline local reward
        self.factor = 0.9995         # TIME COST (per-step discount)
        self.current_cost = self.factor
        self.X = 4
        self.Y = 5
        self.field = np.zeros((self.X, self.Y))
        self.agent_initial_position = (1, 1)
        self.opponent_initial_position = (2, 3)
        self.agent_goal = ((1, 0), (2, 0))
        self.opponent_goal = ((1, 4), (2, 4))
        self.agent_goal_value = -1
        self.opponent_goal_value = 1
        self.history = [[], [], {}]  # [agent rewards, opponent rewards, match statistics]
        self.laps = 0
        self.DRAW_LIMIT = 40         # every DRAW_LIMIT steps the match is reset and counted as a draw
        for x in self.agent_goal:
            self.field[x[0], x[1]] = self.agent_goal_value
        for x in self.opponent_goal:
            self.field[x[0], x[1]] = self.opponent_goal_value
        return None

    def update_field(self) -> None:
        self.field = np.zeros((self.X, self.Y))
        for x in self.agent_goal:
            self.field[x[0], x[1]] = self.agent_goal_value
        for x in self.opponent_goal:
            self.field[x[0], x[1]] = self.opponent_goal_value
        self.field[self.agent[0], self.agent[1]] = self.agent_value
        self.field[self.opponent[0], self.opponent[1]] = self.opponent_value

    def reset_initial_positions(self, has_agent=True, soft=False) -> None:
        """
        Send both players back to their initial cells; `has_agent` decides who
        receives the ball. With soft=True only the future positions are reset.
        """
        if soft:
            if has_agent:
                self.future_agent = [self.agent_initial_position[0], self.agent_initial_position[1], True]
                self.future_opponent = [self.opponent_initial_position[0], self.opponent_initial_position[1], False]
            else:
                self.future_agent = [self.agent_initial_position[0], self.agent_initial_position[1], False]
                self.future_opponent = [self.opponent_initial_position[0], self.opponent_initial_position[1], True]
            self.opponent_local_reward = self.LR
            self.agent_local_reward = self.LR
            return
        if has_agent:
            self.agent = [self.agent_initial_position[0], self.agent_initial_position[1], True]
            self.opponent = [self.opponent_initial_position[0], self.opponent_initial_position[1], False]
        else:
            self.agent = [self.agent_initial_position[0], self.agent_initial_position[1], False]
            self.opponent = [self.opponent_initial_position[0], self.opponent_initial_position[1], True]
        self.opponent_local_reward = self.LR
        self.agent_local_reward = self.LR
        return

    def build_players(self) -> None:
        # Actions
        self.actions = ["E", "W", "N", "S"]
        self.agent_action = "E"
        self.opponent_action = "E"
        # Rewards/Costs
        self.agent_cost = 0
        self.opponent_cost = 0
        self.agent_local_reward = self.LR
        self.opponent_local_reward = self.LR
        self.history[2]["wins"] = [0]
        self.history[2]["losses"] = [0]
        self.history[2]["draws"] = [0]
        # Position and ball possession (the third entry of each player is the possession flag)
        self.agent = [self.agent_initial_position[0], self.agent_initial_position[1], True]
        self.ball = [self.agent[0], self.agent[1]]
        self.opponent = [self.opponent_initial_position[0], self.opponent_initial_position[1], False]
        # Rendering values
        self.agent_value = 2
        self.opponent_value = -2
        self.field[self.agent[0], self.agent[1]] = self.agent_value
        self.field[self.opponent[0], self.opponent[1]] = self.opponent_value
        # Q-learning
        self.alpha = 1
        self.alpha_diminishment_factor = 1  # alternative: 0.999999999999 (ALPHA DIMINISHER)
        self.history[2]["std_Q"] = []
        self.deterministic = False
        # One Q-table per action without the ball, plus one per action with the ball
        self.Qagent = np.asarray(
            [np.random.rand(*self.field.shape) * 0.1 for _ in self.actions]
            + [np.random.rand(*self.field.shape) * 0.01 for _ in self.actions]
        )
        self.agent_policies = [np.random.choice(self.actions) for _ in range(self.X * self.Y * 2)]

    def apply_environment(self) -> None:
        """
        Is a movement accepted or not?
        """
        self.laps += 1
        self.future_agent = self.agent.copy()
        self.future_opponent = self.opponent.copy()
        # Do we fall out of the grid?
        answer_agent, new_pos_agent = check_movement(self.agent_action, self.agent[:2], (self.X, self.Y))
        answer_opponent, new_pos_opponent = check_movement(self.opponent_action, self.opponent[:2], (self.X, self.Y))
        # Do we bump against the other player?
        if answer_agent:
            if answer_opponent:
                if tuple(new_pos_agent) != tuple(new_pos_opponent):
                    self.future_agent[:2] = new_pos_agent.copy()
            elif tuple(new_pos_agent) != tuple(self.opponent[:2]):
                self.future_agent[:2] = new_pos_agent.copy()
        if answer_opponent:
            if answer_agent:
                if tuple(new_pos_opponent) != tuple(new_pos_agent):
                    self.future_opponent[:2] = new_pos_opponent.copy()
            elif tuple(new_pos_opponent) != tuple(self.agent[:2]):
                self.future_opponent[:2] = new_pos_opponent.copy()
        # Does the ball change possession?
        self.ball_possession_constraint()

    def display(self, save=False, name=''):
        global im
        fig = plt.figure(figsize=(6.4, 4.8), dpi=300)
        self.update_field()
        plt.imshow(self.field)
        plt.title(f'Agent Reward: {round(self.agent_cost, 3)} | Opponent Reward: {round(self.opponent_cost, 3)} | iteration: {self.counter}')
        if self.agent[2]:
            fig.figimage(im, BALLY[self.agent[1]], BALLX[self.agent[0]])
        else:
            fig.figimage(im, BALLY[self.opponent[1]], BALLX[self.opponent[0]])
        if save:
            if name:
                fig.savefig(name)
            else:
                fig.savefig('result.png')
        else:
            plt.show()
        plt.close('all')

    def time_evolver(self) -> None:
        self.current_cost *= self.factor
        self.counter += 1
        self.opponent = self.future_opponent.copy()
        self.agent = self.future_agent.copy()
        if self.laps == self.DRAW_LIMIT:
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 1]
            self.reset_initial_positions(np.random.choice([True, False]))
            self.laps = 0
            return
        return

    def record_reward(self) -> None:
        self.history[0].append(self.agent_cost)
        self.history[1].append(self.opponent_cost)
        return

    def update_reward(self) -> None:
        reward_agent = self.LR
        reward_opponent = self.LR
        if self.future_agent[2] and tuple(self.future_agent[:2]) in self.opponent_goal:
            # Agent scores in the opponent's goal
            reward_agent += self.current_cost
            reward_opponent -= self.current_cost
            self.opponent_local_reward = -self.current_cost
            self.agent_local_reward = self.current_cost
            self.reset_initial_positions(has_agent=False, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 1]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.agent_goal:
            # Opponent scores in the agent's goal
            reward_opponent += self.current_cost
            reward_agent -= self.current_cost
            self.opponent_local_reward = self.current_cost
            self.agent_local_reward = -self.current_cost
            self.reset_initial_positions(has_agent=True, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 1]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_agent[2] and tuple(self.future_agent[:2]) in self.agent_goal:
            # Own goal by the agent
            reward_agent -= self.current_cost
            reward_opponent += self.current_cost
            self.opponent_local_reward = self.current_cost
            self.agent_local_reward = -self.current_cost
            self.reset_initial_positions(has_agent=True, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 1]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        elif self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.opponent_goal:
            # Own goal by the opponent
            reward_agent += self.current_cost
            reward_opponent -= self.current_cost
            self.opponent_local_reward = -self.current_cost
            self.agent_local_reward = self.current_cost
            self.reset_initial_positions(has_agent=False, soft=True)
            self.history[2]["wins"] += [self.history[2]["wins"][-1] + 1]
            self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        else:
            pass
            # self.history[2]["wins"] += [self.history[2]["wins"][-1] + 0]
            # self.history[2]["losses"] += [self.history[2]["losses"][-1] + 0]
            # self.history[2]["draws"] += [self.history[2]["draws"][-1] + 0]
        self.agent_cost += reward_agent
        self.opponent_cost += reward_opponent

    def ball_possession_constraint(self) -> None:
        """
        If the players end up adjacent to each other, ball possession changes hands.
        """
        # If the ball carrier is inside a goal then this is a goal; it cannot be prevented
        if ((self.future_agent[2] and tuple(self.future_agent[:2]) in self.opponent_goal) or
                (self.future_opponent[2] and tuple(self.future_opponent[:2]) in self.agent_goal)):
            return
        if abs(self.future_agent[0] - self.future_opponent[0]) + abs(self.future_agent[1] - self.future_opponent[1]) == 1:
            self.future_agent[2] = not self.future_agent[2]
            self.future_opponent[2] = not self.future_opponent[2]
        return

    def generate_random_actions(self, T="single") -> None:
        if T == "single":  # Agent: random | Opponent: still
            self.agent_action = np.random.choice(self.actions)
        elif T == "both":  # Agent: random | Opponent: random
            self.opponent_action = np.random.choice(self.actions)
            self.agent_action = np.random.choice(self.actions)
        elif T == "singleVSsmart":  # Agent: smart ad-hoc (w/ randomness) | Opponent: random
            self.agent_action = adhoc_random_opponent(self.agent[:2], self.agent[2], self.Y)
            self.opponent_action = np.random.choice(self.actions)
        elif T == "singleVSQlearning" or T == "singleVSballAwareQlearning":  # Agent: Q-learning | Opponent: random
            if T == "singleVSballAwareQlearning":
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            else:
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1]]
            self.opponent_action = np.random.choice(self.actions)
        elif T == "smartVSQlearning" or T == "smartVSballAwareQlearning":  # Agent: Q-learning | Opponent: smart (deterministic)
            if T == "smartVSballAwareQlearning":
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            else:
                self.agent_action = self.agent_policies[self.agent[0] * self.agent[1]]
            self.opponent_action = adhoc_deterministic_opponent(self.opponent[:2], self.opponent[2], self.Y)
        elif T == "noisysmartVSballAwareQlearning":  # Agent: Q-learning | Opponent: smart (w/ randomness)
            self.agent_action = self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * (self.X * self.Y)]
            self.opponent_action = adhoc_random_opponent(self.opponent[:2], self.opponent[2], self.Y)
        return

    def learn(self, T="single", verbose=False) -> None:
        if T == "singleVSQlearning" or T == "smartVSQlearning":
            if verbose:
                print(f'Action was: {self.agent_policies[self.agent[0] * self.agent[1]]}')
            maxQ = max([self.Qagent[i][self.agent[0], self.agent[1]] for i in range(len(self.actions))])
            self.Qagent[self.actions.index(self.agent_action)][self.agent[0], self.agent[1]] = (
                (1 - self.alpha) * self.Qagent[self.actions.index(self.agent_action)][self.agent[0], self.agent[1]] +
                self.alpha * (self.agent_cost + self.factor * maxQ)  # agent_cost instead of agent_local_reward
            )
            localActionCost = [self.Qagent[i][self.agent[0], self.agent[1]] for i in range(len(self.actions))]
            self.history[2]["std_Q"].append(np.diff(np.sort(localActionCost)[-2:])[0])
            if self.history[2]["std_Q"][-1] == 0:
                # Break ties between equally valued actions at random
                maxQ = max([self.Qagent[i][self.agent[0], self.agent[1]] for i in range(len(self.actions))])
                aux = [self.actions[i] for i, x in enumerate(localActionCost) if x == maxQ]
                self.agent_policies[self.agent[0] * self.agent[1]] = np.random.choice(aux)
            else:
                self.agent_policies[self.agent[0] * self.agent[1]] = self.actions[np.argmax(localActionCost)]
            self.alpha *= self.alpha_diminishment_factor
        if T == "singleVSballAwareQlearning" or T == "smartVSballAwareQlearning" or T == "noisysmartVSballAwareQlearning":
            maxQ = max([self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0], self.agent[1]] for i in range(len(self.actions))])
            self.Qagent[self.actions.index(self.agent_action) + int(self.agent[2]) * len(self.actions)][self.agent[0], self.agent[1]] = (
                (1 - self.alpha) * self.Qagent[self.actions.index(self.agent_action) + int(self.agent[2]) * len(self.actions)][self.agent[0], self.agent[1]] +
                self.alpha * (self.agent_cost + self.factor * maxQ)  # agent_cost instead of agent_local_reward
            )
            localActionCost = [self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0], self.agent[1]] for i in range(len(self.actions))]
            self.history[2]["std_Q"].append(np.diff(np.sort(localActionCost)[-2:])[0])
            if self.history[2]["std_Q"][-1] == 0:
                # Break ties between equally valued actions at random
                maxQ = max([self.Qagent[i + int(self.agent[2]) * len(self.actions)][self.agent[0], self.agent[1]] for i in range(len(self.actions))])
                aux = [self.actions[i] for i, x in enumerate(localActionCost) if x == maxQ]
                # print(f'maxQ is {maxQ} and aux is {aux} and localactioncost is {localActionCost}')
                self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X * self.Y] = np.random.choice(aux)
            else:
                self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X * self.Y] = self.actions[np.argmax(localActionCost)]
            # print(f'Action is: {self.agent_policies[self.agent[0] * self.agent[1] + int(self.agent[2]) * self.X * self.Y]}\n\n')
            self.alpha *= self.alpha_diminishment_factor
        return

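
# The update inside Soccer.learn is a tabular Q-learning rule of the form
#     Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max_a' Q(s, a'))
# with gamma = self.factor, r = the running self.agent_cost (see the inline comment there),
# and, as implemented above, the max taken over the actions of the current state.
# The helper below is only an illustrative, self-contained sketch of that rule; it is not
# called anywhere in this script and its name and signature are hypothetical.
def tabular_q_update(q_values, action_index, reward, alpha, gamma):
    """Return a copy of the per-action values with the chosen action updated."""
    q_values = list(q_values)
    max_q = max(q_values)  # greedy bootstrap term, computed before the update
    q_values[action_index] = (1 - alpha) * q_values[action_index] + alpha * (reward + gamma * max_q)
    return q_values
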
def adhoc_random_opponent(XY, ballPossession, Lim) -> str:
    """
    Hand-crafted policy with randomness: 5% of the time (or at a y border)
    pick E/W at random; otherwise N with the ball and S without it.
    """
    if np.random.rand() < 0.05 or (XY[1] == 0 or XY[1] == Lim):
        if np.random.rand() < 0.5:
            return "E"
        else:
            return "W"
    if ballPossession:
        return "N"
    else:
        return "S"


def adhoc_deterministic_opponent(XY, ballPossession, Lim) -> str:
    """
    Opponent moves to the left of the screen if it has the ball,
    else to the right.
    """
    if not ballPossession:
        return "N"
    return "S"

def can_move_down(pos, XY):
    """
    Decide if the movement is allowed (y - 1 stays inside the grid)
    """
    Y = -1
    if pos[1] + Y >= XY[1] or pos[1] + Y < 0:
        return False
    return True


def can_move_up(pos, XY):
    """
    Decide if the movement is allowed (y + 1 stays inside the grid)
    """
    Y = 1
    if pos[1] + Y >= XY[1] or pos[1] + Y < 0:
        return False
    return True


def can_move_right(pos, XY):
    """
    Decide if the movement is allowed (x + 1 stays inside the grid)
    """
    X = 1
    if pos[0] + X >= XY[0] or pos[0] + X < 0:
        return False
    return True


def can_move_left(pos, XY):
    """
    Decide if the movement is allowed (x - 1 stays inside the grid)
    """
    X = -1
    if pos[0] + X >= XY[0] or pos[0] + X < 0:
        return False
    return True

def check_movement(action, pos, XY):
    if action == "E":
        return can_move_right(pos, XY), [pos[0] + 1, pos[1]]
    elif action == "W":
        return can_move_left(pos, XY), [pos[0] - 1, pos[1]]
    elif action == "N":
        return can_move_up(pos, XY), [pos[0], pos[1] + 1]
    elif action == "S":
        return can_move_down(pos, XY), [pos[0], pos[1] - 1]
    return False, [pos[0], pos[1]]
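
# Quick illustration of the movement helpers above (comments only, not executed):
# "E"/"W" change the row index x and "N"/"S" change the column index y on the 4x5 grid.
#   check_movement("E", [1, 1], (4, 5))  ->  (True, [2, 1])
#   check_movement("S", [1, 0], (4, 5))  ->  (False, [1, -1])   # the move would leave the grid
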
MATCH = Soccer()
MATCH.build()
LIMIT = 100000
# Scenario index: 0=single, 1=both, 2=singleVSsmart, 3=singleVSQlearning, 4=smartVSQlearning,
#                 5=smartVSballAwareQlearning, 6=singleVSballAwareQlearning, 7=noisysmartVSballAwareQlearning
TYPE = ["single", "both", "singleVSsmart", "singleVSQlearning", "smartVSQlearning",
        "smartVSballAwareQlearning", "singleVSballAwareQlearning",
        "noisysmartVSballAwareQlearning"][5]
counter = 0
VIDEO = [False, True][0]             #V
VIDEO_ONLY_AFTER = [False, True][0]  #V
FRAMES = 15
AFTER = LIMIT - 2 * FRAMES
DISPLAY = [False, True][0]           #D
if VIDEO:                        #V
    import os                    #V
    os.system('rm soccer*.png')  #V
if VIDEO:                        #V
    if not VIDEO_ONLY_AFTER:
        MATCH.display(True, 'soccer' + str(counter).zfill(3) + '.png')  #V
        counter += 1                                                    #V
elif DISPLAY:
    MATCH.display()  #D
    counter += 1     #D
while True:
    if counter == AFTER and VIDEO_ONLY_AFTER:
        MATCH.reset_initial_positions(True)
    if counter == AFTER + FRAMES and VIDEO_ONLY_AFTER:
        MATCH.reset_initial_positions(False)
    MATCH.generate_random_actions(T=TYPE)
    MATCH.apply_environment()
    MATCH.update_reward()
    MATCH.record_reward()
    VERB = counter >= AFTER
    MATCH.learn(T=TYPE, verbose=VERB)
    MATCH.time_evolver()
    # Display Module
    if VIDEO:  #V
        if not VIDEO_ONLY_AFTER:
            MATCH.display(True, 'soccer' + str(counter).zfill(3) + '.png')  #V
        elif counter >= AFTER:
            MATCH.display(True, 'soccer' + str(counter - AFTER).zfill(3) + '.png')  #V
    elif DISPLAY:  #D
        MATCH.display()  #D
    counter += 1
    if counter == LIMIT:
        break
if (not VIDEO and not DISPLAY) or VIDEO_ONLY_AFTER:
    fig, ax = plt.subplots(figsize=(15, 5))
    ax.plot(range(1, LIMIT + 1), MATCH.history[0], c='r', label='Reward Agent (Q-learning)', lw=5, alpha=0.8)
    ax.plot(range(1, LIMIT + 1), MATCH.history[1], c='b', label='Reward Opponent (random)', lw=5, alpha=0.8)
    ax.grid()
    ax.set_xscale('log')
    ax.legend()
    plt.show()
try:
    fig, ax = plt.subplots(figsize=(15, 5))
    SUM = np.asarray([MATCH.history[2]["wins"][i] + MATCH.history[2]["losses"][i] + MATCH.history[2]["draws"][i]
                      for i in range(len(MATCH.history[2]["wins"]))])
    SUM = np.where(SUM > 0, SUM, 1)  # avoid division by zero at the initial [0, 0, 0] entry
    ax.plot(np.asarray(MATCH.history[2]["wins"]) / SUM, c='k', label='wins', lw=3, alpha=0.8)
    ax.plot(np.asarray(MATCH.history[2]["losses"]) / SUM, c='g', label='losses', lw=3, alpha=0.8)
    ax.plot(np.asarray(MATCH.history[2]["draws"]) / SUM, c='b', label='draws', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)), np.asarray(MATCH.history[2]["wins"]) / SUM, c='k', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)), np.asarray(MATCH.history[2]["losses"]) / SUM, c='g', lw=3, alpha=0.8)
    ax.scatter(range(len(SUM)), np.asarray(MATCH.history[2]["draws"]) / SUM, c='b', lw=3, alpha=0.8)
    ax.grid()
    ax.set_xscale('log')
    ax.legend()
    plt.show()
except:
    pass
try:
    if (not VIDEO and not DISPLAY) or VIDEO_ONLY_AFTER:
        fig, ax = plt.subplots(figsize=(15, 5))
        ax.plot(range(1, LIMIT + 1), MATCH.history[2]['std_Q'], c='g', label='max(Q(s,a)) - 2nd max(Q(s,a))', lw=3, alpha=0.8)
        # ax.plot(range(1, LIMIT + 1), [MATCH.alpha_diminishment_factor**i for i in range(1, LIMIT + 1)], c='k', label='Alpha (Q-learning)', lw=3, alpha=0.8)
        ax.grid()
        ax.set_xscale('log')
        ax.legend()
        plt.show()
except:
    pass
if VIDEO:  #V
    try:
        os.remove("out.mp4")
    except:
        pass
    os.system("ffmpeg -framerate 3 -pattern_type glob -i 'soccer*.png' -c:v libx264 -pix_fmt yuv420p out.mp4")
if TYPE == "singleVSQlearning":
    # history mixes lists and a dict, so it has to be stored as an object array
    np.save(f'{TYPE}-{LIMIT}-alphadim:{MATCH.alpha_diminishment_factor}.npy', np.asarray(MATCH.history, dtype=object))
else:
    np.save(f'{TYPE}-{LIMIT}.npy', np.asarray(MATCH.history[:2]))
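
# Illustrative follow-up only (hypothetical, not executed by this script): the saved reward
# history can be reloaded later; with the settings above (TYPE index 5, LIMIT = 100000) the
# file written is 'smartVSballAwareQlearning-100000.npy' and holds a (2, LIMIT) float array.
#   hist = np.load('smartVSballAwareQlearning-100000.npy')
#   plt.plot(hist[0], label='agent'); plt.plot(hist[1], label='opponent'); plt.legend(); plt.show()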