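"""Single-file PPO on CartPole, using only NumPy.

The file bundles a CartPole environment (mirroring Gym's classic
implementation), a hand-rolled Adam optimizer, a two-layer MLP with
manual backprop, and a PPO training loop with generalized advantage
estimation (GAE).
"""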
import math

import numpy as np

np.random.seed(423)


# code for the environment
class CartPoleEnv:
    def __init__(self):
        # Environment parameters
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masscart + self.masspole
        self.length = 0.5  # Half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # Time step (seconds)
        self.kinematics_integrator = "euler"

        # Termination thresholds
        self.theta_threshold_radians = 12 * 2 * math.pi / 360  # ±12° in radians
        self.x_threshold = 2.4  # Cart position threshold (±2.4 meters)

        # Observation space bounds
        high = np.array(
            [
                self.x_threshold * 2,  # Cart position
                np.finfo(np.float32).max,  # Cart velocity
                self.theta_threshold_radians * 2,  # Pole angle
                np.finfo(np.float32).max,  # Pole angular velocity
            ],
            dtype=np.float32,
        )
        self.observation_space = np.array([-high, high])

        # Action space
        self.action_space = 2  # Discrete actions: 0 (left) or 1 (right)

        # Initialize state
        self.state = None
        self.steps_beyond_terminated = None

    def reset(self):
        # Initialize state with random values in the range (-0.05, 0.05)
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_terminated = None
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        # Ensure the action is valid
        assert action in [0, 1], f"Invalid action: {action}"

        # Unpack the state
        x, x_dot, theta, theta_dot = self.state

        # Apply force
        force = self.force_mag if action == 1 else -self.force_mag

        # Compute dynamics
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # Temporary variable for pole dynamics
        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass

        # Compute angular acceleration
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )

        # Compute cart acceleration
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        # Update state using Euler integration
        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # Semi-implicit Euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        # Update the state
        self.state = np.array([x, x_dot, theta, theta_dot])

        # Check termination conditions
        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        # Assign reward
        if not terminated:
            reward = 1.0
        elif self.steps_beyond_terminated is None:
            # Pole just fell!
            self.steps_beyond_terminated = 0
            reward = 1.0
        else:
            if self.steps_beyond_terminated == 0:
                print(
                    "Warning: You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, terminated, {}
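
# For reference, step() implements the standard cart-pole equations of motion
# (Barto, Sutton & Anderson, 1983), the same ones used by Gym's CartPole:
#
#   temp      = (F + m_p * l * theta_dot^2 * sin(theta)) / (m_c + m_p)
#   theta_acc = (g * sin(theta) - cos(theta) * temp)
#               / (l * (4/3 - m_p * cos(theta)^2 / (m_c + m_p)))
#   x_acc     = temp - m_p * l * theta_acc * cos(theta) / (m_c + m_p)
#
# where F is the applied force, l the half-pole length, and m_c, m_p the cart
# and pole masses.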


def init_adam(params, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
    adam_dict = {
        'lr': lr,
        'beta1': beta1,
        'beta2': beta2,
        'eps': eps,
        'state': {},
        'state_step': 0,
    }
    for key, val in params.items():
        adam_dict['state'][key] = {
            'm': np.zeros_like(val),  # First moment vector
            'v': np.zeros_like(val),  # Second moment vector
        }
    return adam_dict


def adam_step(grads, params, adam_dict):
    adam_dict['state_step'] += 1
    state_step = adam_dict['state_step']
    lr = adam_dict['lr']
    beta1 = adam_dict['beta1']
    beta2 = adam_dict['beta2']
    eps = adam_dict['eps']
    state = adam_dict['state']
    for key in params:
        grad = grads[key]
        param = params[key]
        m = state[key]['m']
        v = state[key]['v']
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** state_step)
        v_hat = v / (1 - beta2 ** state_step)
        # In-place update: `param` aliases the array stored in the params dict
        param -= lr * m_hat / (np.sqrt(v_hat) + eps)
        state[key]['m'] = m
        state[key]['v'] = v
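
# adam_step applies the standard Adam update (Kingma & Ba, 2015):
#
#   m_t = beta1 * m_{t-1} + (1 - beta1) * g_t
#   v_t = beta2 * v_{t-1} + (1 - beta2) * g_t^2
#   m_hat = m_t / (1 - beta1^t),  v_hat = v_t / (1 - beta2^t)   # bias correction
#   param <- param - lr * m_hat / (sqrt(v_hat) + eps)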

num_inputs = 4
num_actions = 2
num_hidden = 256
T_horizon = 20
learning_rate = 0.0005
gamma = 0.98
lmbda = 0.95
eps_clip = 0.1
K_epoch = 3
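# Roles of the PPO-specific knobs above: T_horizon is the number of steps
# collected per rollout before each update, K_epoch the number of optimization
# passes over that rollout, eps_clip the PPO ratio-clipping radius, gamma the
# discount factor, and lmbda the GAE lambda.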


def ffn(inp, params, grad=False):
    # Two-layer MLP: inp -> ReLU(inp @ w1 + b1) -> logits
    w1_out = np.dot(inp, params['w1']) + params['b1']
    relu_out = np.maximum(w1_out, 0)
    logits = np.dot(relu_out, params['w2']) + params['b2']
    if grad:
        # Also return the intermediate activations needed for backprop
        return logits, {'inp': inp, 'w1_out': w1_out, 'relu_out': relu_out}
    return logits


def ffn_backward(out_grad, hidden_states, params):
    relu_out_grad = np.dot(out_grad, params['w2'].T)
    b2_grad = np.sum(out_grad, axis=0, keepdims=False)
    w2_grad = np.dot(hidden_states['relu_out'].T, out_grad)
    w1_out_grad = relu_out_grad * (hidden_states['w1_out'] > 0)
    w1_grad = np.dot(hidden_states['inp'].T, w1_out_grad)
    b1_grad = np.sum(w1_out_grad, axis=0, keepdims=False)
    return {'w1': w1_grad, 'w2': w2_grad, 'b1': b1_grad, 'b2': b2_grad}
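
# ffn_backward hand-derives backprop for logits = relu(x @ W1 + b1) @ W2 + b2.
# Given the upstream gradient G = dL/dlogits:
#
#   dL/dW2 = relu_out^T @ G          dL/db2 = sum over batch of G
#   dL/drelu_out = G @ W2^T
#   dL/dw1_out = dL/drelu_out * 1[w1_out > 0]      (ReLU mask)
#   dL/dW1 = inp^T @ dL/dw1_out      dL/db1 = sum over batch of dL/dw1_out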


def softmax(x):
    # Subtract the row max for numerical stability before exponentiating
    exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return exp_x / np.sum(exp_x, axis=-1, keepdims=True)
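
# A useful identity for the actor backward pass in train() below: for
# p = softmax(z), the Jacobian of the log-probabilities is
#
#   d log(p_k) / d z_j = 1[k == j] - p_j
#
# so an upstream gradient G on the log-probs maps to the logits as
# G - p * sum_j(G_j) per row, which is the `logits_grad` expression in train().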


def init_model():
    # He initialization (scale sqrt(2 / fan_in)) suits the ReLU hidden layer
    actor_params = {
        'w1': np.random.randn(num_inputs, num_hidden) * np.sqrt(2 / num_inputs),
        'b1': np.zeros(num_hidden),
        'w2': np.random.randn(num_hidden, num_actions) * np.sqrt(2 / num_hidden),
        'b2': np.zeros(num_actions),
    }
    critic_params = {
        'w1': np.random.randn(num_inputs, num_hidden) * np.sqrt(2 / num_inputs),
        'b1': np.zeros(num_hidden),
        'w2': np.random.randn(num_hidden, 1) * np.sqrt(2 / num_hidden),
        'b2': np.zeros(1),
    }
    return actor_params, critic_params


def make_batch(data):
    # Note: `dones` is really a continuation mask (0 if the episode ended,
    # 1 otherwise), so it can multiply the bootstrapped value directly.
    obs, acts, rews, newobs, probs, dones = zip(*[
        (s, [a], [r], s_prime, [prob_a], [0 if done else 1])
        for s, a, r, s_prime, prob_a, done in data
    ])
    obs, acts, rews, newobs, probs, dones = map(
        np.array,
        (obs, acts, rews, newobs, probs, dones)
    )
    return obs, acts, rews, newobs, dones, probs


def train(actor_params, critic_params, data, opta, optc):
    obs, acts, rews, newobs, dones, probs = make_batch(data)
    for _ in range(K_epoch):
        # TD targets and TD errors from the current critic
        targets = rews + gamma * ffn(newobs, critic_params, grad=False) * dones
        delta = targets - ffn(obs, critic_params, grad=False)

        # Advantages, computed backward over the rollout
        advantage_lst = []
        advantage = 0.0
        for delta_t in delta[::-1]:
            advantage = gamma * lmbda * advantage + delta_t[0]
            advantage_lst.append([advantage])
        advantage_lst.reverse()
        advantage = np.array(advantage_lst, dtype=np.float32)
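        # The backward recursion above implements generalized advantage
        # estimation (GAE; Schulman et al., 2016):
        #
        #   delta_t = r_t + gamma * V(s_{t+1}) * mask_t - V(s_t)
        #   A_t     = delta_t + gamma * lmbda * A_{t+1}
        #
        # Because train() only ever sees data from a single episode segment,
        # at most the final transition is terminal, so no done-mask is needed
        # inside the recursion itself.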
        logits, hs_actor = ffn(obs, actor_params, grad=True)
        actor_probs = softmax(logits)
        logprobs = np.log(actor_probs)
        # Log-prob of the taken action, reshaped to (N, 1) so the subtraction
        # below broadcasts against `probs` (shape (N, 1)) instead of producing
        # an (N, N) matrix
        logpi_a = logprobs[np.arange(actor_probs.shape[0]), acts.flatten()].reshape(-1, 1)
        ratio = np.exp(logpi_a - np.log(probs))  # pi_new(a|s) / pi_old(a|s)
        surr1 = ratio * advantage
        surr2 = np.clip(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
        critic_vals, hs_critic = ffn(obs, critic_params, grad=True)
        actor_loss = -np.minimum(surr1, surr2)
        critic_loss = 0.5 * (critic_vals - targets) ** 2
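        # These correspond to PPO's clipped surrogate objective
        # (Schulman et al., 2017):
        #
        #   r_t(theta) = pi_theta(a_t|s_t) / pi_theta_old(a_t|s_t)
        #   L_actor    = -min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t)
        #   L_critic   = 0.5 * (V(s_t) - target_t)^2
        #
        # The loss arrays are kept for reference only; the hand-written
        # backward pass below differentiates the objectives directly.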
        # backward pass, hand-derived
        surr1_grad = np.zeros_like(surr1)
        surr2_grad = np.zeros_like(surr2)
        # d(-min(surr1, surr2)) / d surr{1,2}
        surr1_grad += np.where(surr1 <= surr2, -1, 0)
        surr2_grad += np.where(surr1 >= surr2, -1, 0)
        ratio_grad = np.zeros_like(ratio)
        ratio_grad += surr1_grad * advantage
        clip_grad = surr2_grad * advantage
        # Gradient flows through clip() only where the ratio was not clipped
        ratio_grad += np.where(np.clip(ratio, 1 - eps_clip, 1 + eps_clip) == ratio, clip_grad, 0)
        # d ratio / d logpi_a = ratio, since ratio = exp(logpi_a - log probs_old)
        diff_grad = ratio * ratio_grad
        logpi_a_grad = diff_grad
        # Scatter each sample's gradient back onto its taken action's log-prob
        logprobs_grad = np.zeros_like(logprobs)
        for n in range(logprobs_grad.shape[0]):
            logprobs_grad[n, acts[n, 0]] += logpi_a_grad[n, 0]
        # Backprop through log-softmax: dL/dz = G - p * sum_j(G_j), row-wise
        softmax_output = actor_probs
        logits_grad = logprobs_grad - softmax_output * np.sum(logprobs_grad, axis=1, keepdims=True)
        actor_grads = ffn_backward(logits_grad, hs_actor, actor_params)
        adam_step(actor_grads, actor_params, opta)

        # Critic: d(0.5 * (v - target)^2) / dv = v - target
        critic_vals_grad = np.zeros_like(critic_vals)
        critic_vals_grad += (critic_vals - targets)
        critic_grads = ffn_backward(critic_vals_grad, hs_critic, critic_params)
        adam_step(critic_grads, critic_params, optc)


def main():
    env = CartPoleEnv()
    actor_params, critic_params = init_model()
    opta = init_adam(actor_params, lr=learning_rate)
    optc = init_adam(critic_params, lr=learning_rate)
    data = []
    score = 0.0
    running_reward = 0
    for n_epi in range(5000):
        s = env.reset()
        done = False
        episode_reward = 0
        while not done:
            # Collect up to T_horizon transitions, then do one PPO update
            for t in range(T_horizon):
                obs = s.reshape(1, -1)
                logits = ffn(obs, actor_params, grad=False)
                prob = softmax(logits)[0]
                a = np.random.choice(num_actions, p=prob)
                s_prime, r, done, info = env.step(a)
                # Reward is scaled down to keep the value targets small
                data.append((s, a, r / 100.0, s_prime, prob[a], done))
                s = s_prime
                score += r
                episode_reward += r
                if done:
                    break
            train(actor_params, critic_params, data, opta, optc)
            data = []
        # Exponential moving average of the episode reward
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward
        if n_epi % 100 == 0:
            template = "running reward: {:.2f} at episode {}"
            print(template.format(running_reward, n_epi))
        if running_reward > 195:  # Condition to consider the task solved
            print("solved at episode {}!".format(n_epi))
            break


if __name__ == "__main__":
    main()
Just a small correction: the actor() function is not defined, which causes a NameError. Replacing actor() with ffn() and passing the appropriate parameter dictionary (e.g., actor_params or critic_params) fixes it. Solved at episode 1442.