Deep Q-learning for the lunar lander
deepq.py:
# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2
# import other stuff
import random
import numpy as np
from memory import Memory

class DeepQ:
    def __init__(self, inputs, outputs, memorySize, discountFactor, learningRate, learnStart):
        self.input_size = inputs
        self.output_size = outputs
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learnStart = learnStart
        self.learningRate = learningRate

    def initNetworks(self, hiddenLayers):
        model = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.model = model

        targetModel = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.targetModel = targetModel

    def createRegularizedModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        bias = True
        dropout = 0
        regularizationFactor = 0.01
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            if regularizationFactor > 0:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            else:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))
            # add the remaining hidden layers
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                if regularizationFactor > 0:
                    model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                else:
                    model.add(Dense(layerSize, init='lecun_uniform', bias=bias))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
                if dropout > 0:
                    model.add(Dropout(dropout))
            model.add(Dense(self.output_size, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model

    def createModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform'))
            model.add(Activation("linear"))
        else:
            model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform'))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))
            # add the remaining hidden layers
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                model.add(Dense(layerSize, init='lecun_uniform'))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
            model.add(Dense(self.output_size, init='lecun_uniform'))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model

    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer ", i, ": ", weights
            i += 1

    def backupNetwork(self, model, backup):
        weightMatrix = []
        for layer in model.layers:
            weights = layer.get_weights()
            weightMatrix.append(weights)
        i = 0
        for layer in backup.layers:
            weights = weightMatrix[i]
            layer.set_weights(weights)
            i += 1

    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)

    # calculate the Q-learning target: the reward for terminal states,
    # reward + discountFactor * max Q(newState) otherwise
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # epsilon-greedy: explore with probability explorationRate,
    # otherwise select the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action

    def selectActionByProbability(self, qValues, bias):
        qValueSum = 0
        shiftBy = 0
        # shift the Q values so the smallest one ends up just above zero
        for value in qValues:
            if value + shiftBy < 0:
                shiftBy = -value
        shiftBy += 1e-06
        for value in qValues:
            qValueSum += (value + shiftBy) ** bias
        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        qValueProbabilities[len(qValueProbabilities) - 1] = 1
        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if rand <= value:
                return i
            i += 1

    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)

    def learnOnMiniBatch(self, miniBatchSize, useTargetNetwork=True):
        # only learn once the replay memory holds more than learnStart samples
        if self.memory.getCurrentSize() > self.learnStart:
            miniBatch = self.memory.getMiniBatch(miniBatchSize)
            X_batch = np.empty((0, self.input_size), dtype=np.float64)
            Y_batch = np.empty((0, self.output_size), dtype=np.float64)
            for sample in miniBatch:
                isFinal = sample['isFinal']
                state = sample['state']
                action = sample['action']
                reward = sample['reward']
                newState = sample['newState']
                qValues = self.getQValues(state)
                if useTargetNetwork:
                    qValuesNewState = self.getTargetQValues(newState)
                else:
                    qValuesNewState = self.getQValues(newState)
                targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)
                X_batch = np.append(X_batch, np.array([state.copy()]), axis=0)
                Y_sample = qValues.copy()
                Y_sample[action] = targetValue
                Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
                if isFinal:
                    X_batch = np.append(X_batch, np.array([newState.copy()]), axis=0)
                    Y_batch = np.append(Y_batch, np.array([[reward] * self.output_size]), axis=0)
            self.model.fit(X_batch, Y_batch, batch_size=len(miniBatch), nb_epoch=1, verbose=0)

Training script for the lunar lander:
# import the gym stuff
import gym
# import other stuff
import random
import numpy as np
# import own classes
from deepq import DeepQ

print(gym.envs.registry.all())
env = gym.make('LunarLander-v1')

epochs = 1000
steps = 1000
updateTargetNetwork = 10000
explorationRate = 1
minibatch_size = 128
learnStart = 128
learningRate = 0.00025
discountFactor = 0.99
memorySize = 1000000

last100Scores = [0] * 100
last100ScoresIndex = 0
last100Filled = False

renderPerXEpochs = 50
shouldRender = False

deepQ = DeepQ(len(env.observation_space.high), env.action_space.n, memorySize, discountFactor, learningRate, learnStart)
deepQ.initNetworks([30, 30, 30])

stepCounter = 0

# number of reruns
for epoch in xrange(epochs):
    observation = env.reset()
    print explorationRate
    totalReward = 0
    # number of timesteps
    for t in xrange(steps):
        if epoch % renderPerXEpochs == 0 and shouldRender:
            env.render()
        qValues = deepQ.getQValues(observation)
        action = deepQ.selectAction(qValues, explorationRate)
        newObservation, reward, done, info = env.step(action)
        totalReward += reward
        deepQ.addMemory(observation, action, reward, newObservation, done)

        if stepCounter >= learnStart:
            # bootstrap from the online network until the first target network update
            if stepCounter <= updateTargetNetwork:
                deepQ.learnOnMiniBatch(minibatch_size, False)
            else:
                deepQ.learnOnMiniBatch(minibatch_size, True)

        observation = newObservation

        if done:
            last100Scores[last100ScoresIndex] = totalReward
            last100ScoresIndex += 1
            if last100ScoresIndex >= 100:
                last100Filled = True
                last100ScoresIndex = 0
            if not last100Filled:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1), " with total reward", totalReward
            else:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1), " with total reward", totalReward, " last 100 average: ", (sum(last100Scores) / len(last100Scores))
            break

        stepCounter += 1
        if stepCounter % updateTargetNetwork == 0:
            deepQ.updateTargetNetwork()
            print "updating target network"

    explorationRate *= 0.995
    explorationRate = max(0.05, explorationRate)

memory.py:
import numpy as np
import random

class Memory:
    def __init__(self, size):
        self.size = size
        self.currentPosition = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.newStates = []
        self.finals = []

    def getMiniBatch(self, size):
        indices = random.sample(np.arange(len(self.states)), min(size, len(self.states)))
        miniBatch = []
        for index in indices:
            miniBatch.append({'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]})
        return miniBatch

    def getCurrentSize(self):
        return len(self.states)

    def getMemory(self, index):
        return {'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]}

    def addMemory(self, state, action, reward, newState, isFinal):
        # circular buffer: once the memory is full, overwrite the oldest entries
        if self.currentPosition >= self.size:
            self.currentPosition = 0
        if len(self.states) >= self.size:
            self.states[self.currentPosition] = state
            self.actions[self.currentPosition] = action
            self.rewards[self.currentPosition] = reward
            self.newStates[self.currentPosition] = newState
            self.finals[self.currentPosition] = isFinal
        else:
            self.states.append(state)
            self.actions.append(action)
            self.rewards.append(reward)
            self.newStates.append(newState)
            self.finals.append(isFinal)
        self.currentPosition += 1
I would give double and dueling DQN a try; they are very easy to implement. I am also having trouble getting mine to work, even with those two improvements, and I even added prioritized replay. I believe hyperparameter tuning is the reason why yours and mine are not working as well as we expect.
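
For what it's worth, here is a minimal sketch of the double DQN change on top of the DeepQ class above (the method name calculateDoubleTarget and its call site are hypothetical, not part of this gist): the online network selects the next action and the target network evaluates it, which reduces the overestimation built into the plain max target.

    # sketch of a double DQN target for the DeepQ class above (assumes its numpy import and discountFactor)
    def calculateDoubleTarget(self, qValuesNewState, targetQValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            # pick the action with the online network, evaluate it with the target network
            selectedAction = np.argmax(qValuesNewState)
            return reward + self.discountFactor * targetQValuesNewState[selectedAction]

    # in learnOnMiniBatch the target would then be computed roughly as:
    # targetValue = self.calculateDoubleTarget(self.getQValues(newState), self.getTargetQValues(newState), reward, isFinal)

Dueling DQN and prioritized replay would touch createModel and memory.py respectively, and need a bit more code than this.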
I used the same deep Q-learning algorithm that I used to solve the cartpole problem (https://gym.openai.com/evaluations/eval_nI8cryNQaKlFKv592N7hQ).
The only issue is that it doesn't seem to learn how to land, only how to hover in a steady position, so there is still some work left.
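
Since the DeepQ class is environment-agnostic, reusing it for cartpole only changes the environment setup. A minimal sketch, assuming the deepq.py above and gym's CartPole-v0 (the hyperparameters here are illustrative, not the exact settings from the linked evaluation):

# sketch: reusing the same agent for CartPole-v0 (illustrative hyperparameters)
import gym
from deepq import DeepQ

env = gym.make('CartPole-v0')
deepQ = DeepQ(len(env.observation_space.high), env.action_space.n, 1000000, 0.99, 0.00025, 128)
deepQ.initNetworks([30, 30, 30])

explorationRate = 1
observation = env.reset()
for step in xrange(10000):
    action = deepQ.selectAction(deepQ.getQValues(observation), explorationRate)
    newObservation, reward, done, info = env.step(action)
    deepQ.addMemory(observation, action, reward, newObservation, done)
    deepQ.learnOnMiniBatch(128, False)
    observation = env.reset() if done else newObservation
    explorationRate = max(0.05, explorationRate * 0.9999)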