Change hyperparameters during runtime for MARL with ray[rllib]
# -*- coding: utf-8 -*-
"""hyp_chg_MARL.ipynb

Automatically generated by Colaboratory.

Original file is located at
"""

# Commented out IPython magic to ensure Python compatibility.
"""
from google.colab import drive
drive.mount('/content/gdrive')
# %cd "/content/gdrive/My Drive/Colab Notebooks/misc_code_examples/ray_colab_examples/rock_paper_scissors_multiagent/"

!mkdir chkpt
!pwd
!ls -l

!pip install tensorflow==2.2.0
!pip install ray[rllib]==0.8.6
"""
""" | |
Testing the changing of hyperparameters during runtime. | |
Learning rate of both agents are initialized to 0. | |
At the 50th iteration, the learning rate of agt_0 is set to 0.01 while the learning rate of agt_1 remains at 0. | |
This is the only time the learning rate is changed. | |
The results will be agt_0 consistently winning after the 50th iteration as it gradually learns. | |
""" | |
from collections import defaultdict
from typing import Dict
import argparse
import random

import numpy as np
from gym.spaces import Discrete

import ray
from ray.tune.registry import register_env
from ray.tune.logger import pretty_print
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.agents.ppo import ppo
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.env import BaseEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.evaluation import MultiAgentEpisode, RolloutWorker
from ray.rllib.utils import try_import_tf

tf = try_import_tf()
ROCK = 0
PAPER = 1
SCISSORS = 2

parser = argparse.ArgumentParser()
parser.add_argument("--stop", type=int, default=400000)
class RockPaperScissorsEnv(MultiAgentEnv):
    """Two-player environment for rock paper scissors.

    The observation is simply the last opponent action."""

    def __init__(self, _):
        self.action_space = Discrete(3)
        self.observation_space = Discrete(3)
        self.player1 = "player1"
        self.player2 = "player2"
        self.last_move = None
        self.num_moves = 0
        self.P1_eps_r = 0  # running episode reward for player1
        self.P2_eps_r = 0  # running episode reward for player2

    def reset(self):
        #print("P1_eps_r =", self.P1_eps_r)
        #print("P2_eps_r =", self.P2_eps_r)
        self.P1_eps_r = 0
        self.P2_eps_r = 0
        self.last_move = (0, 0)
        self.num_moves = 0
        # Each player observes the opponent's last move.
        return {
            self.player1: self.last_move[1],
            self.player2: self.last_move[0],
        }

    def step(self, action_dict):
        move1 = action_dict[self.player1]
        move2 = action_dict[self.player2]
        self.last_move = (move1, move2)
        obs = {
            self.player1: self.last_move[1],
            self.player2: self.last_move[0],
        }
        # Payoff table, keyed by (player1 move, player2 move).
        r1, r2 = {
            (ROCK, ROCK): (0.0, 0.0),
            (ROCK, PAPER): (-1.0, 1.0),
            (ROCK, SCISSORS): (1.0, -1.0),
            (PAPER, ROCK): (1.0, -1.0),
            (PAPER, PAPER): (0.0, 0.0),
            (PAPER, SCISSORS): (-1.0, 1.0),
            (SCISSORS, ROCK): (-1.0, 1.0),
            (SCISSORS, PAPER): (1.0, -1.0),
            (SCISSORS, SCISSORS): (0.0, 0.0),
        }[move1, move2]
        rew = {
            self.player1: r1,
            self.player2: r2,
        }
        self.num_moves += 1
        # An episode is exactly 10 moves long.
        done = {
            "__all__": self.num_moves >= 10,
        }
        #print("rew =", rew)
        self.P1_eps_r += rew[self.player1]
        self.P2_eps_r += rew[self.player2]
        return obs, rew, done, {}
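
# Quick sanity check of the environment API (illustrative only; not executed
# as part of the training run below):
#
#   env = RockPaperScissorsEnv(None)
#   obs = env.reset()   # {'player1': 0, 'player2': 0}
#   obs, rew, done, info = env.step({"player1": ROCK, "player2": SCISSORS})
#   # rew == {'player1': 1.0, 'player2': -1.0}; done['__all__'] stays False until move 10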
@ray.remote(num_cpus=0.25, num_gpus=0)
class Helper:
    """Named actor that holds the shared hyperparameter state."""

    def __init__(self, iter_chg, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.P1_policy_reward_mean = 0
        self.P2_policy_reward_mean = 0
        self.iter = 0              # number of completed training iterations
        self.is_hpy_chg = False    # True when a hyperparameter change is pending
        self.iter_chg = iter_chg   # iteration at which the change is accepted

    def set_hyperparameters(self, lr, gamma):
        # Only accept the new values on the designated iteration.
        if self.iter == self.iter_chg:
            self.lr = lr
            self.gamma = gamma
            self.is_hpy_chg = True
            print("set_hyperparameters")

    def get_hyperparameters(self):
        return self.lr, self.gamma

    def set_policy_reward_mean(self, P1, P2):
        self.P1_policy_reward_mean += P1
        self.P2_policy_reward_mean += P2

    def get_policy_reward_mean(self):
        return self.P1_policy_reward_mean, self.P2_policy_reward_mean

    def add_iter(self):
        self.iter += 1

    def set_is_hpy_chg(self, status):
        self.is_hpy_chg = status

    def get_is_hpy_chg(self):
        return self.is_hpy_chg
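
# Helper is created once with a fixed name ("g_helper", see the bottom of this
# file) so that the driver and the callback code can look up the same instance
# with ray.get_actor("g_helper"). It is the single shared place where the "new"
# hyperparameters, the pending-change flag, and the running reward totals live.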
"""#Callbacks""" | |
class MyCallbacks(DefaultCallbacks): | |
def on_episode_start(self, worker: RolloutWorker, base_env: BaseEnv, | |
policies: Dict[str, Policy], | |
episode: MultiAgentEpisode, **kwargs): | |
#print("on_episode_start {}, _agent_to_policy {}".format(episode.episode_id, episode._agent_to_policy)) | |
#episode.hist_data["episode_id"] = [] | |
pass | |
def on_episode_step(self, worker: RolloutWorker, base_env: BaseEnv, | |
episode: MultiAgentEpisode, **kwargs): | |
""" | |
pole_angle = abs(episode.last_observation_for()[2]) | |
raw_angle = abs(episode.last_raw_obs_for()[2]) | |
assert pole_angle == raw_angle | |
episode.user_data["pole_angles"].append(pole_angle) | |
""" | |
pass | |
def on_episode_end(self, worker: RolloutWorker, base_env: BaseEnv, | |
policies: Dict[str, Policy], episode: MultiAgentEpisode, | |
**kwargs): | |
#print("on_episode_end {}, episode.agent_rewards {}".format(episode.episode_id, episode.agent_rewards)) | |
pass | |
def on_sample_end(self, worker: RolloutWorker, samples: SampleBatch, | |
**kwargs): | |
#print("on_sample_end returned sample batch of size {}".format(samples.count)) | |
pass | |
def on_train_result(self, trainer, result: dict, **kwargs): | |
#print("trainer.train() result: {} -> {} episodes".format(trainer, result["episodes_this_iter"])) | |
# you can mutate the result dict to add new fields to return | |
#result["callback_ok"] = True | |
#print("on_train_result result", result) | |
# hard coded "new" hyperparamters: | |
lr = 0.01 | |
gamma = 0.9 | |
g_helper = ray.get_actor("g_helper") | |
ray.get(g_helper.set_hyperparameters.remote(lr, gamma)) | |
#lr, gamma = ray.get(g_helper.get_hyperparameters.remote()) | |
#print("lr= {}, gamma ={}".format(lr, gamma)) | |
ray.get(g_helper.set_policy_reward_mean.remote(result["policy_reward_mean"]["agt_0"], result["policy_reward_mean"]["agt_1"])) | |
ray.get(g_helper.add_iter.remote()) | |
#pass | |
def on_postprocess_trajectory( | |
self, worker: RolloutWorker, episode: MultiAgentEpisode, | |
agent_id: str, policy_id: str, policies: Dict[str, Policy], | |
postprocessed_batch: SampleBatch, | |
original_batches: Dict[str, SampleBatch], **kwargs): | |
#print("postprocessed {}, {}, {}, {} steps".format(episode, agent_id, policy_id, postprocessed_batch.count)) | |
""" | |
if "num_batches" not in episode.custom_metrics: | |
episode.custom_metrics["num_batches"] = 0 | |
episode.custom_metrics["num_batches"] += 1 | |
""" | |
pass | |
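
# Note on the callback flow: on_train_result() runs once per training
# iteration. It always pushes the hard-coded "new" values (lr=0.01, gamma=0.9)
# to the Helper actor, but Helper.set_hyperparameters() only accepts them on
# the iteration equal to iter_chg (49), which is what makes the change happen
# exactly once, at the 50th iteration.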
train_policy_list = ["agt_0", "agt_1"]

use_lstm = True  #False
lr = 0.0         #1e-30
gamma = 0.9

policies = {
    "agt_0": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                      "lr": lr,
                                                      "gamma": gamma}),
    "agt_1": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                      "lr": lr,
                                                      "gamma": gamma}),
}

def select_policy(agent_id):
    if agent_id == "player1":
        return "agt_0"
    else:
        return "agt_1"

def my_config():
    config = ppo.DEFAULT_CONFIG.copy()
    config["multiagent"] = {"policies_to_train": train_policy_list,
                            "policies": policies,
                            "policy_mapping_fn": select_policy,
                            }
    config["num_cpus_per_worker"] = 0.25
    #config["num_gpus_per_worker"] = 0.25
    config["num_workers"] = 2
    config["num_envs_per_worker"] = 2
    config["batch_mode"] = "truncate_episodes"  # "complete_episodes" or "truncate_episodes"
    config["rollout_fragment_length"] = 10  # episodes are 10 steps long, so truncating at 10 steps
                                            # behaves the same as batch_mode="complete_episodes"
    config["train_batch_size"] = 10  # training batch size; should be >= rollout_fragment_length.
                                     # Sample batches are concatenated together to a batch of this
                                     # size, which is then passed to SGD.
    config["sgd_minibatch_size"] = 10  # default=128; must be <= train_batch_size
    config["num_sgd_iter"] = 3  # default=30; number of SGD epochs to execute per train batch
    config["log_level"] = "WARN"  # WARN/INFO/DEBUG
    config["callbacks"] = MyCallbacks
    return config
def test_hyperparam_chg(trainer):
    """
    new_policies = {"agt_0": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                                      "lr": 0.00001,
                                                                      "gamma": 0.9}),
                    "agt_1": (PPOTFPolicy, Discrete(3), Discrete(3), {"model": {"use_lstm": use_lstm},
                                                                      "lr": 0.1,
                                                                      "gamma": 0.9})
                    }
    """
    g_helper = ray.get_actor("g_helper")
    lr, gamma = ray.get(g_helper.get_hyperparameters.remote())
    #print("lr= {}, gamma ={}".format(lr, gamma))
    is_hpy_chg = ray.get(g_helper.get_is_hpy_chg.remote())
    if is_hpy_chg == True:
        curr_config = trainer.get_config()
        print("curr_config", curr_config)
        print("config['multiagent']['policies']", curr_config["multiagent"]["policies"])
        new_config = curr_config  # mutated in place; new_config is an alias for curr_config
        #new_config["multiagent"]["policies"] = new_policies
        tar_ind = 3  # index of the per-policy config override dict in the policy tuple
        new_config["multiagent"]["policies"]["agt_0"][tar_ind]["lr"] = lr
        new_config["multiagent"]["policies"]["agt_0"][tar_ind]["gamma"] = gamma

        player1_pol = trainer.get_policy("agt_0")
        print("Before")
        print("player1_pol.config['lr']", player1_pol.config["lr"])
        print("player1_pol.config['gamma']", player1_pol.config["gamma"])

        # Checkpoint, tear down, rebuild with the new config, then restore the weights.
        local_dir = "/content/gdrive/My Drive/Colab Notebooks/misc_code_examples/ray_colab_examples/rock_paper_scissors_multiagent/chkpt/"
        save_path = trainer.save(local_dir)
        trainer.stop()
        trainer = ppo.PPOTrainer(config=new_config, env="RockPaperScissorsEnv")
        trainer.restore(save_path)

        player1_pol = trainer.get_policy("agt_0")
        print("After")
        print("player1_pol.config['lr']", player1_pol.config["lr"])
        print("player1_pol.config['gamma']", player1_pol.config["gamma"])

        ray.get(g_helper.set_is_hpy_chg.remote(False))
    return trainer
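
# Design note: the save/stop/rebuild/restore round trip above is what actually
# applies the new learning rate. Mutating the config dict alone would not be
# enough, because in this RLlib version the policy's optimizer learning rate
# is set from the policy config when the policy is built; rebuilding the
# trainer from the updated config and restoring the checkpoint carries the
# learned weights across the swap.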
def go_train():
    g_helper = ray.get_actor("g_helper")
    trainer = ppo.PPOTrainer(config=my_config(), env="RockPaperScissorsEnv")
    for i in range(300):
        # Perform one iteration of training the policies with PPO.
        result = trainer.train()
        if i % 30 == 0:
            print(pretty_print(result))
            P1_policy_reward_mean, P2_policy_reward_mean = ray.get(g_helper.get_policy_reward_mean.remote())
            print("TOTAL: P1_policy_reward_mean ={}, P2_policy_reward_mean ={}".format(P1_policy_reward_mean, P2_policy_reward_mean))
        # Rebuild the trainer if a hyperparameter change is pending.
        trainer = test_hyperparam_chg(trainer)
    print(pretty_print(result))
    P1_policy_reward_mean, P2_policy_reward_mean = ray.get(g_helper.get_policy_reward_mean.remote())
    print("TOTAL: P1_policy_reward_mean ={}, P2_policy_reward_mean ={}".format(P1_policy_reward_mean, P2_policy_reward_mean))
register_env("RockPaperScissorsEnv", lambda _: RockPaperScissorsEnv(_))

ray.shutdown()
ray.init(ignore_reinit_error=True, log_to_driver=True, webui_host='127.0.0.1', num_cpus=2, num_gpus=0)  # start ray

iter_chg = 49  # Helper accepts the new hyperparameters at the 50th iteration (the counter is 0-based)
g_helper = Helper.options(name="g_helper").remote(iter_chg, lr, gamma)  # the Helper instance runs in a separate Ray actor process
ray.get(g_helper.set_hyperparameters.remote(lr, gamma))
#lr, gamma = ray.get(g_helper.get_hyperparameters.remote())
#print("lr= {}, gamma ={}".format(lr, gamma))

go_train()