Automatic conversion of https://github.com/IBM/tensorflow-hangul-recognition/blob/master/hangul_model.py to TensorFlow v2.
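The conversion appears to have been done with TensorFlow's automatic upgrade script (something like "tf_upgrade_v2 --infile hangul_model.py --outfile hangul_model_v2.py"; the output file name is only illustrative), so the original graph-mode code is kept and routed through tf.compat.v1 and renamed v2 endpoints rather than rewritten for eager execution.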
#!/usr/bin/env python

import argparse
import io
import os

import tensorflow as tf

from tensorflow.python.tools import freeze_graph
from tensorflow.python.tools import optimize_for_inference_lib

SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))

# Default paths.
DEFAULT_LABEL_FILE = os.path.join(SCRIPT_PATH,
                                  './labels/2350-common-hangul.txt')
DEFAULT_TFRECORDS_DIR = os.path.join(SCRIPT_PATH, 'tfrecords-output')
DEFAULT_OUTPUT_DIR = os.path.join(SCRIPT_PATH, 'saved-model')

MODEL_NAME = 'hangul_tensorflow'
IMAGE_WIDTH = 64
IMAGE_HEIGHT = 64

DEFAULT_NUM_EPOCHS = 15
BATCH_SIZE = 100

# This will be determined by the number of entries in the given label file.
num_classes = 2350


def _parse_function(example):
    features = tf.io.parse_single_example(
        serialized=example,
        features={
            'image/class/label': tf.io.FixedLenFeature([], tf.int64),
            'image/encoded': tf.io.FixedLenFeature([], dtype=tf.string,
                                                   default_value='')
        })
    label = features['image/class/label']
    image_encoded = features['image/encoded']

    # Decode the JPEG.
    image = tf.image.decode_jpeg(image_encoded, channels=1)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    image = tf.reshape(image, [IMAGE_WIDTH*IMAGE_HEIGHT])

    # Represent the label as a one hot vector.
    label = tf.stack(tf.one_hot(label, num_classes))
    return image, label


def export_model(model_output_dir, input_node_names, output_node_name):
    """Export the model so we can use it later.

    This will create two Protocol Buffer files in the model output directory.
    These files represent a serialized version of our model with all the
    learned weights and biases. One of the ProtoBuf files is a version
    optimized for inference-only usage.
    """
    name_base = os.path.join(model_output_dir, MODEL_NAME)
    frozen_graph_file = os.path.join(model_output_dir,
                                     'frozen_' + MODEL_NAME + '.pb')
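    # For reference, the positional arguments to freeze_graph below map to
    # (per the tool's TF 1.x signature): input_graph (.pbtxt), input_saver,
    # input_binary, input_checkpoint, output_node_names, restore_op_name,
    # filename_tensor_name, output_graph, clear_devices, initializer_nodes.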
    freeze_graph.freeze_graph(
        name_base + '.pbtxt', None, False, name_base + '.chkp',
        output_node_name, "save/restore_all", "save/Const:0",
        frozen_graph_file, True, ""
    )

    input_graph_def = tf.compat.v1.GraphDef()
    with tf.io.gfile.GFile(frozen_graph_file, "rb") as f:
        input_graph_def.ParseFromString(f.read())

    output_graph_def = optimize_for_inference_lib.optimize_for_inference(
        input_graph_def, input_node_names, [output_node_name],
        tf.float32.as_datatype_enum)

    optimized_graph_file = os.path.join(model_output_dir,
                                        'optimized_' + MODEL_NAME + '.pb')
    with tf.io.gfile.GFile(optimized_graph_file, "wb") as f:
        f.write(output_graph_def.SerializeToString())

    print("Inference optimized graph saved at: " + optimized_graph_file)


def weight_variable(shape):
    """Generates a weight variable of a given shape."""
    initial = tf.random.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial, name='weight')


def bias_variable(shape):
    """Generates a bias variable of a given shape."""
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, name='bias')


def main(label_file, tfrecords_dir, model_output_dir, num_train_epochs):
    """Perform graph definition and model training.

    Here we will first create our input pipeline for reading in TFRecords
    files and producing random batches of images and labels.
    Next, a convolutional neural network is defined, and training is performed.
    After training, the model is exported to be used in applications.
    """
    # num_classes is defined at module level and read by _parse_function, so
    # rebind the global here rather than creating a shadowing local variable.
    global num_classes
    labels = io.open(label_file, 'r', encoding='utf-8').read().splitlines()
    num_classes = len(labels)
    # Define names so we can reference specific nodes later when using the
    # model for inference.
    input_node_name = 'input'
    keep_prob_node_name = 'keep_prob'
    output_node_name = 'output'

    if not os.path.exists(model_output_dir):
        os.makedirs(model_output_dir)

    print('Processing data...')

    tf_record_pattern = os.path.join(tfrecords_dir, '%s-*' % 'train')
    train_data_files = tf.io.gfile.glob(tf_record_pattern)
    tf_record_pattern = os.path.join(tfrecords_dir, '%s-*' % 'test')
    test_data_files = tf.io.gfile.glob(tf_record_pattern)

    # Create training dataset input pipeline.
    train_dataset = tf.data.TFRecordDataset(train_data_files) \
        .map(_parse_function) \
        .shuffle(1000) \
        .repeat(num_train_epochs) \
        .batch(BATCH_SIZE) \
        .prefetch(1)
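    # shuffle(1000) keeps a 1000-example buffer for randomizing batches, and
    # repeat(num_train_epochs) makes the one-shot iterator raise
    # OutOfRangeError once the requested number of epochs has been consumed,
    # which is what ends the training loop below.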

    # Create the model!

    # Placeholder to feed in image data.
    x = tf.compat.v1.placeholder(tf.float32, [None, IMAGE_WIDTH*IMAGE_HEIGHT],
                                 name=input_node_name)
    # Placeholder to feed in label data. Labels are represented as one_hot
    # vectors.
    y_ = tf.compat.v1.placeholder(tf.float32, [None, num_classes])

    # Reshape the image back into two dimensions so we can perform convolution.
    x_image = tf.reshape(x, [-1, IMAGE_WIDTH, IMAGE_HEIGHT, 1])

    # First convolutional layer. 32 feature maps.
    W_conv1 = weight_variable([5, 5, 1, 32])
    b_conv1 = bias_variable([32])
    x_conv1 = tf.nn.conv2d(input=x_image, filters=W_conv1,
                           strides=[1, 1, 1, 1], padding='SAME')
    h_conv1 = tf.nn.relu(x_conv1 + b_conv1)

    # Max-pooling.
    h_pool1 = tf.nn.max_pool2d(input=h_conv1, ksize=[1, 2, 2, 1],
                               strides=[1, 2, 2, 1], padding='SAME')

    # Second convolutional layer. 64 feature maps.
    W_conv2 = weight_variable([5, 5, 32, 64])
    b_conv2 = bias_variable([64])
    x_conv2 = tf.nn.conv2d(input=h_pool1, filters=W_conv2,
                           strides=[1, 1, 1, 1], padding='SAME')
    h_conv2 = tf.nn.relu(x_conv2 + b_conv2)
    h_pool2 = tf.nn.max_pool2d(input=h_conv2, ksize=[1, 2, 2, 1],
                               strides=[1, 2, 2, 1], padding='SAME')

    # Third convolutional layer. 128 feature maps.
    W_conv3 = weight_variable([3, 3, 64, 128])
    b_conv3 = bias_variable([128])
    x_conv3 = tf.nn.conv2d(input=h_pool2, filters=W_conv3,
                           strides=[1, 1, 1, 1], padding='SAME')
    h_conv3 = tf.nn.relu(x_conv3 + b_conv3)
    h_pool3 = tf.nn.max_pool2d(input=h_conv3, ksize=[1, 2, 2, 1],
                               strides=[1, 2, 2, 1], padding='SAME')
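    # Each 2x2 max-pool above halves the spatial size, so the 64x64 input is
    # now 8x8 with 128 channels (64 -> 32 -> 16 -> 8). That is where the
    # 8*8*128 flatten size below comes from.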
    # Fully connected layer. Here we choose to have 1024 neurons in this layer.
    h_pool_flat = tf.reshape(h_pool3, [-1, 8*8*128])
    W_fc1 = weight_variable([8*8*128, 1024])
    b_fc1 = bias_variable([1024])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool_flat, W_fc1) + b_fc1)

    # Dropout layer. This helps fight overfitting.
    keep_prob = tf.compat.v1.placeholder(tf.float32, name=keep_prob_node_name)
    h_fc1_drop = tf.nn.dropout(h_fc1, rate=1-keep_prob)
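    # Note: TF2's tf.nn.dropout takes a drop *rate* rather than a keep
    # probability, hence rate=1-keep_prob; the values fed below stay in the
    # original keep_prob convention (0.5 while training, 1.0 for evaluation).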

    # Classification layer.
    W_fc2 = weight_variable([1024, num_classes])
    b_fc2 = bias_variable([num_classes])
    y = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

    # This isn't used for training, but for when using the saved model.
    tf.nn.softmax(y, name=output_node_name)

    # Define our loss.
    cross_entropy = tf.reduce_mean(
        input_tensor=tf.nn.softmax_cross_entropy_with_logits(
            labels=tf.stop_gradient(y_),
            logits=y
        )
    )
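    # The logits y are deliberately unscaled; softmax_cross_entropy_with_logits
    # applies softmax internally. Wrapping the labels in tf.stop_gradient
    # preserves the TF1 behavior, since the TF2 op would otherwise also
    # propagate gradients into the labels.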

    # Define our optimizer for minimizing our loss. Here we choose a learning
    # rate of 0.0001 with AdamOptimizer, which uses the Adam algorithm's
    # adaptive learning rates and momentum to get past saddle points.
    train_step = tf.compat.v1.train.AdamOptimizer(0.0001).minimize(cross_entropy)

    # Define accuracy.
    correct_prediction = tf.equal(tf.argmax(input=y, axis=1),
                                  tf.argmax(input=y_, axis=1))
    correct_prediction = tf.cast(correct_prediction, tf.float32)
    accuracy = tf.reduce_mean(input_tensor=correct_prediction)

    saver = tf.compat.v1.train.Saver()

    with tf.compat.v1.Session() as sess:
        # Initialize the variables.
        sess.run(tf.compat.v1.global_variables_initializer())

        checkpoint_file = os.path.join(model_output_dir, MODEL_NAME + '.chkp')

        # Save the graph definition to a file.
        tf.io.write_graph(sess.graph_def, model_output_dir,
                          MODEL_NAME + '.pbtxt', True)

        try:
            iterator = tf.compat.v1.data.make_one_shot_iterator(train_dataset)
            batch = iterator.get_next()
            step = 0
            while True:
                # Get a batch of images and their corresponding labels.
                train_images, train_labels = sess.run(batch)

                # Perform the training step, feeding in the batches.
                sess.run(train_step, feed_dict={x: train_images,
                                                y_: train_labels,
                                                keep_prob: 0.5})

                if step % 100 == 0:
                    train_accuracy = sess.run(
                        accuracy,
                        feed_dict={x: train_images, y_: train_labels,
                                   keep_prob: 1.0}
                    )
                    print("Step %d, Training Accuracy %g" %
                          (step, float(train_accuracy)))

                # Every 10,000 iterations, we save a checkpoint of the model.
                if step % 10000 == 0:
                    saver.save(sess, checkpoint_file, global_step=step)

                step += 1
        except tf.errors.OutOfRangeError:
            pass

        # Save a checkpoint after training has completed.
        saver.save(sess, checkpoint_file)
        # See how the model did by running the testing set through the model.
        print('Testing model...')

        # Create testing dataset input pipeline.
        test_dataset = tf.data.TFRecordDataset(test_data_files) \
            .map(_parse_function) \
            .batch(BATCH_SIZE) \
            .prefetch(1)

        # Define a different tensor operation for summing the correct
        # predictions.
        accuracy2 = tf.reduce_sum(input_tensor=correct_prediction)

        total_correct_preds = 0
        total_preds = 0

        try:
            iterator = tf.compat.v1.data.make_one_shot_iterator(test_dataset)
            batch = iterator.get_next()
            while True:
                test_images, test_labels = sess.run(batch)
                acc = sess.run(accuracy2, feed_dict={x: test_images,
                                                     y_: test_labels,
                                                     keep_prob: 1.0})
                total_preds += len(test_images)
                total_correct_preds += acc
        except tf.errors.OutOfRangeError:
            pass

        test_accuracy = total_correct_preds/total_preds
        print("Testing Accuracy {}".format(test_accuracy))

        export_model(model_output_dir, [input_node_name, keep_prob_node_name],
                     output_node_name)

        sess.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--label-file', type=str, dest='label_file',
                        default=DEFAULT_LABEL_FILE,
                        help='File containing newline delimited labels.')
    parser.add_argument('--tfrecords-dir', type=str, dest='tfrecords_dir',
                        default=DEFAULT_TFRECORDS_DIR,
                        help='Directory of TFRecords files.')
    parser.add_argument('--output-dir', type=str, dest='output_dir',
                        default=DEFAULT_OUTPUT_DIR,
                        help='Output directory to store saved model files.')
    parser.add_argument('--num-train-epochs', type=int,
                        dest='num_train_epochs',
                        default=DEFAULT_NUM_EPOCHS,
                        help='Number of times to iterate over all of the '
                             'training data.')
    args = parser.parse_args()
    main(args.label_file, args.tfrecords_dir,
         args.output_dir, args.num_train_epochs)
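# Example invocation (the values shown are the script's own defaults; adjust
# the paths to wherever your labels and TFRecords live):
#
#   python hangul_model.py --label-file ./labels/2350-common-hangul.txt \
#       --tfrecords-dir ./tfrecords-output --output-dir ./saved-model \
#       --num-train-epochs 15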