Created December 19, 2017 21:05
DQN for the OpenAI Gym Breakout game.
import os
import gym
import math
import random
import time
import numpy as np
from collections import namedtuple
from itertools import count
from copy import deepcopy
from PIL import Image

import visdom
vis = visdom.Visdom(env="breakout")

import torch as th
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as T

# If a GPU is available, run everything on CUDA tensors.
use_cuda = th.cuda.is_available()
FloatTensor = th.cuda.FloatTensor if use_cuda else th.FloatTensor
LongTensor = th.cuda.LongTensor if use_cuda else th.LongTensor
ByteTensor = th.cuda.ByteTensor if use_cuda else th.ByteTensor
Tensor = FloatTensor
EPISODE = 0
EPISODE_SCORE = 0

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
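# Illustrative (commented-out) usage of the replay buffer above; the names
# s0/a0/r0 etc. are placeholders, not variables defined in this script:
#   buf = ReplayMemory(2)
#   buf.push(s0, a0, s1, r0)
#   buf.push(s1, a1, s2, r1)
#   buf.push(s2, a2, None, r2)   # wraps around and overwrites the oldest entry
#   batch = buf.sample(2)        # list of 2 random Transition tuples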
env = gym.make('Breakout-v0').unwrapped


def get_screen():
    # Render the current frame and transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Crop the frame vertically (drop the top 30 and bottom 15 rows).
    screen = screen[:, 30:-15, :]
    t = th.from_numpy(screen).type(FloatTensor).unsqueeze(0)
    return t


# Check get_screen() and the visdom interface.
game_win = vis.image(np.ones((3, 210, 160)))
plot_win = vis.line(X=np.array([EPISODE]),
                    Y=np.array([EPISODE_SCORE]),
                    opts=dict(showlegend=True, title='Break-out Log',
                              legend=['Durations']))
for i in range(2):
    env.reset()
    t = get_screen().cpu().squeeze(0).numpy()
    vis.image(t, win=game_win)
    time.sleep(1)

# Breakout-specific globals.
print('Game : Breakout')
print('Action space :', env.action_space)
print('Action\'s Meaning :', env.unwrapped.get_action_meanings())
NO_OF_ACTIONS = len(env.unwrapped.get_action_meanings())
class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=5, stride=2)
        self.bn4 = nn.BatchNorm2d(256)
        self.fc1 = nn.Linear(12544, 1024)
        self.fc2 = nn.Linear(1024, 128)
        self.head = nn.Linear(128, NO_OF_ACTIONS)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.fc1(x.view(x.size(0), -1)))
        x = F.relu(self.fc2(x))
        return self.head(x)
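# Where the 12544 in fc1 comes from (a sanity check, assuming the
# 3 x 165 x 160 input produced by get_screen() above): each 5x5, stride-2
# conv maps a side of length n to floor((n - 5) / 2) + 1, so
#   height: 165 -> 81 -> 39 -> 18 -> 7
#   width : 160 -> 78 -> 37 -> 17 -> 7
# and the flattened feature map is 256 * 7 * 7 = 12544.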
# Hyperparameters
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200

model = DQN()
if use_cuda:
    model.cuda()

optimizer = optim.RMSprop(model.parameters())
memory = ReplayMemory(10000)

steps_done = 0


def select_action(state):
    # Epsilon-greedy action selection with an exponentially decaying epsilon.
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        # Greedy action: argmax over the Q-values predicted by the network.
        action = model(Variable(state, volatile=True).type(FloatTensor)).data.max(1)[1].view(1, 1)
        return action
    else:
        # Random exploratory action.
        return LongTensor([[random.randrange(NO_OF_ACTIONS)]])
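# With these constants, epsilon decays roughly as
#   0.05 + 0.85 * exp(-steps_done / 200),
# i.e. about 0.90 at step 0, about 0.36 after 200 steps, and about 0.056
# after 1000 steps, after which it stays close to EPS_END = 0.05.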
episode_durations = []


def plot_durations():
    # Append the latest episode duration to the visdom line plot.
    _X = np.array([EPISODE])
    vis.line(
        X=_X,
        Y=np.array([EPISODE_SCORE]),
        win=plot_win,
        update='append'
    )
last_sync = 0


def optimize_model():
    global last_sync
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for a
    # detailed explanation).
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements.
    non_final_mask = ByteTensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)))
    # We don't want to backprop through the expected action values, and
    # volatile=True saves us from temporarily setting the model parameters'
    # requires_grad to False.
    non_final_next_states = Variable(th.cat([s for s in batch.next_state
                                             if s is not None]),
                                     volatile=True)
    state_batch = Variable(th.cat(batch.state))
    action_batch = Variable(th.cat(batch.action))
    reward_batch = Variable(th.cat(batch.reward))

    # Compute Q(s_t, a): the model computes Q(s_t), then we select the
    # columns of the actions that were actually taken.
    state_action_values = model(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    next_state_values = Variable(th.zeros(BATCH_SIZE).type(Tensor))
    next_state_values[non_final_mask] = model(non_final_next_states).max(1)[0]
    # We don't want the volatile flag to propagate into the loss, so clear it.
    # After this, we end up with a Variable that has requires_grad=False.
    next_state_values.volatile = False
    # Compute the expected Q values.
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute the Huber loss.
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

    # Optimize the model, clipping gradients to [-1, 1].
    optimizer.zero_grad()
    loss.backward()
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
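# What the update above implements: the single online network provides both
# the prediction Q(s_t, a_t) and the bootstrap target
#   r_t + GAMMA * max_a Q(s_{t+1}, a)
# (zero for terminal states), combined through a Huber loss. Note that there
# is no separate, periodically synced target network in this script; the same
# weights produce both terms.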
num_episodes = 100000
print('Starting')
for i_episode in range(num_episodes):
    EPISODE = i_episode
    print(' Episode ', EPISODE)
    # Initialize the environment and state.
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action.
        action = select_action(state)
        _, reward, done, _ = env.step(action[0, 0])
        reward = Tensor([reward])

        # Observe the new state as the difference between consecutive frames.
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # Store the transition in memory.
        memory.push(state, action, next_state, reward)

        # Move to the next state.
        state = next_state

        # Perform one optimization step on the online network.
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            EPISODE_SCORE = t + 1
            plot_durations()
            break

print('Done')
env.close()

# Save the trained weights ('~' has to be expanded manually, hence expanduser).
th.save(model.state_dict(), os.path.expanduser('~/breakout.pt'))
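# To reuse the saved weights later (a minimal sketch, assuming the th.save()
# call above and the same DQN class; `state` must be built the same way as
# during training, i.e. as a difference of two consecutive get_screen() frames):
#   eval_model = DQN()
#   eval_model.load_state_dict(th.load(os.path.expanduser('~/breakout.pt')))
#   eval_model.eval()
#   q_values = eval_model(Variable(state, volatile=True).type(FloatTensor))
#   greedy_action = q_values.data.max(1)[1].view(1, 1)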