Policy Gradient method for Mountain Car Continuous with Evolution Algorithm
"""
This is an implementation of the Hill Climbing algorithm for the MountainCar-v1 (Continuous Spaces) environment.
No gradient ascent is performed in this script.
"""
from typing import Callable, Dict, List, Any, Tuple, Optional
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym
import os
import argparse
from gymnasium.wrappers.record_video import RecordVideo
from utils import watch_agent, plot_running_avg, FeatureTransformers
# torch.manual_seed(32)
GAMMA = 0.99
class NeuralModel(nn.Module):
    def __init__(
        self,
        D: int,
        hidden_layer_sizes: List,
        activation: str = "tanh",
        last_activation: Optional[Callable] = None,
        last_zero: bool = False,
    ):
        super().__init__()

        activations = dict(tanh=nn.Tanh, relu=nn.ReLU, lrelu=nn.LeakyReLU)

        M1 = D
        self.model = nn.Sequential()

        for idx, M2 in enumerate(hidden_layer_sizes):
            self.model.add_module(
                f"layer_{idx+1}", nn.Linear(in_features=M1, out_features=M2)
            )
            self.model.add_module(f"activation_{idx+1}", activations[activation]())
            M1 = M2

        self.model.add_module(
            "last_layer", nn.Linear(in_features=M1, out_features=1, bias=False)
        )

        if last_activation is not None:
            self.model.add_module("last_activation", last_activation())

        self.last_zero = last_zero
        self.initialize_weights()

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        return self.model(X)

    def initialize_weights(self):
        for name, m in self.named_modules():
            if "last_layer" in name and isinstance(m, nn.Linear) and self.last_zero:
                nn.init.constant_(m.weight, 0.0)
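# Note: with hidden_layer_sizes=[] (as used in __main__ below), NeuralModel
# reduces to a single bias-free linear layer mapping the D-dimensional feature
# vector to one scalar output. last_zero=True zeroes that layer's weights so the
# initial policy mean is 0, and last_activation=nn.Softplus keeps the predicted
# standard deviation positive.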
class PolicyModel:
    def __init__(
        self,
        ft: Callable,
        D: int,
        mean_hidden_layer_sizes: List,
        var_hidden_layer_sizes: List,
        smoothing_val: float = 1e-5,
        device: str = "cuda",
    ):
        self.ft = ft
        self.D = D
        self.mean_hidden_layer_sizes = mean_hidden_layer_sizes
        self.var_hidden_layer_sizes = var_hidden_layer_sizes
        self.smooth_val = smoothing_val
        self.device = device

        self.mean_model = NeuralModel(
            D=D, hidden_layer_sizes=mean_hidden_layer_sizes, last_zero=True
        ).to(device=device)
        self.var_model = NeuralModel(
            D=D, hidden_layer_sizes=var_hidden_layer_sizes, last_activation=nn.Softplus
        ).to(device=device)

        self.params = list()
        for param in list(self.mean_model.parameters()) + list(
            self.var_model.parameters()
        ):
            self.params.append(param)

    def predict(self, X: np.ndarray) -> torch.Tensor:
        X = np.atleast_2d(X)
        X = self.ft.transform(X)
        X = torch.tensor(X).float().to(device=self.device)

        mu = self.mean_model(X).reshape(-1)
        std = self.var_model(X).reshape(-1) + self.smooth_val

        norm = torch.distributions.Normal(loc=mu, scale=std)
        return torch.clip(norm.sample(), -1, 1)

    def sample_action(self, X: np.ndarray) -> torch.Tensor:
        a_val = self.predict(X)[0]
        return a_val

    def copy(self):
        clone = PolicyModel(
            self.ft,
            self.D,
            self.mean_hidden_layer_sizes,
            self.var_hidden_layer_sizes,
            smoothing_val=self.smooth_val,
            device=self.device,
        )
        clone.copy_from(self)
        return clone

    def copy_from(self, other):
        cur_param = self.params
        other_param = other.params
        for p, q in zip(cur_param, other_param):
            p.data.copy_(q.data)

    def perturb_params(self):
        """Function used for Hill Climbing. Adds noise to the params and generates a new random setting."""
        with torch.no_grad():
            for p in list(self.mean_model.parameters()) + list(
                self.var_model.parameters()
            ):
                if len(p.data.size()):
                    noise = (
                        np.random.randn(*tuple(p.data.size()))
                        / np.sqrt(p.data.size(0))
                        * 5.0
                    )
                    if np.random.random() < 0.1:
                        # With small probability, replace the weights entirely.
                        p.copy_(torch.tensor(noise).float().to(device=self.device))
                    else:
                        # Otherwise, add noise to the current weights.
                        p.copy_(p + torch.tensor(noise).float().to(device=self.device))
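# A minimal usage sketch of the class above (illustrative only; the variable
# names here are hypothetical, and `env` is assumed to be built as in __main__):
#
#   candidate = best_policy.copy()       # clone the current best weights
#   candidate.perturb_params()           # add Gaussian noise (or reset ~10% of the time)
#   score = play_multiple_episodes(env, candidate, n_episodes=3)
#   if score > best_score:               # hill climbing: keep only improvements
#       best_policy, best_score = candidate, score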
def play_one_episode(env: gym.Env, policy_model: object):
    """Function to play one episode of the environment and return the reward"""
    s, info = env.reset()
    done, truncated = False, False
    total_reward = 0

    while not (done or truncated):
        a = policy_model.sample_action(s).detach().cpu().numpy()
        s_next, r, done, truncated, info = env.step([a])
        s = s_next
        total_reward += r

    return total_reward
def play_multiple_episodes(
    env: gym.Env, policy_model: object, n_episodes: int = 100, print_iters: bool = False
) -> float:
    """Function to play multiple episodes of the environment."""
    rewards = np.empty(n_episodes)

    for i in range(n_episodes):
        r = play_one_episode(env=env, policy_model=policy_model)
        rewards[i] = r

        if print_iters:
            print(f"Episode: {i} | Average Reward: {rewards[:(i+1)].mean():.5f}")

    avg_reward = rewards.mean()
    print(f"Average Reward: {avg_reward:.5f}")

    return avg_reward
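# Each candidate parameter setting is scored by the average reward over several
# episodes rather than a single one: the policy samples its actions from a
# Gaussian, so a single-episode return is a noisy estimate of its quality.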
def random_search(env: gym.Env, policy_model: object) -> Tuple:
    """Function to randomly search the parameter space and return the best policy found for the given environment"""
    total_rewards = list()
    best_avg_reward = float("-inf")
    best_policy_model = policy_model
    num_episodes_per_param_test = 3

    for t in range(100):
        tmp_model = best_policy_model.copy()
        tmp_model.perturb_params()

        avg_reward = play_multiple_episodes(
            env=env, policy_model=tmp_model, n_episodes=num_episodes_per_param_test
        )
        total_rewards.append(avg_reward)

        if avg_reward > best_avg_reward:
            best_avg_reward = avg_reward
            best_policy_model = tmp_model

    return total_rewards, best_policy_model
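# random_search is the "Evolution Algorithm" referred to in the gist title: a
# simple (1+1)-style hill climber that mutates the single best policy found so
# far and greedily keeps a candidate only when its average reward improves, so
# no gradients are ever computed.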
if __name__ == "__main__":
    env = gym.make("MountainCarContinuous-v0", max_episode_steps=2000)

    samples = np.array([env.observation_space.sample() for _ in range(10_000)])
    ft = FeatureTransformers(n_components=100)
    ft.fit(samples)
    D = ft.dimension

    policy_model = PolicyModel(
        ft=ft,
        D=D,
        mean_hidden_layer_sizes=[],
        var_hidden_layer_sizes=[],
        smoothing_val=1e-4,
    )

    total_rewards, policy_model = random_search(env=env, policy_model=policy_model)
    print(f"Max reward: {np.max(total_rewards)}")