Exploration of Reinforcement Learning (RL), a pivotal branch of machine learning. Core principles of RL: training intelligent agents to make strategic decisions and maximize rewards. Agents learn to navigate a variety of environments from the Gymnasium toolkit, including frozen lakes and mountains.
Presented by Fouad Trad, Machine Learning Engineer
Foundational concepts, roles, and applications. RL framework, agent-environment interaction. Use the Gymnasium library to create environments, visualize states, and perform actions - practical foundation in RL concepts and applications.
- Agent learns through trial and error
- Agent receives rewards for good decisions and penalties for bad decisions
- Goal is to maximize positive feedback over time
- RL vs. other ML types
- Supervised Learning
- Unsupervised Learning
- When to use RL?
- Sequential decision-making, decisions influence future observations
- Learning through rewards and penalties, no direct supervision
- RL applications - robotics, finance, autonomous vehicles
- Episodic vs. continuous tasks
- Navigating the RL framework
Gymnasium
- Standard library for RL tasks, abstracts complexity of RL problems, provides a plethora of RL environments
- Agent: learner, decision-maker
- Environment: the challenge to be solved
- State: environment snapshot at a given time
- Action: agent's choice in response to the state
- Reward: feedback for the agent's action
- Actions have long-term consequences
- Agent aims to maximize total reward over time
- Return: sum of all expected rewards
- Discounted return - gives more weight to nearer rewards
- Discount factor (γ): discounts future rewards
# Conceptual agent-environment interaction loop (pseudocode)
env = create_environment()
state = env.get_initial_state()
for i in range(n_iterations):
    # Agent picks an action; environment returns the next state and a reward
    action = choose_action(state)
    state, reward = env.execute(action)
    # Agent updates what it has learned from this experience
    update_knowledge(state, action, reward)
import numpy as np

expected_rewards = np.array([1, 6, 3])
discount_factor = 0.9
discounts = np.array([discount_factor ** i for i in range(len(expected_rewards))])
discounted_return = np.sum(expected_rewards * discounts)
print(discounted_return)  # 1*1.0 + 6*0.9 + 3*0.81 = 8.83
- Key Gymnasium environments
- CartPole: agent must balance a pole on a moving cart
- MountainCar: agent must drive a car up a steep hill
- FrozenLake: agent must navigate a frozen lake with holes
- Taxi: agent must pick up and drop off passengers
import gymnasium as gym
env = gym.make('CartPole', render_mode='rgb_array')
state, info = env.reset(seed=42)
print(state)
import matplotlib.pyplot as plt

def render():
    state_image = env.render()
    plt.imshow(state_image)
    plt.show()

render()
# 0: moving left, 1: moving right
action = 1
terminated = False
while not terminated:
    action = 1  # Move to the right
    state, reward, terminated, truncated, info = env.step(action)
    render()
Markov Decision Processes (MDPs) and their essential components. Policies and value functions. Policy optimization with policy iteration and value iteration techniques.
- Markov property: the future state depends only on the current state and action - the foundation of an MDP
- Frozen Lake as MDP
- Agent must reach goal without falling into holes
- State, actions, terminal state, transitions
- Transition probabilities: likelihood of reaching a state given a state and action
- Reward only given in goal state
import gymnasium as gym
env = gym.make('FrozenLake', is_slippery=True)
print(env.action_space)
print(env.observation_space)
print("Number of actions: ", env.action_space.n)
print("Number of states: ", env.observation_space.n)
# env.unwrapped.P: dictionary keyed by state, then by action, mapping to a list of
# (probability, next_state, reward, terminated) tuples
state, action = 0, 1  # e.g. state 0, action 1
print(env.unwrapped.P[state][action])
- Policies specify which action to take in each state to maximize return
- State-value functions
- Estimate the state's worth
- Expected return starting from state, following policy
- Sum of discounted rewards, starting at state s and following the policy
- Grid World example
- Nine states - nine state-values
- Discount factor: γ = 1
- Value of goal state: starting in the goal state, the agent doesn't move, so V(goal state) = 0
- Bellman equation computes state values recursively: V(s) = reward + gamma * V(next_state), where next_state results from taking the policy's action in s
# Grid world example: policy
# 0: left, 1: down, 2: right, 3: up
policy = {
    0: 1, 1: 2, 2: 1,
    3: 1, 4: 3, 5: 1,
    6: 2, 7: 3
}

state, info = env.reset()
terminated = False
while not terminated:
    action = policy[state]
    state, reward, terminated, _, _ = env.step(action)
terminal_state = 8  # goal state of the nine-state grid world
gamma = 1
num_states = 9

def compute_state_value(state):
    if state == terminal_state:
        return 0
    action = policy[state]
    _, next_state, reward, _ = env.unwrapped.P[state][action][0]
    return reward + gamma * compute_state_value(next_state)

V = {state: compute_state_value(state) for state in range(num_states)}
- Action-value functions (Q-values)
- Expected return of starting at state s, taking action a, then following the policy
- Q(s, a) = reward + gamma * V(next_state)
- Improving the policy
num_actions = 4  # left, down, right, up

def compute_q_value(state, action):
    if state == terminal_state:
        return None
    _, next_state, reward, _ = env.unwrapped.P[state][action][0]
    return reward + gamma * compute_state_value(next_state)

Q = {
    (state, action): compute_q_value(state, action)
    for state in range(num_states)
    for action in range(num_actions)
}
improved_policy = {}
for state in range(num_states-1):
    max_action = max(range(num_actions), key=lambda action: Q[(state, action)])
    improved_policy[state] = max_action
- Policy iteration and value iteration
- Iterative process to find optimal policy
- Initialize, evaluate, improve, repeat
- Until policy stops changing (improving)
- Result is the optimal policy
def policy_evaluation(policy):
    V = {state: compute_state_value(state, policy) for state in range(num_states)}
    return V

def policy_improvement(policy):
    # Initialize the improved policy
    improved_policy = {s: 0 for s in range(num_states-1)}
    Q = {
        (state, action): compute_q_value(state, action, policy)
        for state in range(num_states)
        for action in range(num_actions)
    }
    for state in range(num_states-1):
        max_action = max(range(num_actions), key=lambda action: Q[(state, action)])
        improved_policy[state] = max_action
    return improved_policy
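# The policy-parameterized helpers called above are not shown in these notes.
# A minimal sketch, assuming they mirror the earlier compute_state_value /
# compute_q_value but take the policy as an explicit argument:
def compute_state_value(state, policy):
    if state == terminal_state:
        return 0
    action = policy[state]
    _, next_state, reward, _ = env.unwrapped.P[state][action][0]
    return reward + gamma * compute_state_value(next_state, policy)

def compute_q_value(state, action, policy):
    if state == terminal_state:
        return None
    _, next_state, reward, _ = env.unwrapped.P[state][action][0]
    return reward + gamma * compute_state_value(next_state, policy)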
# Policy iteration
policy = {0:1, 1:2, 2:1, 3:1, 4:3, 5:1, 6:2, 7:3}
while True:
    V = policy_evaluation(policy)
    improved_policy = policy_improvement(policy)
    if improved_policy == policy:
        break
    policy = improved_policy
- Value Iteration combines policy evaluation and improvement in one step
- Computes optimal state-value function
- Derives policy from it
- Repeats until the change in state values drops below some threshold
def compute_q_value(state, action, V):
    if state == terminal_state:
        return None
    _, next_state, reward, _ = env.unwrapped.P[state][action][0]
    return reward + gamma * V[next_state]

def get_max_action_and_value(state, V):
    Q_values = [
        compute_q_value(state, action, V) for action in range(num_actions)
    ]
    max_action = max(range(num_actions), key=lambda a: Q_values[a])
    max_q_value = Q_values[max_action]
    return max_action, max_q_value
# value iteration
V = {state: 0 for state in range(num_states)}
policy = {state:0 for state in range(num_states-1)}
threshold = 0.001
while True:
    new_V = {state: 0 for state in range(num_states)}
    for state in range(num_states-1):
        max_action, max_q_value = get_max_action_and_value(state, V)
        new_V[state] = max_q_value
        policy[state] = max_action
    # Stop once state values change by less than the threshold
    if all(abs(new_V[state] - V[state]) < threshold for state in V):
        break
    V = new_V
A journey through the dynamic realm of model-free learning in RL. Foundational first-visit and every-visit Monte Carlo prediction algorithms. Temporal Difference Learning, the SARSA algorithm, and Q-learning, with an analysis of their convergence in challenging environments.
- Recap of model-based learning
- Rely on knowledge of environment dynamics
- No interaction with environment
- Model-free learning
- Doesn't rely on knowledge of environment dynamics
- Agent interacts with environment
- Learns policy through trial and error
- More suitable for real-world applications
- Monte Carlo methods
- Model-free techniques
- Estimate Q-values based on episodes
- Two methods: first-visit and every-visit
def generate_episode():
    episode = []
    state, info = env.reset()
    terminated = False
    while not terminated:
        action = env.action_space.sample()
        next_state, reward, terminated, truncated, info = env.step(action)
        episode.append((state, action, reward))
        state = next_state
    return episode
def first_visit_mc(num_episodes):
    Q = np.zeros((num_states, num_actions))
    returns_sum = np.zeros((num_states, num_actions))
    returns_count = np.zeros((num_states, num_actions))
    for i in range(num_episodes):
        episode = generate_episode()
        visited_states_actions = set()
        for j, (state, action, reward) in enumerate(episode):
            # Only the first visit to each (state, action) pair contributes
            if (state, action) not in visited_states_actions:
                returns_sum[state, action] += sum([x[2] for x in episode[j:]])
                returns_count[state, action] += 1
                visited_states_actions.add((state, action))
    nonzero_counts = returns_count != 0
    Q[nonzero_counts] = returns_sum[nonzero_counts] / returns_count[nonzero_counts]
    return Q
def every_visit_mc(num_episodes):
    # Same as first_visit_mc, but without the first-visit check - see the sketch after this code
    ...
Q = first_visit_mc(1000)
print("First-visit policy: \n", get_policy())
- Temporal difference learning
- Monte Carlo
- Model-free
- Estimate Q-table based on interaction
- Update Q-table when at least one episode done
- Suitable for short episodic tasks
- TD learning
- Model-free
- Estimate Q-table based on interaction
- Update Q-table each step within episode
- Suitable for tasks with long/indefinite episodes
SARSA
- TD algorithm
- On-policy method: adjusts strategy based on taken actions
- Update rule: Q(state, action) = (1 - alpha) * Q(state, action) + alpha * (reward + gamma * Q(next_state, next_action))
- alpha: learning rate, gamma: discount factor
def update_q_table(state, action, reward, next_state, next_action):
    old_value = Q[state, action]
    next_value = Q[next_state, next_action]
    Q[state, action] = (1 - alpha) * old_value + alpha * (reward + gamma * next_value)

for episode in range(num_episodes):
    state, info = env.reset()
    action = env.action_space.sample()
    terminated = False
    while not terminated:
        next_state, reward, terminated, truncated, info = env.step(action)
        next_action = env.action_space.sample()
        update_q_table(state, action, reward, next_state, next_action)
        state, action = next_state, next_action
- Introduction to Q-learning
- Stands for quality learning
- Model-free technique
- Learns optimal Q-table by interaction
- Q-learning vs. SARSA
- SARSA updates based on the taken action - on-policy learner
- Q-learning updates independently of the taken action - off-policy learner
- Uses the maximum Q-value of the next state
def update_q_table(state, action, reward, new_state):
    old_value = Q[state, action]
    next_max = np.max(Q[new_state])
    Q[state, action] = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
env = gym.make("FrozenLake", is_slippery=True)
num_episodes = 1000
alpha = 0.1
gamma = 1
num_states, num_actions = env.observation_space.n, env.action_space.n
Q = np.zeros((num_states, num_actions))
reward_per_random_episode = []
for episode in range(num_episodes):
    state, info = env.reset()
    terminated = False
    episode_reward = 0
    while not terminated:
        # Random action selection
        action = env.action_space.sample()
        # Take action and observe new state and reward
        new_state, reward, terminated, truncated, info = env.step(action)
        # Update Q-table
        update_q_table(state, action, reward, new_state)
        episode_reward += reward
        state = new_state
    reward_per_random_episode.append(episode_reward)
reward_per_learned_episode = []
# get_policy(): greedy policy derived from the learned Q-table,
# i.e. {state: np.argmax(Q[state]) for state in range(num_states)}
policy = get_policy()
for episode in range(num_episodes):
    state, info = env.reset()
    terminated = False
    episode_reward = 0
    while not terminated:
        # Select the best action based on learned Q-table
        action = policy[state]
        # Take action and observe new state
        new_state, reward, terminated, truncated, info = env.step(action)
        state = new_state
        episode_reward += reward
    reward_per_learned_episode.append(episode_reward)
# Q-learning evaluation
import numpy as np
import matplotlib.pyplot as plt
avg_random_reward = np.mean(reward_per_random_episode)
avg_learned_reward = np.mean(reward_per_learned_episode)
plt.bar(['Random Policy', 'Learned Policy'], [avg_random_reward, avg_learned_reward], color=['blue', 'green'])
plt.title('Average Reward per Episode')
plt.ylabel('Average Reward')
plt.show()
- Expected SARSA
- Updates Q-table differently than SARSA and Q-learning
Q(state, action) = (1 - alpha) * Q(state, action) + alpha * (reward + gamma * ExpectedQ(next_state))
- Takes into account all actions
ExpectedQ(next_state) = Sum(Prob(action) * Q(next_state, action) for action in A)
- Random actions => equal probabilities
ExpectedQ(next_state) = Mean(Q(next_state, action) for action in A)
env = gym.make('FrozenLake-v1', is_slippery=False)
num_states = env.observation_space.n
num_actions = env.action_space.n
Q = np.zeros((num_states, num_actions))
gamma = 0.99
alpha = 0.1
num_episodes = 1000
def update_q_table(state, action, next_state, reward):
    expected_q = np.mean(Q[next_state])
    Q[state, action] = (1 - alpha) * Q[state, action] + alpha * (reward + gamma * expected_q)
# training
for i in range(num_episodes):
    state, info = env.reset()
    terminated = False
    while not terminated:
        action = env.action_space.sample()
        next_state, reward, terminated, truncated, info = env.step(action)
        update_q_table(state, action, next_state, reward)
        state = next_state
policy = { state: np.argmax(Q[state]) for state in range(num_states)}
print(policy)
- Double Q-learning
- Q-learning
- Estimates optimal action-value function
- Overestimates Q-values by updating based on max Q
- Might lead to suboptimal policy learning
- Double Q-learning
- Maintains two Q-tables and alternates between updates
- Each table updated based on the other; both contribute to the learning process
- Reduces risk of Q-value overestimation
env = gym.make('FrozenLake-v1', is_slippery=False)
num_states = env.observation_space.n
n_actions = env.action_space.n
Q = [np.zeros((num_states, n_actions)) for _ in range(2)]  # two independent Q-tables
num_episodes = 1000
alpha = 0.5
gamma = 0.99
def update_q_tables(state, action, reward, next_state):
    # Select a random Q-table index (0 or 1)
    i = np.random.randint(2)
    # Update the corresponding Q-table using the other table's value estimate
    best_next_action = np.argmax(Q[i][next_state])
    Q[i][state, action] = (1 - alpha) * Q[i][state, action] + \
        alpha * (reward + gamma * Q[1 - i][next_state, best_next_action])
# training
for episode in range(num_episodes):
    state, info = env.reset()
    terminated = False
    while not terminated:
        action = np.random.choice(n_actions)
        next_state, reward, terminated, truncated, info = env.step(action)
        update_q_tables(state, action, reward, next_state)
        state = next_state
final_Q = (Q[0] + Q[1])/2
# OR
final_Q = Q[0] + Q[1]
# agent's policy
policy = {state: np.argmax(final_Q[state]) for state in range(num_states)}
- Balancing between exploration and exploitation
- Training with random actions
- Agent explores environment
- No strategy optimization based on learned knowledge
- Agent uses knowledge when training done
- Exploration-exploitation trade-off
- Balances exploration and exploitation
- Continuous exploration prevents strategy refinement
- Exclusive exploitation misses undiscovered opportunities
- Epsilon-greedy strategy
- Explore with probability epsilon
- Exploit with probability 1 - epsilon
- Ensures continuous exploration while using knowledge
- Decayed epsilon-greedy strategy
- Reduces epsilon over time
- More exploration initially, more exploitation later on
- Agent increasingly relies on its accumulated knowledge
env = gym.make('FrozenLake', is_slippery=True)
action_size = env.action_space.n
state_size = env.observation_space.n
Q = np.zeros((state_size, action_size))
alpha = 0.1
gamma = 0.99
total_episodes = 10000
def epsilon_greedy(state):
    if np.random.rand() < epsilon:
        action = env.action_space.sample()  # Explore
    else:
        action = np.argmax(Q[state, :])  # Exploit
    return action
# training
epsilon = 0.9 # Exploration rate
rewards_eps_greedy = []
for episode in range(total_episodes):
    state, info = env.reset()
    terminated = False
    episode_reward = 0
    while not terminated:
        action = epsilon_greedy(state)
        new_state, reward, terminated, truncated, info = env.step(action)
        # Q-learning update as defined earlier (modifies Q in place)
        update_q_table(state, action, reward, new_state)
        state = new_state
        episode_reward += reward
    rewards_eps_greedy.append(episode_reward)
# training decayed epsilon-greedy
epsilon = 1.0 # Exploration rate
epsilon_decay = 0.999
min_epsilon = 0.01
rewards_decay_eps_greedy = []
for episode in range(total_episodes):
    state, info = env.reset()
    terminated = False
    episode_reward = 0
    while not terminated:
        action = epsilon_greedy(state)
        new_state, reward, terminated, truncated, info = env.step(action)
        episode_reward += reward
        # Q-learning update as defined earlier (modifies Q in place)
        update_q_table(state, action, reward, new_state)
        state = new_state
    rewards_decay_eps_greedy.append(episode_reward)
    epsilon = max(min_epsilon, epsilon * epsilon_decay)
# comparing strategies
avg_eps_greedy = np.mean(rewards_eps_greedy)
avg_decay = np.mean(rewards_decay_eps_greedy)
plt.bar(['Epsilon Greedy', 'Decayed Epsilon Greedy'], [avg_eps_greedy, avg_decay], color=['blue', 'green'])
plt.title('Average Reward per Episode')
plt.ylabel('Average Reward')
plt.show()
- Multi-armed bandits
- Gambler facing slot machines
- Challenge - maximize winning
- Solution - exploration-exploitation
- Slot machines
- Reward from an arm is 0 or 1
- Agent's goal is to find the slot machine with maximum reward
- Solving the problem
- Decayed epsilon-greedy
- With probability epsilon: select a random machine
- With probability 1 - epsilon: select the best machine so far
- Epsilon decreases over time
# initialization
n_bandits = 4
true_bandit_probs = np.random.rand(n_bandits)
n_iterations = 100000
epsilon = 1.0
min_epsilon = 0.01
epsilon_decay = 0.999
counts = np.zeros(n_bandits) # How many times each bandit was played
values = np.zeros(n_bandits) # Estimated winning probability of each bandit
rewards = np.zeros(n_iterations) # Reward history
selected_arms = np.zeros(n_iterations, dtype=int) # Arm selection history
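# epsilon_greedy() below selects a bandit arm rather than an environment action; this
# bandit version is not defined in these notes. A minimal sketch, assuming it exploits
# the running value estimates in `values`:
def epsilon_greedy():
    if np.random.rand() < epsilon:
        return np.random.randint(n_bandits)  # Explore: random arm
    return int(np.argmax(values))            # Exploit: best arm so far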
# iteration loop
for i in range(n_iterations):
    arm = epsilon_greedy()
    reward = np.random.rand() < true_bandit_probs[arm]
    rewards[i] = reward
    selected_arms[i] = arm
    counts[arm] += 1
    # Incremental update of the arm's estimated winning probability
    values[arm] += (reward - values[arm]) / counts[arm]
    epsilon = max(min_epsilon, epsilon * epsilon_decay)
# analyzing selections
selections_percentage = np.zeros((n_iterations, n_bandits))
for i in range(n_iterations):
    selections_percentage[i, selected_arms[i]] = 1
selections_percentage = (
    np.cumsum(selections_percentage, axis=0)
    / np.arange(1, n_iterations + 1).reshape(-1, 1)
)
# plotting the results
for arm in range(n_bandits):
    plt.plot(selections_percentage[:, arm], label=f'Bandit #{arm+1}')
plt.xscale('log')
plt.title('Bandit Action Choices Over Time')
plt.xlabel('Episode Number')
plt.ylabel('Percentage of Bandit Selections (%)')
plt.legend()
plt.show()

for i, prob in enumerate(true_bandit_probs, 1):
    print(f"Bandit #{i} -> {prob:.2f}")