non-working model.py for passing instance keys in a `gcloud ml-engine local predict` call with TF runtime version 1.4
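# Example invocation this module is meant to support (paths are hypothetical):
#
#   gcloud ml-engine local predict \
#       --model-dir=output/export/Servo/<timestamp> \
#       --json-instances=instances.json
#
# where each line of instances.json is one JSON object keyed by feature name.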
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import OrderedDict
import multiprocessing
import numpy as np
import six
import tensorflow as tf
COLUMNS_TO_DEFAULTS = OrderedDict([
    ('user_id', tf.constant([], dtype=tf.int64)),
    ('label', tf.constant([], dtype=tf.int32)),
    ('active', tf.constant([], dtype=tf.int32)),
    ('total_bookings', tf.constant([], dtype=tf.int32)),
    ('flight_bookings', tf.constant([], dtype=tf.int32)),
    ('hotel_bookings', tf.constant([], dtype=tf.int32)),
    ('other_bookings', tf.constant([], dtype=tf.int32)),
    # ...
])
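# For reference, a CSV row matching the columns above (in order) might look
# like the following, with hypothetical values and the elided columns omitted:
#
#   12345,1,1,7,3,2,2,...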
LABEL_COLUMN = 'label'
INSTANCE_KEY_COLUMN = 'user_id'
CSV_COLUMNS = COLUMNS_TO_DEFAULTS.keys()
CSV_COLUMN_DEFAULTS = COLUMNS_TO_DEFAULTS.values()
INPUT_COLUMNS = [
    # instance key meant to be passed through
    tf.feature_column.numeric_column(INSTANCE_KEY_COLUMN, dtype=tf.int64),
    # numeric features to be modeled as-is
    tf.feature_column.numeric_column('flight_bookings', dtype=tf.int16),
    tf.feature_column.numeric_column('hotel_bookings', dtype=tf.int16),
    tf.feature_column.numeric_column('other_bookings', dtype=tf.int16),
    # ...
    # numeric features to be bucketized
    tf.feature_column.numeric_column('avg_booked_airfare'),
    # ...
    # categorical features with a pre-defined allowable set of values
    # * note: in a prior failed attempt, I defined the features below as numerics
    #   and then applied a `categorical_column_with_identity` transform downstream,
    #   because we need to provide the dtype of each feature in INPUT_COLUMNS to
    #   the JSON serving function called in the evaluation step. However, this
    #   kept resulting in a bizarre ValueError: "'_NumericColumn(key=...)_indicator'
    #   is not a valid scope name".
    # * as a result, I'm directly defining these features as categoricals via
    #   `categorical_column_with_vocabulary_list()`, which allows us to explicitly
    #   specify a dtype for them, as required by the custom JSON serving function.
    #   Downstream, these categorical features are converted to indicator features
    #   before getting passed into the DNN estimator.
    # ...
    tf.feature_column.categorical_column_with_vocabulary_list(
        key='is_us_traveler', vocabulary_list=[0, 1],
        default_value=0, dtype=tf.int8),
    # hashed categorical for a higher-cardinality categorical feature
    tf.feature_column.categorical_column_with_hash_bucket(
        'home_region', hash_bucket_size=25, dtype=tf.string)
]
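# For reference, a minimal sketch (feature name illustrative) of the earlier,
# failed approach described in the note above, as I understand it:
#
#   is_us_traveler = tf.feature_column.numeric_column(
#       'is_us_traveler', dtype=tf.int8)
#   is_us_traveler_cat = tf.feature_column.categorical_column_with_identity(
#       key='is_us_traveler', num_buckets=2)
#   is_us_traveler_ind = tf.feature_column.indicator_column(is_us_traveler_cat)
#
# which kept raising the scope-name ValueError quoted above.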
UNUSED_COLUMNS = set(CSV_COLUMNS) - \
    {col.name for col in INPUT_COLUMNS} - {LABEL_COLUMN}
def build_model_columns(embedding_size=4):
    """
    Return tuple of feature column lists in the following order:
    - continuous features
    - indicator features based on bucketized numeric features
    - indicator features based on categorical features
    """
    # assign input columns to features
    (
        # instance key to be passed through
        user_id,
        # numeric features to be modeled as-is
        flight_bookings,
        hotel_bookings,
        other_bookings,
        # ...
        # numeric features to be bucketized
        avg_booked_airfare,
        # ...
        # categorical features
        is_us_traveler,
        home_region
        # ...
    ) = INPUT_COLUMNS

    ##########################################
    # derived features: based on inputs above
    ##########################################

    # derived features: bucketized versions of numeric features
    avg_booked_airfare_b = tf.feature_column.bucketized_column(
        avg_booked_airfare,
        boundaries=[1, 100, 500, 1000, 2000])
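    # note: with boundaries [1, 100, 500, 1000, 2000], `bucketized_column`
    # produces 6 buckets: (-inf, 1), [1, 100), [100, 500), [500, 1000),
    # [1000, 2000), and [2000, +inf); e.g. a hypothetical avg_booked_airfare
    # of 250.0 lands in the [100, 500) bucket.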
    # ...

    # derived features: indicator variables based on categorical features
    # ...
    is_us_traveler_ind = tf.feature_column.indicator_column(is_us_traveler)

    # derived features: embedding based on hashed categorical
    home_region_emb = \
        tf.feature_column.embedding_column(home_region, dimension=embedding_size)

    # define sets of feature columns
    FEAT_CONTINUOUS = [
        flight_bookings,
        hotel_bookings,
        other_bookings,
        # ...
    ]
    # bucketized numeric features for linear model
    FEAT_BUCKET = [
        avg_booked_airfare_b,
        # ...
    ]
    # indicators on bucketized numeric features for DNN
    FEAT_BUCKET_IND = [
        tf.feature_column.indicator_column(feat) for feat in FEAT_BUCKET
    ]
    FEAT_CAT_IND = [
        # ...
        is_us_traveler_ind,
        home_region_emb
    ]
    return FEAT_CONTINUOUS, FEAT_BUCKET_IND, FEAT_CAT_IND
def key_model_fn_gen(estimator, instance_key='user_id'):
    '''
    Takes a specified estimator and returns a function that in turn returns
    an EstimatorSpec. The 'predictions' item in the EstimatorSpec is
    augmented with the popped instance key so that the key is passed through
    to the prediction output.
    Intended to be passed as the `model_fn` arg in a
    `tf.estimator.Estimator(model_fn=...)` call.
    '''
    def _model_fn(features, labels, mode):
        instance_key_feature = features.pop(instance_key, None)
        # Note: there was an API change between TF 1.2 and 1.3/1.4, so we can't use
        # `estimator.model_fn` or `estimator._model_fn` any longer. In contrast to
        # `_call_model_fn`, the `_model_fn` and `model_fn` attributes simply return
        # functions, not the desired EstimatorSpecs. We want the function passed
        # to the `model_fn` arg to return an EstimatorSpec.
        # The `_call_model_fn()` function returns an EstimatorSpec object, which
        # fully defines the model to be run by an Estimator. It consists of the
        # ops and objects returned from a `model_fn` and passed to an `Estimator`.
        # https://www.tensorflow.org/api_docs/python/tf/estimator/EstimatorSpec:
        # The `model_fn` can populate all arguments to the EstimatorSpec constructor
        # independent of mode. In this case, some arguments will be ignored by the
        # Estimator. Alternately, `model_fn` can populate just the arguments
        # appropriate to the given mode via various
        # `mode == tf.estimator.ModeKeys.TRAIN` ... conditions.
        # Selection of args to the EstimatorSpec constructor:
        # - mode: alias for field number 0. One of `tf.estimator.ModeKeys`.
        # - predictions: alias for field number 1
        # - loss: alias for field number 2
        # - eval_metric_ops: alias for field number 4. A dict of desired metrics
        #   keyed by name.
        # - export_outputs: A dict {name: output} describing the output signatures
        #   to be exported to SavedModel and used during serving. Each 'name' is
        #   an arbitrary name for this output, and each 'output' (value) is an
        #   ExportOutput object such as ClassificationOutput, RegressionOutput,
        #   or PredictOutput. Must be defined only when mode == 'infer' or 'eval'.
        estimatorSpec = estimator._call_model_fn(
            features=features, labels=labels, mode=mode, config=estimator.config)

        # report additional metrics by adding them to the EstimatorSpec's dict
        if mode == tf.estimator.ModeKeys.EVAL:
            # evaluate precision & recall at various prob thresholds
            prob_thresholds = np.linspace(0.05, 0.50, 10).tolist()
            # Tensor of shape `TensorShape([Dimension(...), Dimension(2)])`
            probs_pred = estimatorSpec.predictions['probabilities']
            # get slice of shape `TensorShape([Dimension(...), Dimension(1)])`
            # containing each user's prob of 'conversion', however defined
            probs_pred_1 = tf.slice(probs_pred, [0, 1], [tf.shape(probs_pred)[0], 1])
            # precision & recall based on various thresholds for classification
            estimatorSpec.eval_metric_ops['precision_values'] = \
                tf.metrics.precision_at_thresholds(labels, probs_pred_1, prob_thresholds)
            estimatorSpec.eval_metric_ops['recall_values'] = \
                tf.metrics.recall_at_thresholds(labels, probs_pred_1, prob_thresholds)

        # store identifying key in `predictions` dict
        if instance_key_feature is not None:
            estimatorSpec.predictions[instance_key] = instance_key_feature

        # return predictions with instance key when predicting & serving
        # note: assigning `PredictOutput(estimatorSpec.predictions)` to
        # estimatorSpec.export_outputs['serving_default'] enables the instance
        # key to appear among the 'serving_default' SignatureDef outputs
        # of the saved model
        if estimatorSpec.export_outputs:
            estimatorSpec.export_outputs['predict'] = \
                tf.estimator.export.PredictOutput(estimatorSpec.predictions)
            estimatorSpec.export_outputs['serving_default'] = \
                tf.estimator.export.PredictOutput(estimatorSpec.predictions)

        tf.logging.info('\nEstimatorSpec export_outputs: {}\n'.format(estimatorSpec.export_outputs))
        tf.logging.info('\nEstimatorSpec instance: {}\n'.format(estimatorSpec))
        tf.logging.info('\nEstimatorSpec prediction keys: {}\n'.format(estimatorSpec.predictions.keys()))
        tf.logging.info('\nfeatures to include in model: {}\n'.format(features))
        return estimatorSpec
    return _model_fn
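# After exporting a SavedModel, one way to confirm whether the instance key
# appears in the serving signature is to inspect it with `saved_model_cli`
# (export path hypothetical):
#
#   saved_model_cli show --dir output/export/Servo/<timestamp> \
#       --tag_set serve --signature_def serving_default
#
# The 'user_id' tensor should then be listed among the signature's outputs.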
def build_estimator(config, embedding_size=4, hidden_units=None):
    feat_continuous, feat_bucket_ind, feat_cat_ind = \
        build_model_columns(embedding_size=embedding_size)
    deep_columns_w_buckets = feat_continuous + feat_bucket_ind + feat_cat_ind
    tf.logging.info('\ncontinuous features: {}\n'.format(feat_continuous))
    tf.logging.info('\none-hot-encoded bucketized numeric features: {}\n'.format(feat_bucket_ind))
    tf.logging.info('\none-hot-encoded categorical features: {}\n'.format(feat_cat_ind))
    # Return an `Estimator` instance to train and evaluate a TensorFlow model.
    # The `Estimator` object wraps a model which is specified by a `model_fn`,
    # which, given inputs and a number of other parameters, returns the ops
    # necessary to perform training, evaluation, or predictions.
    return tf.estimator.Estimator(
        model_fn=key_model_fn_gen(
            tf.estimator.DNNClassifier(
                config=config,
                feature_columns=deep_columns_w_buckets,
                hidden_units=hidden_units or [150, 75, 50, 25])
        ),
        config=config
    )
# ************************************************************************
# YOU NEED NOT MODIFY ANYTHING BELOW HERE TO ADAPT THIS MODEL TO YOUR DATA
# ************************************************************************
# commenting out bodies of CSV & example-serving functions for clarity since
# they're not used by `gcloud ml-engine local predict --json-instances=...`
def csv_serving_input_fn():
    pass
    # """Build the serving inputs."""
    # csv_row = tf.placeholder(
    #     shape=[None],
    #     dtype=tf.string
    # )
    # features = parse_csv(csv_row)
    # features.pop(LABEL_COLUMN)
    # return tf.estimator.export.ServingInputReceiver(features, {'csv_row': csv_row})
def example_serving_input_fn():
    pass
    # """
    # Build the serving inputs.
    # Changes:
    # - added instance key to input.
    # - referenced `feature_scalars` instead of `features`.
    # """
    # example_bytestring = tf.placeholder(
    #     shape=[None],
    #     dtype=tf.string,
    # )
    # feature_scalars = tf.parse_example(
    #     example_bytestring,
    #     tf.feature_column.make_parse_example_spec(INPUT_COLUMNS)
    #     # tf.feature_column.make_parse_example_spec(INPUT_COLUMNS + INPUT_INSTANCE_KEY)
    # )
    # # return tf.estimator.export.ServingInputReceiver(
    # #     features,
    # #     {'example_proto': example_bytestring}
    # # )
    # return tf.estimator.export.ServingInputReceiver(
    #     feature_scalars, {'example_proto': example_bytestring})
# [START serving-function]
def json_serving_input_fn():
    """
    Build the serving inputs.
    Changes: Added instance key to `inputs` dict.
    """
    inputs = {}
    features = {}
    # instance_key_dict = {}
    for feat in INPUT_COLUMNS:
        inputs[feat.name] = tf.placeholder(shape=[None], dtype=feat.dtype)
        if feat.name != INSTANCE_KEY_COLUMN:
            features[feat.name] = tf.placeholder(shape=[None], dtype=feat.dtype)
    # note: below I've tried passing multiple variations of arguments to
    # `tf.estimator.export.ServingInputReceiver(...)`, but they all result
    # in errors
    serving_input_rcvr = \
        tf.estimator.export.ServingInputReceiver(inputs, inputs)
    # tf.estimator.export.ServingInputReceiver(inputs, features)
    # tf.estimator.export.ServingInputReceiver(inputs, inputs, {'serving_default': inputs})
    # tf.estimator.export.ServingInputReceiver(features, inputs, {'serving_default': inputs})
    # tf.estimator.export.ServingInputReceiver(features, inputs)
    tf.logging.info('JSON serving input receiver: {}'.format(serving_input_rcvr))
    return serving_input_rcvr
# [END serving-function]
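# For reference, one instance in a `--json-instances` file for this model
# might look like the following single line (values hypothetical; the elided
# features marked "..." above would also need to be present):
#
#   {"user_id": 12345, "flight_bookings": 3, "hotel_bookings": 2,
#    "other_bookings": 1, "avg_booked_airfare": 250.0,
#    "is_us_traveler": 1, "home_region": "EMEA"}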
SERVING_FUNCTIONS = {
    'JSON': json_serving_input_fn,
    'EXAMPLE': example_serving_input_fn,
    'CSV': csv_serving_input_fn
}
def parse_csv(rows_string_tensor):
    """Takes the string input tensor and returns a (features, labels) tuple,
    where features is a dict of rank-2 tensors."""
    row_columns = tf.expand_dims(rows_string_tensor, -1)
    columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS)
    features = dict(zip(CSV_COLUMNS, columns))
    # remove columns present in the CSV input but not in INPUT_COLUMNS
    for col in UNUSED_COLUMNS:
        features.pop(col)
    labels = features.pop(LABEL_COLUMN)
    return features, labels
def input_fn(
        filenames,
        num_epochs=None,
        shuffle=True,
        skip_header_lines=0,
        batch_size=200):
    dataset = tf.data.TextLineDataset(filenames).skip(skip_header_lines).map(parse_csv)
    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 10)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features, labels = iterator.get_next()
    # return features, parse_label_column(features.pop(LABEL_COLUMN))
    return features, labels
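# A minimal end-to-end sketch of training and exporting with the JSON serving
# function (file paths, model_dir, and step counts are hypothetical):
#
#   run_config = tf.estimator.RunConfig(model_dir='/tmp/instance_key_model')
#   estimator = build_estimator(run_config, embedding_size=4,
#                               hidden_units=[150, 75, 50, 25])
#   estimator.train(
#       input_fn=lambda: input_fn(['train.csv'], num_epochs=1), steps=1000)
#   estimator.export_savedmodel(
#       '/tmp/instance_key_model/export', json_serving_input_fn)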