@bindiego
Last active August 22, 2019 06:15
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import numpy as np
tf.logging.set_verbosity(tf.logging.INFO)
CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
LABEL_COLUMN = 'fare_amount'
KEY_FEATURE_COLUMN = 'key'
DEFAULTS = [[0.0], ['Sun'], [0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]
# These are the raw input columns, and will be provided for prediction also
INPUT_COLUMNS = [
    # Define features
    tf.feature_column.categorical_column_with_vocabulary_list('dayofweek',
        vocabulary_list = ['Sun', 'Mon', 'Tues', 'Wed', 'Thu', 'Fri', 'Sat']),
    tf.feature_column.categorical_column_with_identity('hourofday', num_buckets = 24),

    # Numeric columns
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),

    # Engineered features that are created in the input_fn
    tf.feature_column.numeric_column('latdiff'),
    tf.feature_column.numeric_column('londiff'),
    tf.feature_column.numeric_column('euclidean')
]
# Build the estimator
def build_estimator(model_dir, nbuckets, hidden_units):
    """
    Build an estimator starting from INPUT_COLUMNS.
    These include feature transformations and synthetic features.
    The model is a wide-and-deep model.
    """
    # Input columns
    (dayofweek, hourofday, plat, plon, dlat, dlon, pcount, latdiff, londiff, euclidean) = INPUT_COLUMNS

    # Bucketize the lats & lons
    latbuckets = np.linspace(38.0, 42.0, nbuckets).tolist()
    lonbuckets = np.linspace(-76.0, -72.0, nbuckets).tolist()
    b_plat = tf.feature_column.bucketized_column(plat, latbuckets)
    b_dlat = tf.feature_column.bucketized_column(dlat, latbuckets)
    b_plon = tf.feature_column.bucketized_column(plon, lonbuckets)
    b_dlon = tf.feature_column.bucketized_column(dlon, lonbuckets)

    # Feature crosses
    ploc = tf.feature_column.crossed_column([b_plat, b_plon], nbuckets * nbuckets)
    dloc = tf.feature_column.crossed_column([b_dlat, b_dlon], nbuckets * nbuckets)
    pd_pair = tf.feature_column.crossed_column([ploc, dloc], nbuckets ** 4)
    day_hr = tf.feature_column.crossed_column([dayofweek, hourofday], 24 * 7)

    # Wide columns and deep columns
    wide_columns = [
        # Feature crosses
        dloc, ploc, pd_pair,
        day_hr,

        # Sparse columns
        dayofweek, hourofday,

        # Anything with a linear relationship
        pcount
    ]

    deep_columns = [
        # Embedding_column to "group" together ...
        tf.feature_column.embedding_column(pd_pair, 10),
        tf.feature_column.embedding_column(day_hr, 10),

        # Numeric columns
        plat, plon, dlat, dlon,
        latdiff, londiff, euclidean
    ]

    # Set the checkpoint interval to be much lower for this task
    run_config = tf.estimator.RunConfig(save_checkpoints_secs = 30,
                                        keep_checkpoint_max = 3)

    estimator = tf.estimator.DNNLinearCombinedRegressor(
        model_dir = model_dir,
        linear_feature_columns = wide_columns,
        dnn_feature_columns = deep_columns,
        dnn_hidden_units = hidden_units,
        config = run_config)

    # Add extra evaluation metric for hyperparameter tuning
    estimator = tf.contrib.estimator.add_metrics(estimator, add_eval_metrics)
    return estimator
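# A minimal usage sketch (hypothetical values) mirroring how train_and_evaluate below
# calls this function; the model_dir is a hypothetical GCS path:
#   estimator = build_estimator(
#       model_dir = 'gs://my-bucket/taxifare/model',
#       nbuckets = 16,
#       hidden_units = '128 32 4'.split(' '))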
# Create feature engineering function that will be used in the input and serving input functions
def add_engineered(features):
    # This is how you can do feature engineering in TensorFlow
    lat1 = features['pickuplat']
    lat2 = features['dropofflat']
    lon1 = features['pickuplon']
    lon2 = features['dropofflon']
    latdiff = (lat1 - lat2)
    londiff = (lon1 - lon2)

    # Set features for distance with sign that indicates direction
    features['latdiff'] = latdiff
    features['londiff'] = londiff
    dist = tf.sqrt(latdiff * latdiff + londiff * londiff)
    features['euclidean'] = dist
    return features
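# For example (hypothetical coordinates): a pickup at (40.75, -73.99) and a dropoff at
# (40.68, -73.94) gives latdiff = 0.07, londiff = -0.05 and
# euclidean = sqrt(0.07**2 + 0.05**2) ≈ 0.086 (in degrees, not kilometres).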
# Create serving input function to be able to serve predictions
def serving_input_fn():
    feature_placeholders = {
        # All the real-valued columns
        column.name: tf.placeholder(tf.float32, [None]) for column in INPUT_COLUMNS[2:7]
    }
    feature_placeholders['dayofweek'] = tf.placeholder(tf.string, [None])
    feature_placeholders['hourofday'] = tf.placeholder(tf.int32, [None])
    features = add_engineered(feature_placeholders.copy())
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
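# Hypothetical example of a single prediction instance this serving signature accepts;
# the engineered features are derived inside add_engineered, so callers send only raw fields:
#   {"dayofweek": "Thu", "hourofday": 17, "pickuplat": 40.75, "pickuplon": -73.99,
#    "dropofflat": 40.68, "dropofflon": -73.94, "passengers": 2.0}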
# Create input function to load data into datasets
def read_dataset(filename, mode, batch_size = 512):
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label = features.pop(LABEL_COLUMN)
            return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # loop indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1  # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()
        return batch_features, batch_labels
    return _input_fn
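# Example usage (hypothetical path): the returned closure is what the Estimator API expects.
#   train_input_fn = read_dataset('gs://my-bucket/taxifare/train-*.csv',
#                                 mode = tf.estimator.ModeKeys.TRAIN)
#   estimator.train(input_fn = train_input_fn, max_steps = 1000)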
# Create estimator train and evaluate function
def train_and_evaluate(args):
    tf.summary.FileWriterCache.clear()  # ensure filewriter cache is clear for TensorBoard events file
    estimator = build_estimator(args['output_dir'], args['nbuckets'], args['hidden_units'].split(' '))
    train_spec = tf.estimator.TrainSpec(
        input_fn = read_dataset(
            filename = args['train_data_paths'],
            mode = tf.estimator.ModeKeys.TRAIN,
            batch_size = args['train_batch_size']),
        max_steps = args['train_steps'])
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn = read_dataset(
            filename = args['eval_data_paths'],
            mode = tf.estimator.ModeKeys.EVAL,
            batch_size = args['eval_batch_size']),
        steps = 100,
        exporters = exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
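# A minimal sketch (hypothetical paths/values) of the args dict this function expects;
# the keys mirror the hyperparameters exposed in the tuning config further below:
#   train_and_evaluate({
#       'output_dir': 'gs://my-bucket/taxifare/trained_model',
#       'nbuckets': 16,
#       'hidden_units': '128 32 4',
#       'train_data_paths': 'gs://my-bucket/taxifare/train-*.csv',
#       'eval_data_paths': 'gs://my-bucket/taxifare/valid-*.csv',
#       'train_batch_size': 512,
#       'eval_batch_size': 512,
#       'train_steps': 5000
#   })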
# If we want to use TFRecords instead of CSV
def gzip_reader_fn():
    return tf.TFRecordReader(options = tf.python_io.TFRecordOptions(
        compression_type = tf.python_io.TFRecordCompressionType.GZIP))

def generate_tfrecord_input_fn(data_paths, num_epochs = None, batch_size = 512, mode = tf.estimator.ModeKeys.TRAIN):
    def get_input_features():
        # Read the tfrecords. Same input schema as in preprocess.
        # NOTE: SCALE_COLUMNS (the real-valued columns produced by preprocessing)
        # is assumed to be defined elsewhere in this package.
        input_schema = {}
        if mode != tf.estimator.ModeKeys.PREDICT:  # tf.estimator uses PREDICT (not the old INFER)
            input_schema[LABEL_COLUMN] = tf.FixedLenFeature(shape = [1], dtype = tf.float32, default_value = 0.0)
        for name in ['dayofweek', 'key']:
            input_schema[name] = tf.FixedLenFeature(shape = [1], dtype = tf.string, default_value = 'null')
        for name in ['hourofday']:
            input_schema[name] = tf.FixedLenFeature(shape = [1], dtype = tf.int64, default_value = 0)
        for name in SCALE_COLUMNS:
            input_schema[name] = tf.FixedLenFeature(shape = [1], dtype = tf.float32, default_value = 0.0)

        # Read keyed batches of examples from the (gzipped) TFRecord files
        keys, features = tf.contrib.learn.io.read_keyed_batch_features(
            data_paths[0] if len(data_paths) == 1 else data_paths,
            batch_size,
            input_schema,
            reader = gzip_reader_fn,
            reader_num_threads = 4,
            queue_capacity = batch_size * 2,
            randomize_input = (mode != tf.estimator.ModeKeys.EVAL),
            num_epochs = (1 if mode == tf.estimator.ModeKeys.EVAL else num_epochs))
        target = features.pop(LABEL_COLUMN)
        features[KEY_FEATURE_COLUMN] = keys
        return add_engineered(features), target

    # Return a function to input the features into the model from a data path.
    return get_input_features
def add_eval_metrics(labels, predictions):
    pred_values = predictions['predictions']
    return {
        'rmse': tf.metrics.root_mean_squared_error(labels, pred_values)
    }
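# Note: the 'rmse' key returned above must match the hyperparameterMetricTag used in the
# hyperparameter tuning config below.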
trainingInput:
  scaleTier: STANDARD_1
  hyperparameters:
    goal: MINIMIZE
    maxTrials: 30
    maxParallelTrials: 3
    hyperparameterMetricTag: rmse
    params:
    - parameterName: train_batch_size
      type: INTEGER
      minValue: 64
      maxValue: 512
      scaleType: UNIT_LOG_SCALE
    - parameterName: nbuckets
      type: INTEGER
      minValue: 10
      maxValue: 20
      scaleType: UNIT_LINEAR_SCALE
    - parameterName: hidden_units
      type: CATEGORICAL
      categoricalValues: ["128 32", "256 128 16", "64 64 64 8"]
#!/usr/bin/env python
import tensorflow as tf
import numpy as np
import shutil
tf.logging.set_verbosity(tf.logging.INFO)
# List the CSV columns
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
# Choose which column is your label
LABEL_COLUMN = 'fare_amount'
# Set the default values for each CSV column in case there is a missing value
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]
# Create an input function that stores your data into a dataset
def read_dataset(filename, mode, batch_size = 512):
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(list(zip(CSV_COLUMNS, columns)))
            label = features.pop(LABEL_COLUMN)
            return features, label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # loop indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1  # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        return dataset.make_one_shot_iterator().get_next()
    return _input_fn
# Define your feature columns
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]
# Create a function that will augment your feature set
def add_more_features(feats):
    # Nothing to add (yet!)
    return feats

feature_cols = add_more_features(INPUT_COLUMNS)
# Create your serving input function so that your trained model will be able to serve predictions
def serving_input_fn():
    feature_placeholders = {
        column.name: tf.placeholder(tf.float32, [None]) for column in INPUT_COLUMNS
    }
    features = feature_placeholders
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
# Create an estimator that we are going to train and evaluate
def train_and_evaluate(args):
    estimator = tf.estimator.DNNRegressor(
        model_dir = args['output_dir'],
        feature_columns = feature_cols,
        hidden_units = args['hidden_units'])
    train_spec = tf.estimator.TrainSpec(
        input_fn = read_dataset(args['train_data_paths'],
                                batch_size = args['train_batch_size'],
                                mode = tf.estimator.ModeKeys.TRAIN),
        max_steps = args['train_steps'])
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn = read_dataset(args['eval_data_paths'],
                                batch_size = 10000,
                                mode = tf.estimator.ModeKeys.EVAL),
        steps = None,
        start_delay_secs = args['eval_delay_secs'],
        throttle_secs = args['min_eval_frequency'],
        exporters = exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
import argparse
import json
import os

from . import model

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Input Arguments
    parser.add_argument(
        '--train_data_paths',
        help = 'GCS or local path to training data',
        required = True
    )
    parser.add_argument(
        '--train_batch_size',
        help = 'Batch size for training steps',
        type = int,
        default = 512
    )
    parser.add_argument(
        '--train_steps',
        help = 'Steps to run the training job for',
        type = int
    )
    parser.add_argument(
        '--eval_steps',
        help = 'Number of steps to run evaluation for at each checkpoint',
        default = 10,
        type = int
    )
    parser.add_argument(
        '--eval_data_paths',
        help = 'GCS or local path to evaluation data',
        required = True
    )
    # Training arguments
    parser.add_argument(
        '--hidden_units',
        help = 'List of hidden layer sizes to use for DNN feature columns',
        nargs = '+',
        type = int,
        default = [128, 32, 4]
    )
    parser.add_argument(
        '--output_dir',
        help = 'GCS location to write checkpoints and export models',
        required = True
    )
    parser.add_argument(
        '--job-dir',
        help = 'This model ignores this field, but it is required by gcloud',
        default = 'junk'
    )
    # Eval arguments
    parser.add_argument(
        '--eval_delay_secs',
        help = 'How long to wait before running first evaluation',
        default = 10,
        type = int
    )
    parser.add_argument(
        '--min_eval_frequency',
        help = 'Seconds between evaluations',
        default = 300,
        type = int
    )

    args = parser.parse_args()
    arguments = args.__dict__

    # Unused args provided by service
    arguments.pop('job_dir', None)
    arguments.pop('job-dir', None)

    output_dir = arguments['output_dir']

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    output_dir = os.path.join(
        output_dir,
        json.loads(
            os.environ.get('TF_CONFIG', '{}')
        ).get('task', {}).get('trial', '')  # 'trial' holds the tuning trial id
    )
    arguments['output_dir'] = output_dir  # use the trial-specific directory

    # Run the training job
    model.train_and_evaluate(arguments)
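# A hypothetical local invocation (assuming this task.py and model.py live in a package
# named `trainer`, which the gist does not state); the flags match the argparse
# definitions above:
#
#   python -m trainer.task \
#       --train_data_paths=./taxi-train*.csv \
#       --eval_data_paths=./taxi-valid.csv \
#       --output_dir=./taxi_trained \
#       --train_steps=1000 \
#       --job-dir=./tmp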
#!/usr/bin/env python
import tensorflow as tf
import numpy as np
import shutil

print(tf.__version__)
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]
def read_dataset(filename, mode, batch_size = 512):
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label = features.pop(LABEL_COLUMN)
            return features, label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # loop indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1  # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        return dataset.make_one_shot_iterator().get_next()
    return _input_fn
def get_train():
    return read_dataset('./taxi-train*.csv', mode = tf.estimator.ModeKeys.TRAIN)
    # return read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid():
    return read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():
    return read_dataset('./taxi-test.csv', mode = tf.estimator.ModeKeys.EVAL)
# refactor the way features are created
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
    # Nothing to add (yet!)
    return feats

feature_cols = add_more_features(INPUT_COLUMNS)
# Create and train the model
tf.logging.set_verbosity(tf.logging.INFO)
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True)  # start fresh each time
model = tf.estimator.LinearRegressor(
    feature_columns = feature_cols, model_dir = OUTDIR)
model.train(input_fn = get_train(), steps = 100)  # TODO: change the input_fn as needed

## Deep neural network
# model = tf.estimator.DNNRegressor(feature_columns = [...], hidden_units = [128, 64, 32])

## Classification
# model = tf.estimator.LinearClassifier(feature_columns = [...])
# model = tf.estimator.DNNClassifier(feature_columns = [...], hidden_units = [...])
# Evaluate the model
def print_rmse(model, name, input_fn):
    metrics = model.evaluate(input_fn = input_fn, steps = 1)
    print('RMSE on {} dataset = {}'.format(name, np.sqrt(metrics['average_loss'])))

print_rmse(model, 'validation', get_valid())
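# Once you are done iterating, the held-out test set can be scored the same way, e.g.:
#   print_rmse(model, 'test', get_test())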
#!/usr/bin/env python
import tensorflow as tf
import numpy as np
import shutil

print(tf.__version__)
CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]
def read_dataset(filename, mode, batch_size = 512):
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label = features.pop(LABEL_COLUMN)
            return features, label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # loop indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1  # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        return dataset.make_one_shot_iterator().get_next()
    return _input_fn
def get_train():
    return read_dataset('./taxi-train*.csv', mode = tf.estimator.ModeKeys.TRAIN)
    # return read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN)

def get_valid():
    return read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL)

def get_test():
    return read_dataset('./taxi-test.csv', mode = tf.estimator.ModeKeys.EVAL)
# refactor the way features are created
INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
    # Nothing to add (yet!)
    return feats

feature_cols = add_more_features(INPUT_COLUMNS)
def serving_input_fn():
    feature_placeholders = {
        'pickuplon' : tf.placeholder(tf.float32, [None]),
        'pickuplat' : tf.placeholder(tf.float32, [None]),
        'dropofflat' : tf.placeholder(tf.float32, [None]),
        'dropofflon' : tf.placeholder(tf.float32, [None]),
        'passengers' : tf.placeholder(tf.float32, [None]),
    }
    # Add an extra dimension so each scalar placeholder becomes a rank-2 batch of features
    features = {
        key: tf.expand_dims(tensor, -1)
        for key, tensor in feature_placeholders.items()
    }
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
def train_and_evaluate(output_dir, num_train_steps):
    estimator = tf.estimator.LinearRegressor(
        model_dir = output_dir,
        feature_columns = feature_cols)
    train_spec = tf.estimator.TrainSpec(
        input_fn = read_dataset('./taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN),
        max_steps = num_train_steps)
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn = read_dataset('./taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL),
        steps = None,
        start_delay_secs = 1,  # start evaluating after N seconds
        throttle_secs = 10,    # evaluate every N seconds
        exporters = exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
# Run training
OUTDIR = 'taxi_trained'
shutil.rmtree(OUTDIR, ignore_errors = True)  # start fresh each time
train_and_evaluate(OUTDIR, num_train_steps = 5000)

# Monitoring with TensorBoard
from google.datalab.ml import TensorBoard
TensorBoard().start('./taxi_trained')
TensorBoard().list()

# To stop TensorBoard
for pid in TensorBoard.list()['pid']:
    TensorBoard().stop(pid)
    print('Stopped TensorBoard with pid {}'.format(pid))