Script to train a keras model. Uses the data written by the donkey v2.2 tub writer, but faster training with proper sampling of distribution over tubs. You can drop this in your ~/d2 dir. Basic usage should feel familiar: python train.py --model models/mypilot
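For example, to train on several tubs at once with a linear model (paths illustrative, flags per the usage string below): python train.py --tub=data/tub1,data/tub2 --model=models/mypilot --type=linear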
#!/usr/bin/env python3
"""
Scripts to train a keras model using tensorflow.
Uses the data written by the donkey v2.2 tub writer,
but faster training with proper sampling of distribution over tubs.
Has settings for continuous training that will look for new files as it trains.
Modify send_model_to_pi if you wish continuous training to update your pi as it builds.
You can drop this in your ~/d2 dir.
Basic usage should feel familiar: python train.py --model models/mypilot
You might need to do a: pip install scikit-learn
Credits: Derivative work from Alan Wells.

Usage:
    train.py [--tub=<tub1,tub2,..tubn>] (--model=<model>) [--transfer=<model>] [--type=(linear|categorical|rnn|imu)] [--continuous]

Options:
    -h --help    Show this screen.
"""
import os
import glob
import random
import json
from threading import Lock

from docopt import docopt
import numpy as np
import keras
import donkeycar as dk
from donkeycar.parts.datastore import Tub
from donkeycar.parts.keras import KerasLinear, KerasIMU, KerasCategorical
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from PIL import Image

deterministic = False

if deterministic:
    import tensorflow as tf
    import random as rn

    # The below is necessary in Python 3.2.3 onwards to
    # have reproducible behavior for certain hash-based operations.
    # See these references for further details:
    # https://docs.python.org/3.4/using/cmdline.html#envvar-PYTHONHASHSEED
    # https://github.com/fchollet/keras/issues/2280#issuecomment-306959926
    os.environ['PYTHONHASHSEED'] = '0'

    # The below is necessary for starting Numpy generated random numbers
    # in a well-defined initial state.
    np.random.seed(42)

    # The below is necessary for starting core Python generated random numbers
    # in a well-defined state.
    rn.seed(12345)

    # Force TensorFlow to use a single thread.
    # Multiple threads are a potential source of non-reproducible results.
    # For further details, see:
    # https://stackoverflow.com/questions/42022950/which-seeds-have-to-be-set-where-to-realize-100-reproducibility-of-training-res
    session_conf = tf.ConfigProto(intra_op_parallelism_threads=1,
                                  inter_op_parallelism_threads=1)

    from keras import backend as K

    # The below tf.set_random_seed() will make random number generation
    # in the TensorFlow backend have a well-defined initial state.
    # For further details, see: https://www.tensorflow.org/api_docs/python/tf/set_random_seed
    tf.set_random_seed(1234)
    sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
    K.set_session(sess)

'''
Tub management
'''
def expand_path_masks(paths):
    '''
    take a list of paths and expand any wildcards
    returns a new list of paths fully expanded
    '''
    expanded_paths = []
    for path in paths:
        if '*' in path or '?' in path:
            mask_paths = glob.glob(path)
            expanded_paths += mask_paths
        else:
            expanded_paths.append(path)
    return expanded_paths

def gather_tub_paths(cfg, tub_names=None):
    '''
    takes as input the configuration, and the comma separated list of tub paths
    returns a list of Tub paths
    '''
    if tub_names:
        tub_paths = [os.path.expanduser(n) for n in tub_names.split(',')]
        return expand_path_masks(tub_paths)
    else:
        return [os.path.join(cfg.DATA_PATH, n) for n in os.listdir(cfg.DATA_PATH)]


def gather_tubs(cfg, tub_names):
    '''
    takes as input the configuration, and the comma separated list of tub paths
    returns a list of Tub objects initialized to each path
    '''
    tub_paths = gather_tub_paths(cfg, tub_names)
    tubs = [Tub(p) for p in tub_paths]
    return tubs
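
# The index helpers below assume the donkey v2.2 tub naming convention,
# e.g. (illustrative filenames):
#   record_1234.json            -> record index 1234
#   1234_cam-image_array_.jpg   -> image index 1234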
def get_image_index(fnm):
    sl = os.path.basename(fnm).split('_')
    return int(sl[0])


def get_record_index(fnm):
    sl = os.path.basename(fnm).split('_')
    return int(sl[1].split('.')[0])


def make_key(sample):
    tub_path = sample['tub_path']
    index = sample['index']
    return tub_path + str(index)


def make_next_key(sample, index_offset):
    tub_path = sample['tub_path']
    index = sample['index'] + index_offset
    return tub_path + str(index)

def gather_records(cfg, tub_names, opts):
    tubs = gather_tubs(cfg, tub_names)
    records = []
    for tub in tubs:
        record_paths = glob.glob(os.path.join(tub.path, 'record_*.json'))
        record_paths.sort(key=get_record_index)
        records += record_paths
    return records

def collate_records(records, gen_records, opts):
    for record_path in records:
        basepath = os.path.dirname(record_path)
        index = get_record_index(record_path)
        sample = {'tub_path': basepath, 'index': index}

        key = make_key(sample)
        if key in gen_records:
            continue

        with open(record_path, 'r') as fp:
            json_data = json.load(fp)

        image_filename = json_data['cam/image_array']
        image_path = os.path.join(basepath, image_filename)

        sample['record_path'] = record_path
        sample['image_path'] = image_path
        sample['json_data'] = json_data

        angle = float(json_data['user/angle'])
        throttle = float(json_data['user/throttle'])

        if opts['categorical']:
            angle = dk.utils.linear_bin(angle)

        sample['angle'] = angle
        sample['throttle'] = throttle

        try:
            accl_x = float(json_data['imu/acl_x'])
            accl_y = float(json_data['imu/acl_y'])
            accl_z = float(json_data['imu/acl_z'])
            gyro_x = float(json_data['imu/gyr_x'])
            gyro_y = float(json_data['imu/gyr_y'])
            gyro_z = float(json_data['imu/gyr_z'])
            sample['imu_array'] = np.array([accl_x, accl_y, accl_z, gyro_x, gyro_y, gyro_z])
        except KeyError:
            # records without IMU data simply omit the imu_array field
            pass

        # defer image loading until the generator needs it
        sample['img_data'] = None

        # now assign train or val (roughly an 80/20 split)
        sample['train'] = (random.uniform(0., 1.0) > 0.2)

        gen_records[key] = sample
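
# Example of a collated entry (values illustrative, keys as built above):
# gen_records['data/tub_1_17-11-2742'] = {
#     'tub_path': 'data/tub_1_17-11-27', 'index': 42,
#     'record_path': 'data/tub_1_17-11-27/record_42.json',
#     'image_path': 'data/tub_1_17-11-27/42_cam-image_array_.jpg',
#     'angle': 0.5, 'throttle': 0.3, 'img_data': None, 'train': True, ...}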

class MyCPCallback(keras.callbacks.ModelCheckpoint):
    '''
    custom callback to interact with best val loss during continuous training
    '''
    def __init__(self, send_model_cb=None, *args, **kwargs):
        super(MyCPCallback, self).__init__(*args, **kwargs)
        self.reset_best_end_of_epoch = False
        self.send_model_cb = send_model_cb
        self.last_modified_time = None

    def reset_best(self):
        self.reset_best_end_of_epoch = True

    def on_epoch_end(self, epoch, logs=None):
        super(MyCPCallback, self).on_epoch_end(epoch, logs)

        if self.send_model_cb:
            '''
            check whether the file changed and send to the pi
            '''
            filepath = self.filepath.format(epoch=epoch, **logs)
            if os.path.exists(filepath):
                last_modified_time = os.path.getmtime(filepath)
                if self.last_modified_time is None or self.last_modified_time < last_modified_time:
                    self.last_modified_time = last_modified_time
                    self.send_model_cb(filepath)

        '''
        when reset best is set, we want to make sure to run an entire epoch
        before setting our new best on the new total records
        '''
        if self.reset_best_end_of_epoch:
            self.reset_best_end_of_epoch = False
            self.best = np.Inf

def send_model_to_pi(model_filename):
    #print('sending model to the pi')
    #command = 'scp %s pi@<your_pi_host>:~/d2/models/contin_train.h5' % model_filename
    #res = os.system(command)
    #print("result:", res)
    pass
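
# send_model_to_pi above is a stub: uncomment its body and point the scp
# target at your own pi (the user/host shown is a placeholder) so continuous
# training pushes each new best model as it is saved.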

def train(cfg, tub_names, model_name, transfer_model, model_type, continuous):
    '''
    use the specified data in tub_names to train an artificial neural network
    saves the output trained model as model_name
    '''
    verbose = True

    # when transferring models, should we freeze all but the last N layers?
    freeze_weights = True
    N_layers_to_train = 7

    if continuous:
        print("continuous training")

    gen_records = {}
    opts = {}
    opts['categorical'] = False

    if model_type is None:
        model_type = "categorical"

    if model_type == "imu":
        kl = KerasIMU()
    elif model_type == "rnn":
        raise Exception("Not yet")
        #kl = KerasRNN_LSTM()
    elif model_type == "linear":
        kl = KerasLinear(num_outputs=2)
    elif model_type == "categorical":
        kl = KerasCategorical()
        opts['categorical'] = True
    else:
        raise Exception("unknown model type: %s" % model_type)

    print('training with model type', model_type)

    if transfer_model:
        print('loading weights from model', transfer_model)
        kl.load(transfer_model)

        if freeze_weights:
            num_to_freeze = len(kl.model.layers) - N_layers_to_train
            print('freezing %d layers' % num_to_freeze)
            for i in range(num_to_freeze):
                kl.model.layers[i].trainable = False
            kl.model.compile(optimizer='rmsprop', loss='mse')

    print(kl.model.summary())

    opts['keras_pilot'] = kl
    opts['continuous'] = continuous

    records = gather_records(cfg, tub_names, opts)
    print('collating sequences')
    collate_records(records, gen_records, opts)

    def generator(save_best, opts, data, batch_size, isTrainSet=True):
        num_records = len(data)

        while True:
            if isTrainSet and opts['continuous']:
                '''
                When continuous training, we look for new records after each epoch.
                This will add new records to the train and validation set.
                '''
                records = gather_records(cfg, tub_names, opts)
                if len(records) > num_records:
                    collate_records(records, gen_records, opts)
                    new_num_rec = len(data)
                    if new_num_rec > num_records:
                        print('picked up', new_num_rec - num_records, 'new records!')
                        num_records = new_num_rec
                        save_best.reset_best()

            batch_data = []
            keys = list(data.keys())
            shuffle(keys)

            kl = opts['keras_pilot']

            if type(kl.model.output) is list:
                model_out_shape = (2, 1)
            else:
                model_out_shape = kl.model.output.shape

            if type(kl.model.input) is list:
                model_in_shape = (2, 1)
            else:
                model_in_shape = kl.model.input.shape

            # a list of inputs (the (2, 1) placeholder above) means the model
            # takes the IMU vector alongside the image
            has_imu = model_in_shape[1] == 1

            for key in keys:
                if key not in data:
                    continue

                _record = data[key]

                if _record['train'] != isTrainSet:
                    continue

                batch_data.append(_record)

                if len(batch_data) == batch_size:
                    inputs_img = []
                    inputs_imu = []
                    angles = []
                    throttles = []

                    for record in batch_data:
                        # load and cache image data if we don't already have it
                        if record['img_data'] is None:
                            record['img_data'] = np.array(Image.open(record['image_path']))

                        if has_imu:
                            inputs_imu.append(record['imu_array'])

                        inputs_img.append(record['img_data'])
                        angles.append(record['angle'])
                        throttles.append(record['throttle'])

                    if has_imu:
                        X = [np.array(inputs_img), np.array(inputs_imu)]
                    else:
                        X = [np.array(inputs_img)]

                    if model_out_shape[1] == 2:
                        y = [np.array([angles, throttles])]
                    else:
                        y = [np.array(angles), np.array(throttles)]

                    yield X, y

                    batch_data = []

    model_path = os.path.expanduser(model_name)

    # checkpoint to save model after each epoch and send best to the pi.
    save_best = MyCPCallback(send_model_cb=send_model_to_pi,
                             filepath=model_path,
                             monitor='val_loss',
                             verbose=verbose,
                             save_best_only=True,
                             mode='min')

    train_gen = generator(save_best, opts, gen_records, cfg.BATCH_SIZE, True)
    val_gen = generator(save_best, opts, gen_records, cfg.BATCH_SIZE, False)

    total_records = len(gen_records)
    num_train = 0
    num_val = 0

    for key, _record in gen_records.items():
        if _record['train']:
            num_train += 1
        else:
            num_val += 1

    print("train: %d, val: %d" % (num_train, num_val))
    print('total records: %d' % total_records)

    if not continuous:
        steps_per_epoch = num_train // cfg.BATCH_SIZE
    else:
        steps_per_epoch = 100

    val_steps = 10
    print('steps_per_epoch', steps_per_epoch)

    if continuous:
        epochs = 100000
    else:
        epochs = 200

    workers_count = 1
    use_multiprocessing = False

    callbacks_list = [save_best]
    #if use_early_stop:
    #    callbacks_list.append(early_stop)

    hist = kl.model.fit_generator(
        train_gen,
        steps_per_epoch=steps_per_epoch,
        epochs=epochs,
        verbose=1,
        validation_data=val_gen,
        callbacks=callbacks_list,
        validation_steps=val_steps,
        workers=workers_count,
        use_multiprocessing=use_multiprocessing)
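
    # hist is a keras History object; hist.history['loss'] and
    # hist.history['val_loss'] hold the per-epoch curves if you want
    # to inspect or plot the training run afterwards.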

if __name__ == "__main__":
    args = docopt(__doc__)
    cfg = dk.load_config()

    tub = args['--tub']
    model = args['--model']
    transfer = args['--transfer']
    model_type = args['--type']
    continuous = args['--continuous']

    train(cfg, tub, model, transfer, model_type, continuous)