Uses an MLP and an LSTM on OpenAI Gym's CartPole game.
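The script below depends on gym and tflearn (TFLearn runs on top of TensorFlow 1.x). As a quick sanity check, separate from the script itself and assuming the classic gym API (newer gym releases changed the reset/step signatures), you can confirm the CartPole environment is available:

import gym

env = gym.make("CartPole-v0")
print(env.observation_space.shape)  # (4,) -- cart position/velocity, pole angle/velocity
print(env.action_space.n)           # 2   -- push the cart left or right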
import gym
import random
import numpy as np
import tflearn
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.data_utils import to_categorical
from tflearn.layers.estimator import regression
from statistics import median, mean
from collections import Counter
import glob

env = gym.make("CartPole-v0")
env.reset()
goal_steps = 500
score_requirement = 50
initial_games = 10000

def initial_population():
    training_data = []
    # all scores:
    scores = []
    # just the scores that met our threshold:
    accepted_scores = []
    # iterate through however many games we want:
    for _ in range(initial_games):
        score = 0
        # moves specifically from this environment:
        game_memory = []
        # previous observation that we saw
        prev_observation = []
        for _ in range(goal_steps):
            # choose random action (0 or 1)
            action = random.randrange(0, 2)
            # do it!
            observation, reward, done, info = env.step(action)
            print(reward)
            # notice that the observation is returned FROM the action
            # so we'll store the previous observation here, pairing
            # the prev observation to the action we'll take.
            if len(prev_observation) > 0:
                game_memory.append([prev_observation, action])
            prev_observation = observation
            score += reward
            if done: break

        # IF our score is higher than our threshold, we'd like to save
        # every move we made
        # NOTE the reinforcement methodology here. (using gym for processing the reward)
        # all we're doing is reinforcing the score, we're not trying
        # to influence the machine in any way as to HOW that score is
        # reached.
        if score >= score_requirement:
            accepted_scores.append(score)
            for data in game_memory:
                training_data.append([data[0], data[1]])

        # reset env to play again
        env.reset()
        # save overall scores
        scores.append(score)

    # just in case you wanted to reference later
    training_data_save = np.array(training_data)
    np.save('initial_population.npy', training_data_save)

    # some stats here, to further illustrate the neural network magic!
    print('Average accepted score:', mean(accepted_scores))
    print('Median score for accepted scores:', median(accepted_scores))
    print(Counter(accepted_scores))

    return training_data
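
# Each row of training_data is [previous observation, action taken from that state].
# Only games scoring at least score_requirement contribute rows; the action is
# stored as a raw integer here and one-hot encoded later in
# neural_network.__preprocess via to_categorical.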

class neural_network(object):
    def __init__(self, training_data, algorithm="mlp", model=False):
        if algorithm not in ["mlp", "rnn"]:
            print("Error: algorithm should be 'mlp' (Multi Layer Perceptron) or 'rnn' (Recurrent NN / LSTM)")
            exit(1)
        self.algorithm = algorithm
        self.model = self.__preprocess(training_data=training_data, model=model)

    def predict(self, data):
        return self.model.predict(data)

    def __preprocess(self, training_data, model):
        X = np.array([i[0] for i in training_data]).reshape(-1, len(training_data[0][0]), 1)
        print(X[0], X.shape)
        y = np.array([i[1] for i in training_data])
        print(y[0], y.shape)
        if len(y.shape) != 2:
            y = to_categorical(y, 2)
        print(X[0], X.shape)
        print(y[0], y.shape)
        print(self.algorithm)
        if not model:
            if self.algorithm == 'rnn':
                model = self.__lstm(input_size=len(X[0]))
            else:
                model = self.__mlp(input_size=len(X[0]))
        model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500, show_metric=True,
                  run_id='openai_learning' + self.algorithm)
        return model

    def __lstm(self, input_size):
        net = input_data(shape=[None, input_size, 1], name='input')
        net = tflearn.lstm(net, 128, return_seq=True)
        net = tflearn.lstm(net, 128)
        net = tflearn.fully_connected(net, 2, activation='softmax')
        net = tflearn.regression(net, optimizer='adam',
                                 loss='categorical_crossentropy', name="targets")
        model = tflearn.DNN(net, tensorboard_verbose=3)
        return model

    def __mlp(self, input_size):
        network = input_data(shape=[None, input_size, 1], name='input')
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 512, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 2, activation='softmax')
        network = regression(network, optimizer='adam', learning_rate=1e-3, loss='categorical_crossentropy',
                             name='targets')
        model = tflearn.DNN(network, tensorboard_verbose=3)
        return model
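
# Example usage of the class above (kept commented out; `observation` stands in
# for any single CartPole observation, so it is a placeholder):
#
#   net = neural_network(training_data=initial_population(), algorithm="rnn")
#   probs = net.predict(observation.reshape(-1, len(observation), 1))
#   action = np.argmax(probs[0])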

if __name__ == '__main__':
    files = glob.glob('data.npy')
    training_data = []
    generation = 0
    if files == []:
        if glob.glob('initial_population.npy') == []:
            model = neural_network(training_data=initial_population())
        else:
            model = neural_network(training_data=np.load('initial_population.npy'))
            print('loaded stored one')
            np.save('data.npy', np.load('initial_population.npy'))
            generation = -1
    else:
        # generation = max([int(i.split('_')[1][0]) for i in glob.glob('gen_?.npy')])
        training_data = np.load('data.npy')
        print('data.npy', training_data)
        model = neural_network(training_data=training_data)

    scores = []
    choices = []
    game_memory = []
    for each_game in range(20):
        score = 0
        game_memory = []
        prev_obs = []
        env.reset()
        for _ in range(goal_steps):
            env.render()
            if len(prev_obs) == 0:
                action = random.randrange(0, 2)
            else:
                action = np.argmax(model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0])
            choices.append(action)

            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            game_memory.append([new_observation, action])
            score += reward
            if done: break

        scores.append(score)

    # game_memory is reset each game, so only the last game's moves are appended
    # to data.npy; skip the save when there is no stored data to extend.
    if generation != -1 and len(training_data) > 0:
        print(generation, 'this value should not be -1')
        game_memory = np.array(game_memory)
        print(game_memory, generation, training_data)
        print(training_data.shape, game_memory.shape)
        np.save('data.npy', np.concatenate((training_data, game_memory)))

    print('Average Score:', sum(scores) / len(scores))
    print('choice 1:{} choice 0:{}'.format(choices.count(1) / len(choices), choices.count(0) / len(choices)))
    print(score_requirement)
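
To inspect the data files the script writes (initial_population.npy and data.npy), here is a minimal sketch, assuming the files were produced by a run of the script above; allow_pickle is needed on recent NumPy because each row is an [observation array, action] object pair:

import numpy as np

data = np.load('initial_population.npy', allow_pickle=True)
print(data.shape)   # (num_samples, 2)
print(data[0][0])   # first stored observation (4 values)
print(data[0][1])   # action chosen from that state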