Created
June 20, 2017 21:21
-
-
Save ahwillia/a08a954c32310034632ab715032666fb to your computer and use it in GitHub Desktop.
Tensorflow PCA with cross-validation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import tensorflow as tf | |
from tqdm import tqdm | |
# N, size of matrix. R, rank of data | |
N = 100 | |
R = 5 | |
# generate data | |
W_true = np.random.randn(N,R).astype(np.float32) | |
C_true = np.random.randn(R,N).astype(np.float32) | |
Y_true = np.dot(W_true, C_true) | |
Y_true += np.random.randn(N, N) | |
def run_pca(R, drop_prob = 0.5, niter=5000, quad_reg_scale=1e-4, Optimizer=tf.train.AdamOptimizer): | |
""" | |
Args: | |
R (int) : number of components in the model | |
niter (int) : number of training steps | |
drop_prob (float) : probability to leave-out matrix element for cross-validation | |
quad_reg_scale (float) : scale of quadratic regularization on the low-dimensional factors | |
Optimizer : tensorflow optimizer object (default : AdamOptimizer) | |
Returns: | |
results (dict) : contains train/test error and final estimate of loadings & components | |
Notes: | |
The components and loadings are not orthogonalized or sorted by variance explained | |
""" | |
tf.reset_default_graph() | |
sess = tf.Session() | |
# mask data for cross-validation | |
cv_mask = np.random.rand(N, N) > drop_prob | |
n_training_pts = np.sum(cv_mask) | |
# Loadings (W) and Components (C) | |
W = tf.Variable(np.random.randn(N,R).astype(np.float32)) | |
C = tf.Variable(np.random.randn(R,N).astype(np.float32)) | |
# training error (goes into objective function) | |
Y_est = tf.matmul(W, C) | |
train_resid = tf.constant(cv_mask, dtype=tf.float32) * (tf.constant(Y_true) - Y_est) | |
train_error = tf.reduce_sum(train_resid ** 2) / n_training_pts | |
# test error | |
n_testing_pts = N**2 - n_training_pts | |
test_resid = tf.constant(~cv_mask, dtype=tf.float32) * (tf.constant(Y_true) - Y_est) | |
test_error = tf.reduce_sum(test_resid ** 2) / n_testing_pts | |
# quadratic regularization on loadings / components | |
regularization = quad_reg_scale * (tf.reduce_sum(W**2) + tf.reduce_sum(C**2)) | |
# full objective | |
objective = train_error + regularization | |
# optimization setup | |
train_step = Optimizer(0.001).minimize(objective) | |
sess.run(tf.global_variables_initializer()) | |
# training loop | |
train_history, test_history = [], [] | |
for n in range(niter): | |
_, _train, _test = sess.run([train_step, train_error, test_error]) | |
train_history.append(_train) | |
test_history.append(_test) | |
# final factors | |
W_est, C_est = sess.run([W, C]) | |
sess.close() | |
# collect results | |
results = { | |
'loadings': W, | |
'components': C, | |
'train_history': train_history, | |
'test_history': test_history | |
} | |
return results | |
train, test = [], [] | |
n_components = list(range(1,10)) | |
for R in tqdm(n_components): | |
r = run_pca(R) | |
train.append(r['train_history'][-1]) | |
test.append(r['test_history'][-1]) | |
import matplotlib.pyplot as plt | |
plt.plot(n_components, train, '-b', label='train error') | |
plt.plot(n_components, test, '-r', label='test error') | |
plt.legend() | |
plt.show() |
Author
ahwillia
commented
Jun 20, 2017
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment