Skip to content

Instantly share code, notes, and snippets.

@apoorvnandan
Last active December 14, 2024 18:31
Show Gist options
  • Select an option

  • Save apoorvnandan/24cc361a8d882726dc28aa838a18086a to your computer and use it in GitHub Desktop.

Select an option

Save apoorvnandan/24cc361a8d882726dc28aa838a18086a to your computer and use it in GitHub Desktop.
import numpy as np
import math
np.random.seed(423)
# code for the environment
class CartPoleEnv:
    """NumPy-only re-implementation of the classic Gym CartPole environment.

    State vector: (cart position, cart velocity, pole angle, pole angular
    velocity). An episode terminates when the cart leaves ±x_threshold or
    the pole tips past ±theta_threshold_radians.
    """

    def __init__(self):
        # Environment parameters
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masscart + self.masspole
        self.length = 0.5  # Half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0  # Magnitude of the force applied each step
        self.tau = 0.02  # Time step (seconds)
        self.kinematics_integrator = "euler"
        # Termination thresholds
        self.theta_threshold_radians = 12 * 2 * math.pi / 360  # ±12° in radians
        self.x_threshold = 2.4  # Cart position threshold (±2.4 meters)
        # Observation space bounds
        high = np.array(
            [
                self.x_threshold * 2,  # Cart position
                np.finfo(np.float32).max,  # Cart velocity
                self.theta_threshold_radians * 2,  # Pole angle
                np.finfo(np.float32).max,  # Pole angular velocity
            ],
            dtype=np.float32,
        )
        self.observation_space = np.array([-high, high])
        # Action space
        self.action_space = 2  # Discrete actions: 0 (left) or 1 (right)
        # Initialize state
        self.state = None
        self.steps_beyond_terminated = None

    def reset(self):
        """Reset to a small random state and return the initial observation."""
        # Initialize state with random values in the range (-0.05, 0.05)
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_terminated = None
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        """Advance one time step; returns (obs, reward, terminated, info)."""
        # Ensure the action is valid
        assert action in [0, 1], f"Invalid action: {action}"
        # Unpack the state
        x, x_dot, theta, theta_dot = self.state
        # Apply force
        force = self.force_mag if action == 1 else -self.force_mag
        # Compute dynamics
        costheta = math.cos(theta)
        sintheta = math.sin(theta)
        # Temporary variable for pole dynamics
        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass
        # Compute angular acceleration
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )
        # Compute cart acceleration
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
        # Update state using Euler integration
        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # Semi-implicit Euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot
        # Update the state
        self.state = np.array([x, x_dot, theta, theta_dot])
        # Check termination conditions
        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )
        # Assign reward
        if not terminated:
            reward = 1.0
        elif self.steps_beyond_terminated is None:
            # Pole just fell!
            self.steps_beyond_terminated = 0
            reward = 1.0
        else:
            # Stepping a terminated episode: warn once, then give zero reward.
            if self.steps_beyond_terminated == 0:
                print(
                    "Warning: You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            reward = 0.0
        return np.array(self.state, dtype=np.float32), reward, terminated, {}
def init_adam(params, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
    """Build an Adam optimizer state dict for the given parameter dict.

    Every parameter gets zero-initialized first ('m') and second ('v')
    moment buffers; 'state_step' counts how many updates have been applied.
    """
    moments = {
        name: {'m': np.zeros_like(value), 'v': np.zeros_like(value)}
        for name, value in params.items()
    }
    return {
        'lr': lr,
        'beta1': beta1,
        'beta2': beta2,
        'eps': eps,
        'state': moments,
        'state_step': 0,
    }
def adam_step(grads, params, adam_dict):
    """Apply one Adam update in place to every array in `params`.

    Moment estimates in `adam_dict['state']` and the step counter are
    updated as side effects; nothing is returned.
    """
    adam_dict['state_step'] += 1
    t = adam_dict['state_step']
    lr = adam_dict['lr']
    b1 = adam_dict['beta1']
    b2 = adam_dict['beta2']
    eps = adam_dict['eps']
    # Bias-correction denominators are the same for every parameter.
    bias1 = 1 - b1 ** t
    bias2 = 1 - b2 ** t
    for name, param in params.items():
        g = grads[name]
        slot = adam_dict['state'][name]
        # Exponential moving averages of the gradient and its square.
        slot['m'] = b1 * slot['m'] + (1 - b1) * g
        slot['v'] = b2 * slot['v'] + (1 - b2) * (g ** 2)
        m_hat = slot['m'] / bias1
        v_hat = slot['v'] / bias2
        # In-place update so callers holding references see the new values.
        param -= lr * m_hat / (np.sqrt(v_hat) + eps)
# PPO hyperparameters
num_inputs = 4          # CartPole observation dimension
num_actions = 2         # discrete actions: left / right
num_hidden = 256        # hidden-layer width for both actor and critic
T_horizon = 20          # max transitions collected between updates
learning_rate = 0.0005  # Adam learning rate for both networks
gamma = 0.98            # discount factor
lmbda = 0.95            # GAE lambda
eps_clip = 0.1          # PPO clipping epsilon
K_epoch = 3             # optimization epochs per rollout batch
def ffn(inp, params, grad=False):
    """Forward pass of a two-layer MLP: inp @ w1 + b1 -> ReLU -> @ w2 + b2.

    When grad=True, also returns the intermediate activations that
    ffn_backward needs for backpropagation.
    """
    pre_act = inp @ params['w1'] + params['b1']
    hidden = np.maximum(pre_act, 0)
    out = hidden @ params['w2'] + params['b2']
    if not grad:
        return out
    return out, {'inp': inp, 'w1_out': pre_act, 'relu_out': hidden}
def ffn_backward(out_grad, hidden_states, params):
    """Backpropagate `out_grad` through ffn.

    `hidden_states` is the activation dict returned by ffn(..., grad=True).
    Returns a gradient dict keyed like the parameter dict ('w1', 'b1',
    'w2', 'b2').
    """
    grads = {}
    # Output layer.
    grads['w2'] = hidden_states['relu_out'].T @ out_grad
    grads['b2'] = out_grad.sum(axis=0)
    # Through the ReLU: pass gradient only where the pre-activation was > 0.
    hidden_grad = out_grad @ params['w2'].T
    pre_act_grad = hidden_grad * (hidden_states['w1_out'] > 0)
    # Input layer.
    grads['w1'] = hidden_states['inp'].T @ pre_act_grad
    grads['b1'] = pre_act_grad.sum(axis=0)
    return grads
def softmax(x):
    """Numerically stable softmax along the last axis of `x`."""
    # Subtract the row-wise max so np.exp never overflows.
    shifted = x - np.max(x, axis=-1, keepdims=True)
    weights = np.exp(shifted)
    return weights / weights.sum(axis=-1, keepdims=True)
def init_model():
    """Create He-initialized parameters for the actor and critic MLPs.

    Returns (actor_params, critic_params); the actor outputs action logits,
    the critic a single state value.
    """
    def dense(n_in, n_out):
        # He initialization: scale suits the ReLU hidden layer.
        return np.random.randn(n_in, n_out) * np.sqrt(2 / n_in)

    actor_params = {
        'w1': dense(num_inputs, num_hidden),
        'b1': np.zeros(num_hidden),
        'w2': dense(num_hidden, num_actions),
        'b2': np.zeros(num_actions),
    }
    critic_params = {
        'w1': dense(num_inputs, num_hidden),
        'b1': np.zeros(num_hidden),
        'w2': dense(num_hidden, 1),
        'b2': np.zeros(1),
    }
    return actor_params, critic_params
def make_batch(data):
    """Stack a list of (s, a, r, s', prob_a, done) transitions into arrays.

    Scalar fields are wrapped so every output is 2-D with one row per
    transition. `dones` is a continuation mask: 0 where the episode ended,
    1 otherwise. Returns (obs, acts, rews, newobs, dones, probs).
    """
    obs_lst, act_lst, rew_lst = [], [], []
    next_lst, prob_lst, mask_lst = [], [], []
    for s, a, r, s_prime, prob_a, done in data:
        obs_lst.append(s)
        act_lst.append([a])
        rew_lst.append([r])
        next_lst.append(s_prime)
        prob_lst.append([prob_a])
        mask_lst.append([0 if done else 1])
    return (np.array(obs_lst), np.array(act_lst), np.array(rew_lst),
            np.array(next_lst), np.array(mask_lst), np.array(prob_lst))
def train(actor_params, critic_params, data, opta, optc):
    """Run K_epoch PPO updates on one rollout batch.

    `data` is a list of (s, a, r, s', prob_a, done) transitions; the actor
    and critic parameters are updated in place via adam_step. Nothing is
    returned.
    """
    obs, acts, rews, newobs, dones, probs = make_batch(data)
    for _ in range(K_epoch):
        # TD targets and per-step TD errors from the current critic.
        targets = rews + gamma * ffn(newobs, critic_params, grad=False) * dones
        delta = targets - ffn(obs, critic_params, grad=False)
        # Generalized Advantage Estimation, accumulated backwards in time.
        advantage_lst = []
        advantage = 0.0
        for delta_t in delta[::-1]:
            advantage = gamma * lmbda * advantage + delta_t[0]
            advantage_lst.append([advantage])
        advantage_lst.reverse()
        advantage = np.array(advantage_lst, dtype=np.float32)
        # Actor forward pass and clipped surrogate objective.
        logits, hs_actor = ffn(obs, actor_params, grad=True)
        actor_probs = softmax(logits)
        logprobs = np.log(actor_probs)
        # BUGFIX: keep logpi_a as an (N, 1) column so `ratio` stays (N, 1).
        # Previously the (N,) - (N, 1) subtraction broadcast to an (N, N)
        # matrix, mixing log-probs across unrelated time steps.
        logpi_a = logprobs[np.arange(actor_probs.shape[0]), acts.flatten()].reshape(-1, 1)
        ratio = np.exp(logpi_a - np.log(probs))  # pi_new(a|s) / pi_old(a|s)
        surr1 = ratio * advantage
        surr2 = np.clip(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
        critic_vals, hs_critic = ffn(obs, critic_params, grad=True)
        # Backward pass: gradient of -min(surr1, surr2) w.r.t. the logits.
        surr1_grad = np.where(surr1 <= surr2, -1, 0)
        surr2_grad = np.where(surr1 >= surr2, -1, 0)
        ratio_grad = surr1_grad * advantage
        clip_grad = surr2_grad * advantage
        # The clip passes gradient only where the ratio is inside the band.
        ratio_grad = ratio_grad + np.where(
            np.clip(ratio, 1 - eps_clip, 1 + eps_clip) == ratio, clip_grad, 0
        )
        # d ratio / d logpi_a = ratio (derivative of exp).
        logpi_a_grad = ratio * ratio_grad
        # Scatter the per-sample gradient back onto the chosen action's logprob.
        logprobs_grad = np.zeros_like(logprobs)
        for row in range(logprobs_grad.shape[0]):
            logprobs_grad[row, acts[row, 0]] += logpi_a_grad[row, 0]
        # Combined log + softmax backward.
        logits_grad = logprobs_grad - actor_probs * np.sum(logprobs_grad, axis=1, keepdims=True)
        actor_grads = ffn_backward(logits_grad, hs_actor, actor_params)
        adam_step(actor_grads, actor_params, opta)
        # Critic loss is 0.5 * (V(s) - target)^2, so its gradient is the residual.
        critic_vals_grad = critic_vals - targets
        critic_grads = ffn_backward(critic_vals_grad, hs_critic, critic_params)
        adam_step(critic_grads, critic_params, optc)
def main():
    """Train a PPO agent on CartPole, printing a running reward every 100 episodes."""
    env = CartPoleEnv()
    actor_params, critic_params = init_model()
    opta = init_adam(actor_params, lr=learning_rate)
    optc = init_adam(critic_params, lr=learning_rate)
    data = []  # rollout buffer of (s, a, r, s', prob_a, done) transitions
    score = 0.0
    print_interval = 20
    running_reward = 0
    for n_epi in range(5000):
        s = env.reset()
        done = False
        episode_reward = 0
        while not done:
            # Collect up to T_horizon transitions, then run a PPO update.
            for t in range(T_horizon):
                obs = s.reshape(1, -1)
                logits = ffn(obs, actor_params, grad=False)
                prob = softmax(logits)[0]
                # Sample an action from the current policy.
                a = np.random.choice(num_actions, p=prob)
                s_prime, r, done, info = env.step(a)
                # Rewards are scaled down (r/100) before training.
                data.append((s, a, r/100.0, s_prime, prob[a], done))
                s = s_prime
                score += r
                episode_reward += r
                if done:
                    break
            train(actor_params, critic_params, data, opta, optc)
            data = []
        # Exponential moving average of per-episode rewards.
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward
        if n_epi % 100 == 0:
            template = "running reward: {:.2f} at episode {}"
            print(template.format(running_reward, n_epi))
        if running_reward > 195:  # Condition to consider the task solved
            print("solved at episode {}!".format(n_epi))
            break
# Guard the entry point so importing this module does not start training.
if __name__ == "__main__":
    main()
@kir486680
Copy link
Copy Markdown

kir486680 commented Dec 14, 2024

Just a small correction: the actor() function is not defined, which causes a NameError. Replacing actor() with ffn() and passing the appropriate parameter dictionaries (e.g., actor_params or critic_params) fixes it. Solved at episode 1442.

@apoorvnandan
Copy link
Copy Markdown
Author

@kir486680 thanks, fixed it now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment