Simple Neural Network in TensorFlow
## Authors: Dan Calacci
## Adapted from Natasha Jaques' NN code for PML '17
## Intended to be used with a larger framework for deep learning.

import tensorflow as tf
import numpy as np
import math
import json
import matplotlib.pyplot as plt

# local
import data_funcs
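
# A minimal illustrative params dictionary covering every key this class
# reads. The specific values below are assumptions for the sketch, not tuned
# recommendations.
EXAMPLE_PARAMS = {
    'model_type': 'classification',  # or 'regression'
    'layer_sizes': [128, 64],        # sizes of the hidden layers
    'activation_func': 'relu',       # or 'sigmoid'
    'learning_rate': 0.001,          # passed to the Adam optimizer
    'batch_size': 32,                # examples drawn per SGD step
    'dropout_prob': 0.9,             # keep probability fed to tf.nn.dropout
    'weight_penalty': 0.0001,        # l2 weight-decay strength
    'clip_gradients': True,          # clip gradients to a global norm of 5
}
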
class NeuralNet:
    def __init__(self, data_file, params):
        """Initialize the neural network.

        :param data_file: string filepath of the data file to train,
            validate, and test on
        :param params: either a string filepath of the JSON params file, or a
            dictionary of parameters
        :returns: new NeuralNet object
        :rtype: NeuralNet
        """
        if params is None:
            raise ValueError("No params passed to constructor. You must give "
                             "either a JSON-formatted params file or a "
                             "dictionary on network construction.")
        elif isinstance(params, str):
            # load param file
            with open(params) as json_params:
                self.params = json.load(json_params)
        elif isinstance(params, dict):
            self.params = params
        else:
            raise ValueError("params must be a dict or a filepath string.")
        self.data_file = data_file
        self.is_classification_p = self.params['model_type'] == 'classification'
        # not loaded from the param file
        self.optimizer = tf.train.AdamOptimizer
        # used when saving checkpoints; 'checkpoint_dir' and 'model_name' are
        # assumed optional param keys here, with fallbacks if absent
        self.checkpoint_dir = self.params.get('checkpoint_dir', './saved_models/')
        self.model_name = self.params.get('model_name', 'neural_net')
        # extract data from data_file
        self.load_data()
        # initialize graph with params from param file
        self.initialize_graph()
    def load_data(self):
        """Load data from data_file.

        :returns: None
        :rtype: NoneType
        """
        # Extract the data from the filename
        self.data_loader = data_funcs.DataLoader(self.data_file)
        self.input_size = self.data_loader.get_feature_size()
        if self.params['model_type'] == 'classification':
            print("\nPerforming classification.")
            self.output_size = self.data_loader.num_classes
            self.metric_name = 'accuracy'
        else:
            print("\nPerforming regression.")
            self.output_size = self.data_loader.num_outputs
            self.metric_name = 'RMSE'
        print("Input dimensions (number of features):", self.input_size)
        print("Number of classes/outputs:", self.output_size)
    def initialize_graph(self):
        """Initialize computation graph, tensorflow session, and metric arrays.

        :returns: None
        :rtype: NoneType
        """
        # Set up tensorflow computation graph.
        self.graph = tf.Graph()
        self.build_graph()
        # Set up and initialize tensorflow session.
        self.session = tf.Session(graph=self.graph)
        self.session.run(self.init)
        # Used for plotting evaluation.
        self.train_metrics = []
        self.val_metrics = []
    ##########################################################################
    # Initializing Network Weights
    ##########################################################################
    def _layer_input_size(self, layer_idx):
        """Input size for the given layer.

        An index of -1 is considered the start layer of the network.

        :param layer_idx: the layer index to return the input size for
        :returns: the input size of layer `layer_idx`
        :rtype: int
        """
        if layer_idx == -1:
            return self.input_size
        return self.params['layer_sizes'][layer_idx]

    def _layer_output_size(self, layer_idx):
        """Output size for the given layer.

        The output size of the last layer is always the size of the output of
        the network.

        :param layer_idx: the layer index to return the output size for
        :returns: the size of the output of the layer at index `layer_idx`
        :rtype: int
        """
        if layer_idx == len(self.params['layer_sizes']) - 1:
            return self.output_size
        return self.params['layer_sizes'][layer_idx + 1]
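
    # Worked example (illustrative): with input_size=3, output_size=2, and
    # params['layer_sizes'] = [10, 5], the layer indices are [-1, 0, 1] and
    # the (input, output) sizes resolve to:
    #   layer -1:  3 -> 10   (input layer)
    #   layer  0: 10 -> 5    (hidden layer)
    #   layer  1:  5 -> 2    (output layer)
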
    def _initial_weight_and_size(self, layer_idx):
        """Initial weight for the given layer index.

        The weight returned is a tensorflow variable.

        :param layer_idx: the layer to return the weight and size for
        :returns: a tuple of (input_size, output_size, weight)
        :rtype: tuple
        """
        input_size, output_size = (self._layer_input_size(layer_idx),
                                   self._layer_output_size(layer_idx))
        return (input_size,
                output_size,
                self._weight_variable([input_size, output_size],
                                      'weights_{}'.format(str(layer_idx))))

    def _initial_bias(self, layer_idx):
        """The initial bias for the layer at the given index.

        :param layer_idx: layer index to return the bias for
        :returns: a tensorflow Variable that represents the bias for this layer
        :rtype: tf.Variable
        """
        output_size = self._layer_output_size(layer_idx)
        return self._bias_variable([output_size],
                                   'biases_{}'.format(str(layer_idx)))
"""Initializes a tensorflow weight variable with random | |
values centered around 0. | |
shape: shape of the weight variable (?) | |
name: name of the variable | |
""" | |
    def _weight_variable(self, shape, name):
        """Creates a tensorflow weight variable with the given shape and name.

        The weight variable returned has random values centered around 0.
        Shape should be a list of [input_size, output_size] for a layer.

        :param shape: the shape of the layer to create a weight variable for
        :param name: the name to give the variable
        :returns: a tensorflow variable with truncated-normal initial values
        :rtype: tf.Variable
        """
        std = 1.0 / math.sqrt(float(shape[0]))
        initial = tf.truncated_normal(shape, stddev=std, dtype=tf.float64)
        return tf.Variable(initial, name=name)

    def _bias_variable(self, shape, name):
        """Initializes a tensorflow bias variable to a small constant value
        for a given shape and name.

        Initializes the bias to a value of 0.1 for the layer.

        :param shape: the shape of the layer to create a bias variable for
        :param name: the name to give the variable
        :returns: a tensorflow variable initialized to a small constant
        :rtype: tf.Variable
        """
        initial = tf.constant(0.1, shape=shape, dtype=tf.float64)
        return tf.Variable(initial, name=name)
    def initialize_weights(self):
        """Constructs tensorflow variables for the weights and biases in each
        layer of the graph.

        The number of layers, and the sizes of each layer, are defined in the
        `layer_sizes` field passed to the object on construction.

        Creates self.weights and self.biases, which are sequences that contain
        the weights and biases for each layer of the network.

        :returns: None
        :rtype: NoneType
        """
        # include -1 as the start layer
        layer_indices = [-1] + list(range(len(self.params['layer_sizes'])))
        weights_and_sizes = [self._initial_weight_and_size(idx)
                             for idx in layer_indices]
        input_sizes, output_sizes, self.weights = zip(*weights_and_sizes)
        self.biases = [self._initial_bias(idx) for idx in layer_indices]
        print("Okay, making a neural net with the following structure:")
        print(["{}x{}".format(i, o) for i, o
               in zip(input_sizes, output_sizes)])
    ##########################################################################
    # Building Graph
    ##########################################################################
    def _activation_function(self, h):
        """Returns this network's activation function applied to h.

        :param h: the hidden layer to apply the activation function to
        :returns: the application of this network's activation function to h
        :rtype: Tensor with the same type as h
        """
        if self.params['activation_func'] == 'relu':
            return tf.nn.relu(h)
        return tf.nn.sigmoid(h)

    def _run_network(self, input_X):
        """Runs the network for each layer in self.weights on the given input.

        Applies our learned weights at each layer in the network, adds biases,
        and applies our activation function + dropout. The type of input_X and
        of the output of _run_network is the same as the initial placeholder
        for self.tf_X.

        :param input_X: the input to run the network on, a tf.float64
        :returns: the output of the final layer of our network
        :rtype: tf.float64
        """
        hidden = input_X

        def not_final_layer_p(n):
            return n != len(self.weights) - 1

        for n, w in enumerate(self.weights):
            # invoke layer context
            with tf.name_scope('layer{}'.format(n)) as scope:
                # simple fully connected layer
                hidden = tf.matmul(hidden, w) + self.biases[n]
                # activation + dropout on every layer except the final one
                if not_final_layer_p(n):
                    hidden = self._activation_function(hidden)
                    hidden = tf.nn.dropout(hidden,
                                           self.tf_dropout_prob)
        return hidden
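
    # Shape walkthrough (illustrative): with a batch of B examples,
    # input_size=3, output_size=2, and layer_sizes=[128, 64] as in
    # EXAMPLE_PARAMS, _run_network maps [B, 3] -> [B, 128] -> [B, 64]
    # -> [B, 2]; the final layer's output is returned as raw logits,
    # with no activation or dropout applied.
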
    def _configure_common(self):
        """Configures the initial output, input, and dropout tensors.

        Creates placeholder tensors for tf_X, tf_Y, and tf_dropout_prob.
        For info on placeholders, see:
        https://www.tensorflow.org/versions/r0.11/api_docs/python/io_ops/placeholders

        Depends on model_type. For classification, tf_Y will be a tf.int64;
        for regression, tf.float64.

        :returns: None
        :rtype: NoneType
        """
        # output, float for regression, int for classification
        y_type = tf.int64 if self.is_classification_p else tf.float64
        self.tf_Y = tf.placeholder(y_type, name="Y")
        # input, always floats for now
        self.tf_X = tf.placeholder(tf.float64, name="X")
        # dropout (keep) probability for nodes
        self.tf_dropout_prob = tf.placeholder(tf.float64)
    def _configure_classification(self):
        """Configure network for classification.

        Sets up our loss function, weight regularization, predictions &
        accuracy for the classification regime.

        loss function: softmax cross entropy
        regularization: l2

        :returns: None
        :rtype: NoneType
        """
        # Apply a softmax function to get probabilities, train this dist
        # against targets with cross entropy loss.
        loss_func = tf.nn.sparse_softmax_cross_entropy_with_logits
        self.loss = tf.reduce_mean(loss_func(logits=self.logits,
                                             labels=self.tf_Y))
        # Add weight decay regularization term to loss
        weight_reg = sum([tf.nn.l2_loss(w) for w in self.weights])
        self.loss += self.params['weight_penalty'] * weight_reg
        # Code for making predictions and evaluating them.
        self.class_probabilities = tf.nn.softmax(self.logits)
        self.predictions = tf.argmax(self.class_probabilities, axis=1)
        self.correct_prediction = tf.equal(self.predictions, self.tf_Y)
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_prediction,
                                               tf.float32))
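
    # For reference, for a single example with logit vector z and integer
    # label y, sparse softmax cross entropy computes
    #     loss = -log( exp(z[y]) / sum_j exp(z[j]) )
    # i.e. the negative log probability the softmax assigns to the true class.
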
    def _configure_regression(self):
        """Configures network for regression.

        Sets up our loss & weight regularization.

        loss: rmse
        regularization: l2

        :returns: None
        :rtype: NoneType
        """
        # Apply root mean squared error loss.
        errs = tf.subtract(tf.reshape(self.logits, [-1]),
                           self.tf_Y)
        self.squared_errors = tf.square(errs)
        self.rmse = tf.sqrt(tf.reduce_mean(self.squared_errors))
        # Add weight decay regularization term to loss
        weight_reg = sum([tf.nn.l2_loss(w) for w in self.weights])
        self.loss = self.rmse + self.params['weight_penalty'] * weight_reg
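
    # For reference, over a batch of N targets y_i and predictions y_hat_i,
    #     RMSE = sqrt( (1/N) * sum_i (y_hat_i - y_i)^2 )
    # computed before the l2 penalty is added to the loss.
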
    def _configure_backprop(self):
        """Configure backprop for this network.

        Sets our gradients & optimizer given our loss and training parameters.
        If `params['clip_gradients']` is true, we clip by a global norm. We
        set clip_norm to be 5.

        Once this is called, we can call self.opt_step to run an optimization
        step.

        :returns: None
        :rtype: NoneType
        """
        # Set up backpropagation computation!
        self.global_step = tf.Variable(0, trainable=False, name='global_step')
        self.train_params = tf.trainable_variables()
        # set gradients for learning
        self.gradients = tf.gradients(self.loss, self.train_params)
        if self.params['clip_gradients']:
            self.gradients, _ = tf.clip_by_global_norm(self.gradients, 5)
        # optimization step using gradients
        self.tf_optimizer = self.optimizer(self.params['learning_rate'])
        self.opt_step = self.tf_optimizer.apply_gradients(
            zip(self.gradients, self.train_params),
            global_step=self.global_step)
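
    # Note: tf.clip_by_global_norm rescales the whole gradient list so that
    # sqrt(sum_i ||g_i||^2) <= 5, preserving the gradients' relative
    # directions while bounding the overall step size.
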
    def configure_network(self):
        """Configure our network based on our model type.

        Configures the network's loss, weight regularization, prediction &
        accuracy, etc. depending on our model type. Also configures
        backpropagation & optimization.

        self.opt_step may be run after this is called to run optimization.

        :returns: None
        :rtype: NoneType
        """
        if self.params['model_type'] == 'classification':
            self._configure_classification()
        else:
            self._configure_regression()
        self._configure_backprop()
    def build_graph(self):
        """Constructs the tensorflow graph with all variables to be trained.

        Configures the network, initializes weights, sets loss & regularization
        based on model type, runs the network once, and initializes tensorflow
        variables.

        :returns: None
        :rtype: NoneType
        """
        print("building graph...")
        # get context of our one and only graph
        with self.graph.as_default() as g:
            self._configure_common()
            self.initialize_weights()
            self.logits = self._run_network(self.tf_X)
            self.configure_network()
            # saver used to write model checkpoints during validation
            self.saver = tf.train.Saver()
            # op that initializes all variables; run by initialize_graph
            self.init = tf.global_variables_initializer()
    ##########################################################################
    # Training
    ##########################################################################
    def _print_validation_results(self, step, train_score, val_score):
        """Prints the validation results for this step, and saves a model
        checkpoint in checkpoint_dir.

        :param step: the current training step number we're on
        :param train_score: the model's current score on the training data
        :param val_score: the model's current score on the validation data
        :returns: None
        :rtype: NoneType
        """
        print("Training iteration", step)
        print("\t Training", self.metric_name, train_score)
        print("\t Validation", self.metric_name, val_score)
        self.train_metrics.append(train_score)
        self.val_metrics.append(val_score)
        # Save a checkpoint of the model
        self.saver.save(self.session, self.checkpoint_dir +
                        self.model_name + '.ckpt', global_step=step)
""" | |
Runs validation on model trained up to this step. | |
step: step number | |
feed_dict: feed dict given to this session to run | |
Returns the train score and validation score of model up to this step. | |
""" | |
    def _validate_batch(self, step, feed_dict):
        """Runs the model trained up to this step on our validation set.

        :param step: step number we're currently on
        :param feed_dict: feed dict given to this session to run
        :returns: a tuple (train_score, val_score) with the training and
            validation score for this step
        :rtype: tuple
        """
        val_X, val_Y = self.data_loader.get_val_data()
        val_feed_dict = {self.tf_X: val_X,
                         self.tf_Y: val_Y,
                         self.tf_dropout_prob: 1.0}
        eval_fn = self.accuracy if self.is_classification_p else self.rmse
        train_score = self.session.run(eval_fn, feed_dict)
        val_score = self.session.run(eval_fn, val_feed_dict)
        return (train_score, val_score)
    def _sgd_train_step(self, step, output_every_nth):
        """Runs a step of Stochastic Gradient Descent.

        :param step: the number of the step we're on in training
        :param output_every_nth: the network will print intermediate results
            and save a checkpoint every `output_every_nth` steps
        :returns: None
        :rtype: NoneType
        """
        # replace placeholders with values from data
        X, Y = self.data_loader.get_train_batch(self.params['batch_size'])
        feed_dict = {self.tf_X: X,
                     self.tf_Y: Y,
                     self.tf_dropout_prob: self.params['dropout_prob']}
        # run an optimization step
        _ = self.session.run([self.opt_step], feed_dict)
        # run and report our validation if we're at our step count
        if step % output_every_nth == 0:
            train_score, val_score = self._validate_batch(step, feed_dict)
            self._print_validation_results(step, train_score, val_score)

    def train(self, num_steps=30000, output_every_nth=None):
        """Trains the network by running Stochastic Gradient Descent for
        num_steps.

        :param num_steps: number of steps to run SGD for
        :param output_every_nth: the network will print intermediate results
            and save a checkpoint every `output_every_nth` steps; defaults to
            1000 (an assumed default) if not given
        :returns: None
        :rtype: NoneType
        """
        if output_every_nth is None:
            output_every_nth = 1000
        # remembered so plot_training_progress can recover the x-axis scale
        self.output_every_nth = output_every_nth
        for step in range(num_steps):
            self._sgd_train_step(step, output_every_nth)
    ##########################################################################
    # Prediction & Usage
    ##########################################################################
    def predict(self, X, get_probabilities=False):
        """Runs the network to get predictions for new data X.

        :param X: matrix of data in the same shape + format as the data this
            network was trained on
        :param get_probabilities: if true, the network will return the model's
            computed softmax probabilities as well as its predictions. Only
            works for classification.
        :returns: integer class predictions if classification, and float
            predictions if regression
        :rtype: np.ndarray
        """
        # no dropout for prediction
        feed_dict = {self.tf_X: X,
                     self.tf_dropout_prob: 1.0}
        if self.is_classification_p:
            probs, preds = self.session.run([self.class_probabilities,
                                             self.predictions],
                                            feed_dict)
            return (preds, probs) if get_probabilities else preds
        else:  # regression
            return self.session.run(self.logits, feed_dict)
    def plot_training_progress(self):
        """Plots the training and validation performance as evaluated
        throughout training."""
        x = [self.output_every_nth * i
             for i in np.arange(len(self.train_metrics))]
        plt.figure()
        plt.plot(x, self.train_metrics)
        plt.plot(x, self.val_metrics)
        plt.legend(['Train', 'Validation'], loc='best')
        plt.xlabel('Training step')
        plt.ylabel(self.metric_name)
        plt.show()
    def plot_binary_classification_data(self, with_decision_boundary=False):
        """Plots the data from each of two binary classes with two different
        colours. If with_decision_boundary is set to true, also plots the
        decision boundary learned by the model.

        Note: this function only works if there are two input features.
        """
        class1_X, class2_X = self.data_loader.get_train_binary_classification_data()
        plt.figure()
        plt.scatter(class1_X[:, 0], class1_X[:, 1], color='b')
        plt.scatter(class2_X[:, 0], class2_X[:, 1], color='r')
        if with_decision_boundary:
            # Make a mesh of points on which to make predictions
            mesh_step_size = .1
            x1_min = self.data_loader.train_X[:, 0].min() - 1
            x1_max = self.data_loader.train_X[:, 0].max() + 1
            x2_min = self.data_loader.train_X[:, 1].min() - 1
            x2_max = self.data_loader.train_X[:, 1].max() + 1
            xx1, xx2 = np.meshgrid(np.arange(x1_min, x1_max, mesh_step_size),
                                   np.arange(x2_min, x2_max, mesh_step_size))
            # Make predictions for each point in the mesh
            Z = self.predict(np.c_[xx1.ravel(), xx2.ravel()])
            # Use matplotlib contour function to show decision boundary on mesh
            Z = Z.reshape(xx1.shape)
            plt.contour(xx1, xx2, Z, cmap=plt.cm.Paired)
        plt.show()
    def plot_regression_data(self, with_decision_boundary=False):
        """Plots input regression data. If with_decision_boundary is set
        to true, also plots the regression function learned by the model.

        Note: this function only works if there is one input feature.
        """
        plt.figure()
        plt.scatter(self.data_loader.train_X, self.data_loader.train_Y)
        if with_decision_boundary:
            sorted_x = sorted(self.data_loader.train_X)
            preds = self.predict(sorted_x)
            plt.plot(sorted_x, preds, color='r', lw=2)
        plt.show()
    def test_on_validation(self):
        """Returns performance on the model's validation set."""
        score = self.get_performance_on_data(self.data_loader.val_X,
                                             self.data_loader.val_Y)
        print("Final", self.metric_name, "on validation data is:", score)
        return score

    def test_on_test(self):
        """Returns performance on the model's test set."""
        print("WARNING! Only test on the test set when you have finished "
              "choosing all of your hyperparameters!")
        print("\tNever use the test set to choose hyperparameters!!!")
        score = self.get_performance_on_data(self.data_loader.test_X,
                                             self.data_loader.test_Y)
        print("Final", self.metric_name, "on test data is:", score)
        return score

    def get_performance_on_data(self, X, Y):
        """Returns the model's performance on input data X and targets Y."""
        feed_dict = {self.tf_X: X,
                     self.tf_Y: Y,
                     self.tf_dropout_prob: 1.0}  # no dropout during evaluation
        if self.params['model_type'] == 'classification':
            score = self.session.run(self.accuracy, feed_dict)
        else:  # regression
            score = self.session.run(self.rmse, feed_dict)
        return score
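
# A minimal usage sketch, assuming a dataset at the hypothetical path
# 'data/my_dataset.csv' in a format data_funcs.DataLoader can parse:
#
#     net = NeuralNet('data/my_dataset.csv', EXAMPLE_PARAMS)
#     net.train(num_steps=10000, output_every_nth=500)
#     net.plot_training_progress()
#     net.test_on_validation()
#     preds = net.predict(new_X)  # new_X shaped like the training features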