Deep Q learning with experience replay and target network
# import os
# os.environ["THEANO_FLAGS"] = "mode=FAST_RUN,device=gpu,floatX=float32"
# import theano

# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2

# import other stuff
import gym
import random
import numpy as np
from rl.utils.memory import Memory
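# Memory is the replay-buffer helper from the accompanying rl.utils package
# (external to this gist); this class relies on its addMemory, getMiniBatch,
# getCurrentSize and getMemory methods.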
defaultSettings = {
    'memorySize' : 100000,
    'discountFactor' : 0.975,
    'learningRate' : 0.00025,
    'hiddenLayers' : [30, 30, 30],
    'bias' : True
}

defaultRunSettings = {
    'updateTargetNetwork' : 10000,
    'explorationRate' : 1,
    'miniBatchSize' : 36,
    'learnStart' : 36,
    'renderPerXEpochs' : 1,
    'shouldRender' : True,
    'experimentId' : None,
    'force' : True,
    'upload' : False
}
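# DeepQ: Q-learning with a neural-network function approximator, an
# experience replay buffer, and a separate target network that is synced
# every `updateTargetNetwork` steps; training starts once `learnStart`
# transitions have been collected.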
class DeepQ:
    def __init__(
            self,
            env,
            memorySize = defaultSettings['memorySize'],
            discountFactor = defaultSettings['discountFactor'],
            learningRate = defaultSettings['learningRate'],
            hiddenLayers = defaultSettings['hiddenLayers'],
            bias = defaultSettings['bias']):
        self.env = env
        self.input_size = len(env.observation_space.high)
        self.output_size = env.action_space.n
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learningRate = learningRate
        self.hiddenLayers = hiddenLayers
        self.bias = bias
        self.initNetworks(hiddenLayers)
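    # main training loop: act in the environment, store each transition in
    # replay memory, learn from random mini batches, and decay the
    # exploration rate after every episode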
    def run(self,
            epochs,
            steps,
            api_key,
            updateTargetNetwork = defaultRunSettings['updateTargetNetwork'],
            explorationRate = defaultRunSettings['explorationRate'],
            miniBatchSize = defaultRunSettings['miniBatchSize'],
            learnStart = defaultRunSettings['learnStart'],
            renderPerXEpochs = defaultRunSettings['renderPerXEpochs'],
            shouldRender = defaultRunSettings['shouldRender'],
            experimentId = defaultRunSettings['experimentId'],
            force = defaultRunSettings['force'],
            upload = defaultRunSettings['upload']):
        last100Scores = [0] * 100
        last100ScoresIndex = 0
        last100Filled = False
        stepCounter = 0
        if experimentId is not None:
            self.env.monitor.start('/tmp/' + experimentId, force = force)
        for epoch in xrange(epochs):
            observation = self.env.reset()
            print "exploration rate:", explorationRate
            totalReward = 0
            # run at most `steps` timesteps per episode
            for t in xrange(steps):
                if epoch % renderPerXEpochs == 0 and shouldRender:
                    self.env.render()
                qValues = self.getQValues(observation)
                action = self.selectAction(qValues, explorationRate)
                newObservation, reward, done, info = self.env.step(action)
                totalReward += reward
                self.addMemory(observation, action, reward, newObservation, done)
                if stepCounter >= learnStart:
                    # use the online network for targets until the target
                    # network has been synced at least once
                    if stepCounter <= updateTargetNetwork:
                        self.learnOnMiniBatch(miniBatchSize, False)
                    else:
                        self.learnOnMiniBatch(miniBatchSize, True)
                observation = newObservation
                if done:
                    last100Scores[last100ScoresIndex] = totalReward
                    last100ScoresIndex += 1
                    if last100ScoresIndex >= 100:
                        last100Filled = True
                        last100ScoresIndex = 0
                    if not last100Filled:
                        print "Episode", epoch, "finished after {} timesteps".format(t + 1), "with total reward", totalReward
                    else:
                        print "Episode", epoch, "finished after {} timesteps".format(t + 1), "with total reward", totalReward, "last 100 average:", (sum(last100Scores) / len(last100Scores))
                    break
                stepCounter += 1
                if stepCounter % updateTargetNetwork == 0:
                    self.updateTargetNetwork()
                    print "updating target network"
            explorationRate *= 0.995
            # explorationRate -= (2.0/epochs)
            explorationRate = max(0.05, explorationRate)
        if experimentId is not None:
            self.env.monitor.close()
        if upload:
            gym.upload('/tmp/' + experimentId, api_key=api_key)
    def initNetworks(self, hiddenLayers):
        self.model = self.createRegularizedModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate, self.bias)
        self.targetModel = self.createRegularizedModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate, self.bias)

    def createRegularizedModel(self, inputs, outputs, hiddenLayers, activationType, learningRate, bias):
        dropout = 0
        regularizationFactor = 0
        model = Sequential()
        if len(hiddenLayers) == 0:
            # no hidden layers: a single linear mapping from state to Q values
            model.add(Dense(outputs, input_shape=(inputs,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            if regularizationFactor > 0:
                model.add(Dense(hiddenLayers[0], input_shape=(inputs,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            else:
                model.add(Dense(hiddenLayers[0], input_shape=(inputs,), init='lecun_uniform', bias=bias))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                if regularizationFactor > 0:
                    model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                else:
                    model.add(Dense(layerSize, init='lecun_uniform', bias=bias))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
                if dropout > 0:
                    model.add(Dropout(dropout))
            # linear output layer: one Q value per action
            model.add(Dense(outputs, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer", i, ":", weights
            i += 1

    def backupNetwork(self, model, backup):
        weights = model.get_weights()
        backup.set_weights(weights)
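    # copy the online network's weights into the target network; keeping the
    # target fixed between syncs stabilizes the bootstrapped learning targets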
    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)
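    # calculate the target value with the Bellman backup:
    #   target = reward                                             (terminal)
    #   target = reward + discountFactor * max_a' Q(newState, a')   (otherwise)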
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # epsilon-greedy action selection: explore with probability
    # explorationRate, otherwise pick the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action
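    # alternative exploration strategy (not used by run): sample an action
    # with probability proportional to its shifted Q value raised to the
    # power `bias`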
    def selectActionByProbability(self, qValues, bias):
        # shift all Q values so the smallest one becomes slightly positive
        shiftBy = 0
        for value in qValues:
            shiftBy = max(shiftBy, -value)
        shiftBy += 1e-06
        qValueSum = 0
        for value in qValues:
            qValueSum += (value + shiftBy) ** bias
        # build the cumulative probability distribution
        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        # guard against floating point drift in the last cumulative entry
        qValueProbabilities[len(qValueProbabilities) - 1] = 1
        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if rand <= value:
                return i
            i += 1
    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)
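    # experience replay: sample a random mini batch of stored transitions and
    # regress the online network toward the Bellman targets; only the chosen
    # action's Q value is changed per sample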
    def learnOnMiniBatch(self, miniBatchSize, useTargetNetwork=True):
        miniBatch = self.memory.getMiniBatch(miniBatchSize)
        X_batch = np.empty((0, self.input_size), dtype = np.float64)
        Y_batch = np.empty((0, self.output_size), dtype = np.float64)
        for sample in miniBatch:
            isFinal = sample['isFinal']
            state = sample['state'].copy()
            action = sample['action']
            reward = sample['reward']
            newState = sample['newState'].copy()
            qValues = self.getQValues(state)
            if useTargetNetwork:
                qValuesNewState = self.getTargetQValues(newState)
            else:
                qValuesNewState = self.getQValues(newState)
            targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)
            X_batch = np.append(X_batch, np.array([state]), axis=0)
            Y_sample = qValues
            Y_sample[action] = targetValue
            Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
        self.model.fit(X_batch, Y_batch, batch_size = len(miniBatch), nb_epoch=1, verbose = 0)
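A minimal usage sketch (my assumptions, not part of the gist: the classic CartPole-v0 environment, the pre-2017 Gym API this file targets, and no results upload so api_key can be None):

# hypothetical example: train the agent on CartPole-v0 with the default settings
env = gym.make('CartPole-v0')
agent = DeepQ(env, memorySize=100000, discountFactor=0.975,
              learningRate=0.00025, hiddenLayers=[30, 30, 30], bias=True)
agent.run(epochs=1000, steps=200, api_key=None,
          miniBatchSize=36, learnStart=36, updateTargetNetwork=10000)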