@mzmmoazam
Created October 28, 2017 14:21
MLP and LSTM agents for OpenAI Gym's CartPole-v0 game, trained on data collected from random play.
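The script below assumes the classic gym API, where env.step() returns an (observation, reward, done, info) 4-tuple and env.reset() returns only the observation, and tflearn running on TensorFlow 1.x; newer gym/gymnasium releases changed both signatures.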
import glob
import random
from collections import Counter
from statistics import mean, median

import gym
import numpy as np
import tflearn
from tflearn.data_utils import to_categorical
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.estimator import regression

env = gym.make("CartPole-v0")
env.reset()

goal_steps = 500
score_requirement = 50
initial_games = 10000
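# goal_steps caps how long each episode may run, score_requirement is the
# minimum total reward a random game must reach for its moves to be kept
# as training data, and initial_games is how many random games to sample.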
def initial_population():
    training_data = []
    # all scores:
    scores = []
    # just the scores that met our threshold:
    accepted_scores = []
    # iterate through however many games we want:
    for _ in range(initial_games):
        score = 0
        # moves specifically from this environment:
        game_memory = []
        # previous observation that we saw
        prev_observation = []
        for _ in range(goal_steps):
            # choose a random action (0 or 1)
            action = random.randrange(0, 2)
            # do it!
            observation, reward, done, info = env.step(action)
            # notice that the observation is returned FROM the action,
            # so we store the previous observation here, pairing
            # the prev observation with the action we took.
            if len(prev_observation) > 0:
                game_memory.append([prev_observation, action])
            prev_observation = observation
            score += reward
            if done:
                break
        # IF our score is higher than our threshold, save every move we made.
        # NOTE the reinforcement methodology here (using gym's reward):
        # all we're doing is reinforcing the score; we're not trying to
        # influence the machine in any way as to HOW that score is reached.
        if score >= score_requirement:
            accepted_scores.append(score)
            for data in game_memory:
                training_data.append([data[0], data[1]])
        # reset env to play again
        env.reset()
        # save overall scores
        scores.append(score)
    # just in case you want to reference it later
    training_data_save = np.array(training_data)
    np.save('initial_population.npy', training_data_save)
    # some stats, to further illustrate the neural network magic!
    print('Average accepted score:', mean(accepted_scores))
    print('Median score for accepted scores:', median(accepted_scores))
    print(Counter(accepted_scores))
    return training_data
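# Each saved example pairs a 4-dim CartPole observation with the action taken:
#   training_data[i] == [observation (np.ndarray of shape (4,)), action (0 or 1)]
# A quick sanity check (a sketch; note that newer numpy needs
# allow_pickle=True to reload this object array from disk):
#   data = initial_population()
#   assert len(data[0][0]) == 4 and data[0][1] in (0, 1)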
class neural_network(object):
    def __init__(self, training_data, algorithm="mlp", model=False):
        if algorithm not in ["mlp", "rnn"]:
            print("Error: algorithm must be 'mlp' (Multi-Layer Perceptron) "
                  "or 'rnn' (Recurrent NN / LSTM)")
            exit(1)
        self.algorithm = algorithm
        self.model = self.__preprocess(training_data=training_data, model=model)

    def predict(self, data):
        return self.model.predict(data)

    def __preprocess(self, training_data, model):
        # observations -> X of shape (samples, observation_size, 1)
        X = np.array([i[0] for i in training_data]).reshape(-1, len(training_data[0][0]), 1)
        # actions -> one-hot targets
        y = np.array([i[1] for i in training_data])
        if len(y.shape) != 2:
            y = to_categorical(y, 2)
        print(X.shape, y.shape, self.algorithm)
        if not model:
            if self.algorithm == 'rnn':
                model = self.__lstm(input_size=len(X[0]))
            else:
                model = self.__mlp(input_size=len(X[0]))
        model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500,
                  show_metric=True, run_id='openai_learning_' + self.algorithm)
        return model
    def __lstm(self, input_size):
        net = input_data(shape=[None, input_size, 1], name='input')
        net = tflearn.lstm(net, 128, return_seq=True)
        net = tflearn.lstm(net, 128)
        net = tflearn.fully_connected(net, 2, activation='softmax')
        net = tflearn.regression(net, optimizer='adam',
                                 loss='categorical_crossentropy', name="targets")
        model = tflearn.DNN(net, tensorboard_verbose=3)
        return model
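    # In the LSTM branch above, return_seq=True makes the first LSTM layer
    # emit its full output sequence so the second LSTM can consume it; the
    # second layer returns only its final output, which feeds the softmax.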
    def __mlp(self, input_size):
        network = input_data(shape=[None, input_size, 1], name='input')
        # tflearn's dropout argument is keep_prob, so 0.8 keeps 80% of units
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 512, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
        network = fully_connected(network, 2, activation='softmax')
        network = regression(network, optimizer='adam', learning_rate=1e-3,
                             loss='categorical_crossentropy', name='targets')
        model = tflearn.DNN(network, tensorboard_verbose=3)
        return model
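# Usage sketch (hedged; assumes a saved initial population from a prior run):
#   data = np.load('initial_population.npy', allow_pickle=True)
#   mlp_agent = neural_network(training_data=data)                    # default 'mlp'
#   lstm_agent = neural_network(training_data=data, algorithm='rnn')  # stacked LSTM
#   probs = mlp_agent.predict(obs.reshape(-1, len(obs), 1))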
if __name__ == '__main__':
    files = glob.glob('data.npy')
    training_data = []
    generation = 0
    if not files:
        if not glob.glob('initial_population.npy'):
            model = neural_network(training_data=initial_population())
        else:
            model = neural_network(training_data=np.load('initial_population.npy'))
            print('loaded stored initial population')
            np.save('data.npy', np.load('initial_population.npy'))
            generation = -1
    else:
        # generation = max([int(i.split('_')[1][0]) for i in glob.glob('gen_?.npy')])
        training_data = np.load('data.npy')
        print('loaded data.npy, shape:', training_data.shape)
        model = neural_network(training_data=training_data)

    scores = []
    choices = []
    game_memory = []
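    # Play 20 rendered games with the trained model: the first move of each
    # game is random (no observation yet); after that we act greedily on the
    # model's predicted action probabilities.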
    for each_game in range(20):
        score = 0
        game_memory = []
        prev_obs = []
        env.reset()
        for _ in range(goal_steps):
            env.render()
            if len(prev_obs) == 0:
                action = random.randrange(0, 2)
            else:
                action = np.argmax(model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0])
            choices.append(action)
            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            game_memory.append([new_observation, action])
            score += reward
            if done:
                break
        scores.append(score)
    # Append this run's transitions to the stored pool, but only when data.npy
    # was loaded above (a fresh initial population is already saved separately).
    if generation != -1 and len(training_data) > 0:
        game_memory = np.array(game_memory)
        np.save('data.npy', np.concatenate((training_data, game_memory)))

    print('Average Score:', sum(scores) / len(scores))
    print('choice 1: {}  choice 0: {}'.format(choices.count(1) / len(choices),
                                              choices.count(0) / len(choices)))
    print('Score requirement:', score_requirement)
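    # a small addition: close the render window once the evaluation loop ends
    env.close()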