Solution of IPSC 2016, Problem L (train.py)
# Solution of IPSC 2016 Prob No.L
# Modified version of official solution at https://ipsc.ksp.sk
# Modified by Seungwon Park (github.com/seungwonpark)
# CC BY-SA 3.0
# directory structure:
# C:.
# │  train.py
# └─l
#    ├─alphabet
#    ├─l1
#    └─l2
import tensorflow as tf  # you can use theano or another general-purpose ML library
import numpy as np

TESTCASES = 800
WIDTH = 100  # width of a single glyph
LABELLED_CNT = 1200  # amount of initially labelled data (1200 = 6 * 200)
TRAIN_PART = 1000  # keep the rest for validation

FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_float('learning_rate', 5e-5, 'learning rate')
tf.app.flags.DEFINE_integer('batch_size', 30, 'size of one batch')
tf.app.flags.DEFINE_boolean('train', False, 'run in training mode')
tf.app.flags.DEFINE_boolean('no_autosave', False, 'disable autosaving the model')

logs_path = ''


def read_images():
    ret = []
    for file_no in range(1, TESTCASES + 1):
        with open("l/l1/%03d.in" % file_no) as ff:
            # each input file is a whitespace-separated grid of pixel values
            # (70 rows, six glyphs of width 100 side by side)
            img = np.array([list(map(int, line.strip().split())) for line in ff])
        img = img / 256.0
        img = img - 0.5  # scale & center around 0
        # split the 600-pixel-wide image into six 70 x 100 glyphs
        for j in range(6):
            ret.append(img[:, j * WIDTH:(j + 1) * WIDTH])
    return ret
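
# A quick sanity check of read_images (a sketch only; it assumes the l/l1 data
# files from the problem package are present, so it is left commented out):
#
#     data = read_images()
#     assert len(data) == 6 * TESTCASES       # 800 files * 6 glyphs = 4800 entries
#     assert data[0].shape == (70, 100)       # one scaled and centered glyph per entry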

def read_solution():
    ret = []
    with open("l/l1/sample.out") as f:
        for text in f:
            for i in range(6):
                ret.append([int(ord(text[i]) == ord('a') + k) for k in range(26)])
    return ret
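
# (For reference, a sketch of what read_solution produces: each letter becomes a
#  one-hot row of length 26, e.g. 'c' -> [0, 0, 1, 0, ..., 0], with the 1 at
#  index ord('c') - ord('a') = 2.)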

class ReadModel(object):
    def __init__(self):
        # input glyphs
        self._x = tf.placeholder(tf.float32, shape=[None, 70, 100], name="x")
        # labelled output
        self._y_real = tf.placeholder(tf.float32, shape=[None, 26], name="y_real")
        # keep-probability placeholder for the dropout layer
        self._keep_prob = tf.placeholder(tf.float32, name="keep_prob")

    def _make_model(self, x):
        # First we have a 7x7 convolution with 10 filters
        conv1_biases = tf.Variable(tf.zeros([10]))
        conv1_weights = tf.Variable(tf.truncated_normal([7, 7, 1, 10], stddev=0.1))
        x = tf.reshape(x, [-1, 70, 100, 1])
        stage1 = tf.nn.conv2d(x, conv1_weights, strides=[1, 1, 1, 1], padding='SAME')
        stage1 = tf.tanh(tf.nn.bias_add(stage1, conv1_biases))
        # Then 5x5 downscaling with a maxpool layer
        pool1 = tf.nn.max_pool(stage1, ksize=[1, 5, 5, 1], strides=[1, 5, 5, 1], padding='SAME')
        # Next is a 5x5 convolution
        conv2_biases = tf.Variable(tf.zeros([10]))
        conv2_weights = tf.Variable(tf.truncated_normal([5, 5, 10, 10], stddev=0.1))
        stage2 = tf.nn.conv2d(pool1, conv2_weights, strides=[1, 1, 1, 1], padding='SAME')
        stage2 = tf.tanh(tf.nn.bias_add(stage2, conv2_biases))
        # Again maxpool
        pool2 = tf.nn.max_pool(stage2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
        # Reshape and do a fully-connected hidden layer using matrix multiplication
        pool_shape = pool2.get_shape().as_list()
        reshape = tf.reshape(pool2, [-1, pool_shape[1] * pool_shape[2] * pool_shape[3]])
        fc1_biases = tf.Variable(tf.constant(0.1, shape=[70]))
        fc1_weights = tf.Variable(tf.truncated_normal([700, 70], stddev=0.1))
        hidden = tf.matmul(tf.nn.dropout(reshape, self._keep_prob), fc1_weights) + fc1_biases
        hidden = tf.tanh(hidden)
        # Fully-connected softmax output layer
        fc2_biases = tf.Variable(tf.constant(0.1, shape=[26]))
        fc2_weights = tf.Variable(tf.truncated_normal([70, 26], stddev=0.1))
        y = tf.nn.softmax(tf.matmul(hidden, fc2_weights) + fc2_biases)
        return y
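
    # Shape check (a sketch of the tensor sizes flowing through _make_model above):
    #   input            70 x 100 x  1
    #   conv1, SAME      70 x 100 x 10
    #   maxpool 5x5/5    14 x  20 x 10
    #   conv2, SAME      14 x  20 x 10
    #   maxpool 2x2/2     7 x  10 x 10  ->  7 * 10 * 10 = 700 flattened features,
    # which is why fc1_weights has shape [700, 70].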

    def train(self):
        sess = tf.Session()
        y = self._make_model(self._x)
        # Cross entropy is a standard ML penalty function
        cross_entropy = -tf.reduce_mean(self._y_real * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
        correct_prediction = tf.equal(tf.argmax(self._y_real, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        learning_rate = tf.placeholder(tf.float32)
        train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        with sess.as_default():
            restored = False
            if tf.train.checkpoint_exists('model.dat'):
                tf.logging.info("Loading model...")
                saver.restore(sess, "./model.dat")
                restored = True
            tf.logging.info("Loading data...")
            data, res = read_images(), read_solution()
            if FLAGS.train:
                tf.logging.info("Processing data...")
                example, res = data[:LABELLED_CNT], res[:LABELLED_CNT]
                train = example[:TRAIN_PART]
                train_res = res[:TRAIN_PART]
                test = example[TRAIN_PART:]
                test_res = res[TRAIN_PART:]

                def next_batch(count):
                    # Just a random sample -- not very efficient, but good enough here
                    ind = np.random.choice(len(train), count)
                    images = [train[i] for i in ind]
                    answers = [train_res[i] for i in ind]
                    return images, answers
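
                # (Note: np.random.choice samples with replacement, so each batch is an
                #  independent draw of `count` glyphs from the TRAIN_PART = 1000 training glyphs.)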
                # main optimization
                tf.logging.info("Training...")
                step = 0
                while True:
                    step += 1
                    batch = next_batch(FLAGS.batch_size)
                    if step % 50 == 0:  # print stats
                        ce = cross_entropy.eval(feed_dict={
                            self._x: batch[0],
                            self._y_real: batch[1],
                            self._keep_prob: 1.0,
                        })
                        ac = accuracy.eval(feed_dict={
                            self._x: test,
                            self._y_real: test_res,
                            self._keep_prob: 1.0,
                        })
                        tf.logging.info("step %d", step)
                        tf.logging.info("cross-entropy %f", ce)
                        tf.logging.info("validation-accuracy %.4f", ac)
                        if not FLAGS.no_autosave:
                            saver.save(sess, "./model.dat")
                    # Run training step
                    train_step.run(feed_dict={
                        self._x: batch[0],
                        self._y_real: batch[1],
                        learning_rate: FLAGS.learning_rate,
                        self._keep_prob: 0.5,
                    })
            else:
                if not restored:
                    raise RuntimeError("you should run with --train first!")
                tf.logging.info("Going to evaluate...")
                results = y.eval(feed_dict={
                    self._x: data,
                    self._keep_prob: 1.0,
                })
                results[0:LABELLED_CNT] = res[0:LABELLED_CNT]  # copy sample.out for the labelled part
                # map one-hot encoding to index, then to a letter
                results = [np.argmax(res) for res in results]
                results = [chr(int(res) + ord("a")) for res in results]
                # print one 6-letter word per input image
                for pos in range(0, len(results), 6):
                    print("".join(results[pos:pos + 6]))


def main(_):
    tf.logging.set_verbosity(tf.logging.INFO)
    model = ReadModel()
    model.train()


if __name__ == '__main__':
    tf.app.run()
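
How one might run the script (a sketch based on the flags defined above; the exact boolean-flag syntax depends on the installed TensorFlow 1.x version, and the output file name is only an example):

    python train.py --train          # train; saves ./model.dat every 50 steps unless --no_autosave; runs until interrupted
    python train.py > l1.out         # restore ./model.dat and print one 6-letter word per image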