Summary of "Reinforcement Learning with Gymnasium in Python" from DataCamp.Com

An exploration of Reinforcement Learning (RL), a pivotal branch of machine learning: its core principles, training intelligent agents, and teaching them to make strategic decisions that maximize rewards. Agents learn to navigate a range of environments from the Gymnasium toolkit (the successor to OpenAI's Gym), including frozen lakes and mountains.

Presented by Fouad Trad, Machine Learning Engineer

Introduction to Reinforcement Learning

Foundational concepts, roles, and applications. RL framework, agent-environment interaction. Use the Gymnasium library to create environments, visualize states, and perform actions - practical foundation in RL concepts and applications.

  • Agent learns through trial and error
    • Agent receives rewards for good decisions and penalties for bad decisions
    • Goal is to maximize positive feedback over time
  • RL vs. other ML types
    • Supervised Learning
    • Unsupervised Learning
  • When to use RL?
    • Sequential decision-making, decisions influence future observations
    • Learning through rewards and penalties, no direct supervision
  • RL applications - robotics, finance, autonomous vehicles
    • Episodic vs. continuous tasks
  • Navigating the RL framework Gymnasium
    • Standard library for RL tasks, abstracts complexity of RL problems, provides a plethora of RL environments
    • Agent: learner, decision-maker
    • Environment: challenges to be solved
    • State: environment snapshot at given time
    • Action: agent's choice in response to state
    • Reward: feedback for agent action
      • Actions have long term consequences
      • Agent aims to maximize total reward over time
      • Return: sum of all expected rewards
      • Discounted return - gives more weight to nearer rewards
      • Discount factor (γ): discounts future rewards
env = create_environment()
state = env.get_initial_state()

for i in range(n_iterations):
  action = choose_action(state)
  state, reward = env.execute(action)
  update_knowledge(state, action, reward)

import numpy as np

expected_rewards = np.array([1, 6, 3])
discount_factor = 0.9

discounts = np.array([discount_factor ** i for i in range(len(expected_rewards))])
discounted_return = np.sum(expected_rewards * discounts)
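
As a quick check of the arithmetic, the discounted return for the rewards [1, 6, 3] with a discount factor of 0.9 can be worked out by hand: 1 + 0.9 * 6 + 0.81 * 3 = 8.83. A minimal pure-Python sketch of the same calculation follows; the function name `discounted_return` is illustrative and not taken from the course.

```python
def discounted_return(rewards, gamma):
    """Sum of rewards, each weighted by gamma raised to its time step."""
    return sum(reward * gamma ** t for t, reward in enumerate(rewards))


rewards = [1, 6, 3]
print(discounted_return(rewards, 0.9))  # 1 + 5.4 + 2.43, about 8.83
print(discounted_return(rewards, 1.0))  # no discounting: the plain sum
```

With a discount factor of 1 every reward counts equally, which is the setting used in the grid world example later in these notes.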
  • Key Gymnasium environments
    • CartPole: Agent must balance a pole on moving cart
    • MountainCar: Agent must drive a car up a steep hill
    • FrozenLake: Agent must navigate a frozen lake with holes
    • Taxi: Picking up and dropping off passengers
import gymnasium as gym

env = gym.make('CartPole-v1', render_mode='rgb_array')
state, info = env.reset(seed=42)
print(state)

import matplotlib.pyplot as plt

def render():
  state_image = env.render()
  plt.imshow(state_image)
  plt.show()

render()

# 0: moving left, 1: moving right
action = 1
terminated = False  # must be defined before the loop condition is checked

while not terminated:
  action = 1 # Move to the right
  state, reward, terminated, truncated, info = env.step(action)
  render()
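
In Gymnasium, `env.step` returns both `terminated` (the task ended) and `truncated` (a time limit cut the episode short), so an interaction loop usually stops on either flag. The sketch below illustrates this with a hypothetical toy environment, `CorridorEnv`, that only mimics the `reset`/`step` signatures; it is not part of Gymnasium, and `run_episode` is an assumed helper name.

```python
class CorridorEnv:
    """Hypothetical toy environment that mimics Gymnasium's reset/step signatures."""

    def __init__(self, length=5, max_steps=20):
        self.length = length
        self.max_steps = max_steps

    def reset(self, seed=None):
        self.position = 0
        self.steps = 0
        return self.position, {}

    def step(self, action):
        # 0: move left, 1: move right; the corridor walls clamp the position
        move = 1 if action == 1 else -1
        self.position = min(max(self.position + move, 0), self.length - 1)
        self.steps += 1
        terminated = self.position == self.length - 1  # goal reached
        truncated = self.steps >= self.max_steps        # time limit hit
        reward = 1.0 if terminated else 0.0
        return self.position, reward, terminated, truncated, {}


def run_episode(env, choose_action):
    """Run one episode and return (total reward, number of steps)."""
    state, info = env.reset()
    total_reward, terminated, truncated = 0.0, False, False
    while not (terminated or truncated):
        action = choose_action(state)
        state, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
    return total_reward, env.steps


print(run_episode(CorridorEnv(), lambda state: 1))  # always move right
print(run_episode(CorridorEnv(), lambda state: 0))  # never reaches the goal
```

The second call ends only because of truncation, which is the case a loop that checks `terminated` alone would miss.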

Model-Based Learning

Markov Decision Processes (MDPs) and their essential components. Policies and value functions. Policy optimization with policy iteration and value iteration techniques.

  • Markov property - future state depends only on the current state and action, not on earlier history
  • Frozen Lake as MDP
    • Agent must reach goal without falling into holes
    • State, actions, terminal state, transitions
    • Transition probabilities: likelihood of reaching a state given a state and action
    • Reward only given in goal state
import gymnasium as gym

env = gym.make('FrozenLake-v1', is_slippery=True)

print(env.action_space)
print(env.observation_space)
print("Number of actions: ", env.action_space.n)
print("Number of states: ", env.observation_space.n)

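# env.unwrapped.P: nested dictionary indexed as P[state][action]
# Each entry is a list of (probability, next_state, reward, terminated) tuples
state, action = 0, 1
print(env.unwrapped.P[state][action])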
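
To make the layout of the transition table concrete, here is a small hand-written table in the same nested form, `P[state][action] -> list of (probability, next_state, reward, terminated)`. The states, probabilities, and rewards are made up for illustration, and the helper names `expected_reward` and `is_valid` are assumptions rather than Gymnasium functions.

```python
# Hypothetical transition table in the same nested layout as env.unwrapped.P
P = {
    0: {
        0: [(1.0, 0, 0.0, False)],                       # deterministic
        1: [(0.8, 1, 0.0, False), (0.2, 2, 0.0, True)],  # may slip into a hole
    },
    1: {
        0: [(1.0, 0, 0.0, False)],
        1: [(0.9, 3, 1.0, True), (0.1, 2, 0.0, True)],   # goal or hole
    },
}


def expected_reward(P, state, action):
    """Probability-weighted immediate reward for one state-action pair."""
    return sum(prob * reward for prob, _, reward, _ in P[state][action])


def is_valid(P, tol=1e-9):
    """Outcome probabilities must sum to 1 for every state-action pair."""
    return all(
        abs(sum(prob for prob, *_ in outcomes) - 1.0) < tol
        for actions in P.values()
        for outcomes in actions.values()
    )


print(is_valid(P))
print(expected_reward(P, 1, 1))  # 0.9 * 1.0 + 0.1 * 0.0
```

In a slippery environment a state-action pair has several outcomes, so reading only the first tuple captures just one of the possible transitions.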
  • Policies specify which action to take in each state to maximize return
  • State-value functions
    • Estimate the state's worth
    • Expected return starting from state, following policy
    • Sum of discounted rewards, starting at state s and following the policy
    • Grid World
      • Nine states - nine state-values
      • Discount factor: γ = 1
    • Value of goal state
      • Starting in goal state, agent doesn't move
      • V(goal state) = 0
    • Bellman Equation computes state values recursively
      • V(s) = r + gamma * V(s'), where r is the reward for the policy's action in s and s' is the resulting next state
# Grid world example: policy
# 0: left, 1: down, 2: right, 3: up
policy = {
  0:1, 1:2, 2:1,
  3:1, 4:3, 5:1,
  6:2, 7:3
}

state, info = env.reset()
terminated = False

while not terminated:
  action = policy[state]
  state, reward, terminated, _, _ = env.step(action)

def compute_state_value(state):
  if state == terminal_state:
    return 0
  
  action = policy[state]
  _, next_state, reward, _ = env.unwrapped.P[state][action][0]
  
  return reward + gamma * compute_state_value(next_state)

terminal_state = 8
gamma = 1

V = { state: compute_state_value(state) for state in range(num_states) }
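
The grid world environment itself is not shown in these notes, so the following is only a sketch under stated assumptions: a deterministic 3x3 grid with states 0 to 8 in row-major order, a reward of -1 per move, and +10 for the move that enters the goal (state 8). These rewards are hypothetical. The policy is the one listed above, and the recursion applies the Bellman equation with gamma = 1.

```python
# Hypothetical 3x3 grid world (states 0..8, row-major); state 8 is the goal.
# Assumed rewards: -1 per move, +10 for the move that enters the goal.
ACTION_DELTAS = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # left, down, right, up
GOAL, SIZE, gamma = 8, 3, 1.0


def step(state, action):
    """Deterministic move; bumping into a wall leaves the state unchanged."""
    row, col = divmod(state, SIZE)
    d_row, d_col = ACTION_DELTAS[action]
    new_row = min(max(row + d_row, 0), SIZE - 1)
    new_col = min(max(col + d_col, 0), SIZE - 1)
    next_state = new_row * SIZE + new_col
    reward = 10 if next_state == GOAL else -1
    return next_state, reward


policy = {0: 1, 1: 2, 2: 1, 3: 1, 4: 3, 5: 1, 6: 2, 7: 3}


def state_value(state):
    """V(s) = r + gamma * V(s'), with V(goal) = 0."""
    if state == GOAL:
        return 0
    next_state, reward = step(state, policy[state])
    return reward + gamma * state_value(next_state)


V = {state: state_value(state) for state in range(SIZE * SIZE)}
print(V)
```

The recursion terminates here only because this policy reaches the goal from every state (0 → 3 → 6 → 7 → 4 → 1 → 2 → 5 → 8); a policy that loops would recurse forever.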
  • Action-value functions (Q-values)
    • Expected return of starting at a state s, taking action a, following the policy
    • Q(s, a) = r + gamma * V(s'), where r is the reward for taking action a in s and s' is the resulting next state
  • Improving the policy
def compute_q_value(state, action):
  if state == terminal_state:
    return None
  
  _, next_state, reward, _ = env.unwrapped.P[state][action][0]
  return reward + gamma * compute_state_value(next_state)
  
Q = { (state, action): compute_q_value(state, action)
        for state in range(num_states)
        for action in range(num_actions)
}

improved_policy = {}

for state in range(num_states-1):
  max_action = max(range(num_actions), key=lambda action: Q[(state, action)])
  improved_policy[state] = max_action
  • Policy iteration and value iteration
    • Iterative process to find optimal policy
    • Initialize, evaluate, improve, repeat
    • Until policy stops changing (improving)
    • Result is the optimal policy
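# Policy-aware versions of the earlier helpers: the policy is passed in explicitly
def compute_state_value(state, policy):
  if state == terminal_state:
    return 0

  action = policy[state]
  _, next_state, reward, _ = env.unwrapped.P[state][action][0]
  return reward + gamma * compute_state_value(next_state, policy)

def compute_q_value(state, action, policy):
  if state == terminal_state:
    return None

  _, next_state, reward, _ = env.unwrapped.P[state][action][0]
  return reward + gamma * compute_state_value(next_state, policy)

def policy_evaluation(policy):
  V = {state: compute_state_value(state, policy) for state in range(num_states)}
  return V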

def policy_improvement(policy):
  # initialize
  improved_policy = {s:0 for s in range(num_states-1)}

  Q = {
    (state, action): compute_q_value(state, action, policy)
      for state in range(num_states)
      for action in range(num_actions)
  }
  
  for state in range(num_states-1):
    max_action = max(range(num_actions), key=lambda action: Q[(state, action)])
    improved_policy[state] = max_action

  return improved_policy
  
# policy_iteration
policy = {0:1, 1:2, 2:1, 3:1, 4:3, 5:1, 6:2, 7:3}

while True:
  V = policy_evaluation(policy)
  improved_policy = policy_improvement(policy)
    
  if improved_policy == policy:
    break
    
  policy = improved_policy
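
The evaluate, improve, repeat loop can also be sketched without a Gymnasium environment. The version below is an illustration under assumptions, not the course's implementation: it uses a hypothetical deterministic 3x3 grid (reward -1 per move, +10 for entering goal state 8) and swaps the recursive evaluation for iterative sweeps with gamma = 0.9, so that policies which loop forever still get finite values.

```python
ACTION_DELTAS = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # left, down, right, up
GOAL, SIZE, gamma = 8, 3, 0.9
STATES = [s for s in range(SIZE * SIZE) if s != GOAL]  # non-terminal states


def step(state, action):
    """Deterministic move on the assumed grid; walls leave the state unchanged."""
    row, col = divmod(state, SIZE)
    d_row, d_col = ACTION_DELTAS[action]
    new_row = min(max(row + d_row, 0), SIZE - 1)
    new_col = min(max(col + d_col, 0), SIZE - 1)
    next_state = new_row * SIZE + new_col
    return next_state, (10 if next_state == GOAL else -1)


def q_value(V, state, action):
    next_state, reward = step(state, action)
    return reward + gamma * V[next_state]


def policy_evaluation(policy, tol=1e-8):
    """Sweep the states until the values of this policy stop changing."""
    V = {s: 0.0 for s in range(SIZE * SIZE)}
    while True:
        delta = 0.0
        for s in STATES:
            new_value = q_value(V, s, policy[s])
            delta = max(delta, abs(new_value - V[s]))
            V[s] = new_value
        if delta < tol:
            return V


def policy_improvement(V):
    """Pick the action with the highest Q-value in every state."""
    return {s: max(ACTION_DELTAS, key=lambda a: q_value(V, s, a)) for s in STATES}


policy = {s: 0 for s in STATES}  # start with "always move left"
while True:
    V = policy_evaluation(policy)
    improved_policy = policy_improvement(V)
    if improved_policy == policy:
        break
    policy = improved_policy

print(policy)
```

Following the resulting policy from state 0 reaches the goal in four moves, the shortest possible path on this grid.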
  • Value Iteration combines policy evaluation and improvement in one step
    • Computes optimal state-value function
    • Derives policy from it
    • Repeats until the change in state values drops below some threshold
def compute_q_value(state, action, V):
  if state == terminal_state:
    return None
  
  _, next_state, reward, _ = env.unwrapped.P[state][action][0]
  return reward + gamma * V[next_state]
  
def get_max_action_and_value(state, V):
  Q_values = [
    compute_q_value(state, action, V) for action in range(num_actions)
  ]
  
  max_action = max(range(num_actions), key=lambda a: Q_values[a])
  max_q_value = Q_values[max_action]
  
  return max_action, max_q_value

# value iteration
V = {state: 0 for state in range(num_states)}
policy = {state:0 for state in range(num_states-1)}
threshold = 0.001

while True:
  new_V = {state: 0 for state in range(num_states)}
  for state in range(num_states-1):
    max_action, max_q_value = get_max_action_and_value(state, V)
    new_V[state] = max_q_value
    policy[state] = max_action

  # Check convergence once per sweep, after every state has been updated
  if all(abs(new_V[state] - V[state]) < threshold for state in V):
    break

  V = new_V
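
The helpers above read only the first outcome, `P[state][action][0]`, which is enough when transitions are deterministic. For stochastic transitions the Q-value is an expectation over all outcomes. The sketch below shows value iteration on a hypothetical 4-state chain written in the `env.unwrapped.P` layout; the states, probabilities, and rewards are invented for illustration.

```python
# Hypothetical chain: P[state][action] -> (probability, next_state, reward, terminated)
# State 3 is the goal; action 1 tries to move right but slips 20% of the time.
P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(0.8, 1, 0.0, False), (0.2, 0, 0.0, False)]},
    1: {0: [(1.0, 0, 0.0, False)], 1: [(0.8, 2, 0.0, False), (0.2, 0, 0.0, False)]},
    2: {0: [(1.0, 1, 0.0, False)], 1: [(0.8, 3, 1.0, True), (0.2, 1, 0.0, False)]},
    3: {0: [(1.0, 3, 0.0, True)], 1: [(1.0, 3, 0.0, True)]},
}
gamma, threshold = 0.9, 1e-6


def q_value(state, action, V):
    """Expectation over every outcome, not only the first one."""
    return sum(
        prob * (reward + gamma * V[next_state] * (not terminated))
        for prob, next_state, reward, terminated in P[state][action]
    )


V = {state: 0.0 for state in P}
while True:
    new_V = {state: max(q_value(state, a, V) for a in P[state]) for state in P}
    converged = all(abs(new_V[s] - V[s]) < threshold for s in P)
    V = new_V
    if converged:
        break

# Derive the greedy policy from the converged state values
policy = {state: max(P[state], key=lambda a: q_value(state, a, V)) for state in P}
print(V)
print(policy)
```

Multiplying by `(not terminated)` zeroes out the future value after a terminal transition, which plays the same role as the explicit terminal-state check in the notes.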

Model-Free Learning

Model-free learning in RL. First-visit and every-visit Monte Carlo prediction algorithms. Temporal difference learning, SARSA, and Q-learning, with an analysis of convergence in challenging environments.

  • Recap of model-based learning
    • Rely on knowledge of environment dynamics
    • No interaction with environment
  • Model-free learning
    • Doesn't rely on knowledge of environment dynamics
    • Agent interacts with environment
    • Learns policy through trial and error
    • More suitable for real-world applications
  • Monte Carlo methods
    • Model-free techniques
    • Estimate Q-values based on episodes
    • Two methods: first-visit, every-visit
def generate_episode():
  episode = []
  state, info = env.reset()
  terminated = False

  while not terminated:
    action = env.action_space.sample()
    next_state, reward, terminated, truncated, info = env.step(action)
    episode.append((state, action, reward))
    state = next_state
    
  return episode

def first_visit_mc(num_episodes):
  Q = np.zeros((num_states, num_actions))
  returns_sum = np.zeros((num_states, num_actions))
  returns_count = np.zeros((num_states, num_actions))

  for i in range(num_episodes):
    episode = generate_episode()
    visited_states_actions = set()
    for j, (state, action, reward) in enumerate(episode):
      if (state, action) not in visited_states_actions:
        returns_sum[state, action] += sum([x[2] for x in episode[j:]])
        returns_count[state, action] += 1
        visited_states_actions.add((state, action))
        
  nonzero_counts = returns_count != 0
  Q[nonzero_counts] = returns_sum[nonzero_counts] / returns_count[nonzero_counts]

  return Q

def every_visit_mc(num_episodes):
  # tbc: same as first_visit_mc, but without the visited_states_actions check
  pass

Q = first_visit_mc(1000)
print("First-visit policy: \n", get_policy())
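
The difference between the two Monte Carlo variants shows up only when a state-action pair repeats within an episode. The sketch below is a simplified illustration that works on hand-written episodes instead of a Gymnasium environment; `mc_q_estimates` is an assumed helper name, and returns are undiscounted as in the code above.

```python
from collections import defaultdict


def mc_q_estimates(episodes, first_visit):
    """Average the undiscounted return that follows each (state, action) visit."""
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    for episode in episodes:
        seen = set()
        for j, (state, action, _) in enumerate(episode):
            if first_visit and (state, action) in seen:
                continue  # first-visit: later repeats in this episode are ignored
            seen.add((state, action))
            returns_sum[(state, action)] += sum(step[2] for step in episode[j:])
            returns_count[(state, action)] += 1
    return {pair: returns_sum[pair] / returns_count[pair] for pair in returns_sum}


# One hand-written episode of (state, action, reward); the pair (0, 1) occurs twice
episode = [(0, 1, 2), (0, 1, 0), (1, 0, 1)]
print(mc_q_estimates([episode], first_visit=True))   # (0, 1) -> 3.0
print(mc_q_estimates([episode], first_visit=False))  # (0, 1) -> 2.0
```

First-visit counts only the return after the first occurrence of (0, 1), which is 3. Every-visit averages the returns after both occurrences, (3 + 1) / 2 = 2.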
  • Temporal difference learning
    • Monte Carlo
      • Model-free
      • Estimate Q-table based on interaction
      • Update Q-table when at least one episode done
      • Suitable for short episodic tasks
    • TD learning
      • Model-free
      • Estimate Q-table based on interaction
      • Update Q-table each step within episode
      • Suitable for tasks with long/indefinite episodes
  • SARSA
    • TD algorithm
    • On-policy method: adjusts strategy based on taken actions
    • Q(state, action) = (1 - alpha) * Q(state, action) + alpha * ( reward + gamma * Q(next-state, next-action) )
    • alpha - learning rate, gamma - discount factor
# Define the update first so it exists when the training loop calls it
def update_q_table(state, action, reward, next_state, next_action):
  old_value = Q[state, action]
  next_value = Q[next_state, next_action]
  Q[state, action] = (1 - alpha) * old_value + alpha * (reward + gamma * next_value)

for episode in range(num_episodes):
  state, info = env.reset()
  action = env.action_space.sample()
  terminated = False

  while not terminated:
    next_state, reward, terminated, truncated, info = env.step(action)
    next_action = env.action_space.sample()
    update_q_table(state, action, reward, next_state, next_action)
    state, action = next_state, next_action
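
A single SARSA update can be checked by hand. The sketch below uses made-up numbers and stores the Q-table as a plain dictionary keyed by (state, action), which is an assumption made to keep the example free of dependencies.

```python
alpha, gamma = 0.1, 0.9

# Hypothetical Q-table stored as a dict keyed by (state, action)
Q = {(0, 1): 0.5, (1, 0): 2.0}


def sarsa_update(Q, state, action, reward, next_state, next_action):
    """Blend the old estimate with the target built from the action actually taken next."""
    target = reward + gamma * Q[(next_state, next_action)]
    Q[(state, action)] = (1 - alpha) * Q[(state, action)] + alpha * target


sarsa_update(Q, state=0, action=1, reward=1.0, next_state=1, next_action=0)
print(Q[(0, 1)])  # 0.9 * 0.5 + 0.1 * (1.0 + 0.9 * 2.0), about 0.73
```

Only the entry for the visited pair (0, 1) changes; the entry for the next pair is read but not modified.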
  • Introduction to Q-learning
    • Stands for quality learning
    • Model-free technique
    • Learns optimal Q-table by interaction
  • Q-learning vs. SARSA
    • SARSA updates based on taken action - on-policy learner
    • Q-learning updates independent of taken actions - off-policy learner
    • Maximum Q value based on next state
def update_q_table(state, action, reward, new_state):
  old_value = Q[state, action]
  next_max = max(Q[new_state])
  Q[state, action] = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)

env = gym.make("FrozenLake-v1", is_slippery=True)

num_episodes = 1000
alpha = 0.1
gamma = 1

num_states, num_actions = env.observation_space.n, env.action_space.n
Q = np.zeros((num_states, num_actions))

reward_per_random_episode = []
for episode in range(num_episodes):
  state, info = env.reset()
  terminated = False
  episode_reward = 0

  while not terminated:
    # Random action selection
    action = env.action_space.sample()
    
    # Take action and observe new state and reward
    new_state, reward, terminated, truncated, info = env.step(action)

    # Update Q-table
    update_q_table(state, action, reward, new_state)
    episode_reward += reward
    state = new_state

  reward_per_random_episode.append(episode_reward)
  
reward_per_learned_episode = []
policy = get_policy()

for episode in range(num_episodes):
  state, info = env.reset()
  terminated = False
  episode_reward = 0

  while not terminated:
    # Select the best action based on learned Q-table
    action = policy[state]

    # Take action and observe new state
    new_state, reward, terminated, truncated, info = env.step(action)
    state = new_state
    episode_reward += reward
    
  reward_per_learned_episode.append(episode_reward)
  
# Q-learning evaluation
import numpy as np
import matplotlib.pyplot as plt

avg_random_reward = np.mean(reward_per_random_episode)
avg_learned_reward = np.mean(reward_per_learned_episode)

plt.bar(['Random Policy', 'Learned Policy'], [avg_random_reward, avg_learned_reward], color=['blue', 'green'])
plt.title('Average Reward per Episode')
plt.ylabel('Average Reward')
plt.show()

Advanced Strategies in Model-Free RL

  • Expected SARSA
    • Updates Q-table differently than SARSA and Q-learning
    • Q(state, action) = (1 - alpha) * Q(state, action) + alpha * ( reward + gamma * Expected Q(next-state, next-action) )
    • Takes into account all actions
      • Expected Q( next-state, next-action ) = Sum( Prob(action) * Q( next-state, next-action ) for action in A )
    • Random actions => equal probabilities
      • Expected Q( next-state, next-action ) = Mean( Q( next-state, next-action ) for action in A )
env = gym.make('FrozenLake-v1', is_slippery=False)

num_states = env.observation_space.n
num_actions = env.action_space.n
Q = np.zeros((num_states, num_actions))

gamma = 0.99
alpha = 0.1
num_episodes = 1000

def update_q_table(state, action, next_state, reward):
  expected_q = np.mean(Q[next_state])
  Q[state, action] = (1-alpha) * Q[state, action] + alpha * (reward + gamma * expected_q)

# training
for i in range(num_episodes):
  state, info = env.reset()
  terminated = False
  while not terminated:
    action = env.action_space.sample()
    next_state, reward, terminated, truncated, info = env.step(action)
    update_q_table(state, action, next_state, reward)
    state = next_state

policy = { state: np.argmax(Q[state]) for state in range(num_states)}
print(policy)
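
The three temporal-difference methods differ only in how they summarize the next state's Q-values when building the update target. The sketch below compares the targets for one transition using made-up Q-values; the epsilon-greedy probabilities at the end are an illustrative assumption about the behavior policy, since the training loop above samples actions uniformly.

```python
gamma = 0.9
reward = 0.0
next_q_values = [1.0, 3.0, 0.0, 2.0]  # hypothetical Q(next_state, a) for 4 actions
next_action = 3                        # action the agent actually takes next


def expected_value(q_values, probs):
    """Sum of Prob(action) * Q(next_state, action) over all actions."""
    return sum(p * q for p, q in zip(probs, q_values))


# SARSA: value of the action actually taken next
sarsa_target = reward + gamma * next_q_values[next_action]

# Q-learning: value of the best next action
q_learning_target = reward + gamma * max(next_q_values)

# Expected SARSA with uniformly random actions: the mean of the Q-values
n = len(next_q_values)
uniform_target = reward + gamma * expected_value(next_q_values, [1 / n] * n)

# Expected SARSA with epsilon-greedy probabilities (epsilon = 0.1, assumed)
epsilon = 0.1
best = next_q_values.index(max(next_q_values))
probs = [epsilon / n + (1 - epsilon if a == best else 0.0) for a in range(n)]
eps_greedy_target = reward + gamma * expected_value(next_q_values, probs)

print(sarsa_target, q_learning_target, uniform_target, eps_greedy_target)
```

As epsilon shrinks, the epsilon-greedy expectation moves toward the Q-learning target, since almost all probability falls on the best action.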
  • Double Q-learning
    • Q-learning
      • Estimates optimal action-value function
      • Overestimates Q-values by updating based on max Q
      • Might lead to suboptimal policy learning
    • Double Q-learning
      • Maintains two Q-tables and alternates between updates
      • Each table updated based on the other, both contribute to learning process
      • Reduces risk of Q-values overestimation
env = gym.make('FrozenLake-v1', is_slippery=False)

num_states = env.observation_space.n
n_actions = env.action_space.n

# Build two independent tables; [array] * 2 would store the same array twice
Q = [np.zeros((num_states, n_actions)) for _ in range(2)]
num_episodes = 1000
alpha = 0.5
gamma = 0.99

def update_q_tables(state, action, reward, next_state):
  # Select a random Q-table index (0 or 1)
  i = np.random.randint(2)
  # Update the corresponding Q-table
  best_next_action = np.argmax(Q[i][next_state])
  Q[i][state, action] = (1 - alpha) * Q[i][state, action] + alpha * (reward + gamma * Q[1-i][next_state, best_next_action])

# training
for episode in range(num_episodes):
  state, info = env.reset()
  terminated = False

  while not terminated:
    action = np.random.choice(n_actions)
    next_state, reward, terminated, truncated, info = env.step(action)
    update_q_tables(state, action, reward, next_state)
    state = next_state

final_Q = (Q[0] + Q[1])/2
# OR
final_Q = Q[0] + Q[1]

# agent's policy
policy = {state: np.argmax(final_Q[state]) for state in range(num_states)}
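
The overestimation that Double Q-learning targets can be reproduced without any environment. In the sketch below, every action has a true value of 0 and each table holds an independent noisy estimate (standard normal noise, an assumption made for illustration). Taking the max of one table is biased upward, while selecting with one table and evaluating with the other is not.

```python
import random

random.seed(0)
n_actions, n_trials = 4, 10000

single_total, double_total = 0.0, 0.0
for _ in range(n_trials):
    # Every action's true value is 0; each table is an independent noisy estimate
    table_a = [random.gauss(0, 1) for _ in range(n_actions)]
    table_b = [random.gauss(0, 1) for _ in range(n_actions)]

    # Single estimator: select and evaluate with the same table
    single_total += max(table_a)

    # Double estimator: select with table A, evaluate with table B
    best_action = max(range(n_actions), key=lambda a: table_a[a])
    double_total += table_b[best_action]

single_mean = single_total / n_trials
double_mean = double_total / n_trials
print(f"single estimator: {single_mean:.3f}")  # clearly above the true value 0
print(f"double estimator: {double_mean:.3f}")  # close to the true value 0
```

This mirrors the update above, where the best next action is chosen from `Q[i]` and its value is read from `Q[1-i]`.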
  • Balancing between exploration and exploitation
  • Training with random actions
    • Agent explores environment
    • No strategy optimization based on learned knowledge
    • Agent uses knowledge when training done
  • Exploration-exploitation trade-off
    • Balances exploration and exploitation
    • Continuous exploration prevents strategy refinement
    • Exclusive exploitation misses undiscovered opportunities
  • Epsilon-greedy strategy
    • Explore with probability epsilon
    • Exploit with probability 1 - epsilon
    • Ensures continuous exploration while using knowledge
  • Decayed epsilon-greedy strategy
    • Reduces epsilon over time
    • More exploration initially, more exploitation later on
    • Agent increasingly relies on its accumulated knowledge
env = gym.make('FrozenLake-v1', is_slippery=True)

action_size = env.action_space.n
state_size = env.observation_space.n

Q = np.zeros((state_size, action_size))
alpha = 0.1
gamma = 0.99
total_episodes = 10000

def epsilon_greedy(state):
  if np.random.rand() < epsilon:
    action = env.action_space.sample() # Explore
  else:
    action = np.argmax(Q[state, :]) # Exploit
    
  return action

# training
epsilon = 0.9 # Exploration rate
rewards_eps_greedy = []

for episode in range(total_episodes):
  state, info = env.reset()
  terminated = False
  episode_reward = 0

  while not terminated:
    action = epsilon_greedy(state)
    new_state, reward, terminated, truncated, info = env.step(action)
    update_q_table(state, action, reward, new_state)  # Q-learning update; modifies Q in place
    state = new_state
    episode_reward += reward
    
  rewards_eps_greedy.append(episode_reward)
  
# training decayed epsilon-greedy
epsilon = 1.0 # Exploration rate
epsilon_decay = 0.999
min_epsilon = 0.01
rewards_decay_eps_greedy = []

for episode in range(total_episodes):
  state, info = env.reset()
  terminated = False
  episode_reward = 0

  while not terminated:
    action = epsilon_greedy(state)
    new_state, reward, terminated, truncated, info = env.step(action)
    episode_reward += reward
    update_q_table(state, action, reward, new_state)  # Q-learning update; modifies Q in place
    state = new_state

  rewards_decay_eps_greedy.append(episode_reward)
  epsilon = max(min_epsilon, epsilon * epsilon_decay)
# comparing strategies
avg_eps_greedy= np.mean(rewards_eps_greedy)
avg_decay = np.mean(rewards_decay_eps_greedy)
plt.bar(['Epsilon Greedy', 'Decayed Epsilon Greedy'], [avg_eps_greedy, avg_decay], color=['blue', 'green'])

plt.title('Average Reward per Episode')
plt.ylabel('Average Reward')
plt.show()
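
How quickly a decayed epsilon reaches its floor depends on the decay rate. With epsilon starting at 1.0, a decay of 0.999 per episode, and a floor of 0.01, it takes roughly 4,600 episodes, so exploration dominates the first part of a 10,000-episode run. The sketch below counts the episodes and includes a dependency-free action selector; this `epsilon_greedy` takes its inputs as arguments, unlike the version above that reads global variables.

```python
import random


def epsilon_greedy(q_values, epsilon, rng=random):
    """Explore with probability epsilon, otherwise pick the highest-valued action."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda a: q_values[a])


epsilon, epsilon_decay, min_epsilon = 1.0, 0.999, 0.01

# Count how many per-episode decays it takes to hit the floor
episodes_to_floor = 0
while epsilon > min_epsilon:
    epsilon = max(min_epsilon, epsilon * epsilon_decay)
    episodes_to_floor += 1

print(episodes_to_floor)
print(epsilon_greedy([0.1, 0.7, 0.2], epsilon=0.0))  # pure exploitation
```

A slower decay (closer to 1) keeps the agent exploring for longer, which can help in stochastic environments such as the slippery frozen lake.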
  • Multi-armed bandits
    • Gambler facing slot machines
    • Challenge - maximize winning
    • Solution - exploration-exploitation
  • Slot machines
    • Reward from an arm is 0 or 1
    • Agent's goal is to find the slot machine with maximum reward
  • Solving the problem
    • Decayed epsilon-greedy
    • Epsilon - select random machine
    • 1 - epsilon - select best machine so far
    • Epsilon decreases over time
# initialization
n_bandits = 4
true_bandit_probs = np.random.rand(n_bandits)

n_iterations = 100000
epsilon = 1.0
min_epsilon = 0.01
epsilon_decay = 0.999

counts = np.zeros(n_bandits) # How many times each bandit was played
values = np.zeros(n_bandits) # Estimated winning probability of each bandit
rewards = np.zeros(n_iterations) # Reward history
selected_arms = np.zeros(n_iterations, dtype=int) # Arm selection history

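def epsilon_greedy():
  # Bandit version (assumed helper, not shown in the original notes):
  # pick a random arm with probability epsilon, else the best estimate so far
  if np.random.rand() < epsilon:
    return np.random.randint(n_bandits)
  return np.argmax(values)

# iteration loop
for i in range(n_iterations):
  arm = epsilon_greedy()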
  reward = np.random.rand() < true_bandit_probs[arm]
  rewards[i] = reward
  selected_arms[i] = arm
  counts[arm] += 1
  values[arm] += (reward - values[arm]) / counts[arm]
  epsilon = max(min_epsilon, epsilon * epsilon_decay)
  
# analyzing selections
selections_percentage = np.zeros((n_iterations, n_bandits))
for i in range(n_iterations):
  selections_percentage[i, selected_arms[i]] = 1
selections_percentage = (
  np.cumsum(selections_percentage, axis=0) /
  np.arange(1, n_iterations + 1).reshape(-1, 1)
)

# plotting the results
for arm in range(n_bandits):
  plt.plot(selections_percentage[:, arm], label=f'Bandit #{arm+1}')

plt.xscale('log')
plt.title('Bandit Action Choices Over Time')
plt.xlabel('Iteration Number')
plt.ylabel('Fraction of Bandit Selections')
plt.legend()
plt.show()

for i, prob in enumerate(true_bandit_probs, 1):
  print(f"Bandit #{i} -> {prob:.2f}")
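
The line `values[arm] += (reward - values[arm]) / counts[arm]` in the loop above is an incremental form of the sample mean: it keeps a running average without storing past rewards. A minimal sketch with a made-up reward sequence (the function name `incremental_mean` is illustrative):

```python
def incremental_mean(rewards):
    """Running average: nudge the estimate toward each new reward by 1/count."""
    value, count = 0.0, 0
    for reward in rewards:
        count += 1
        value += (reward - value) / count
    return value


rewards = [1, 0, 1, 1]  # hypothetical outcomes from pulling one arm four times
print(incremental_mean(rewards))      # about 0.75
print(sum(rewards) / len(rewards))    # the ordinary mean, for comparison
```

Because each arm's estimate converges to its true win probability as its count grows, the exploit step increasingly selects the best arm.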