Created January 28, 2011 10:12
A sparse PCA implementation based on the LARS algorithm
import time
import sys
from math import floor, ceil
import itertools

import numpy as np

from scikits.learn.utils.extmath import fast_svd
from SparsePCA import _update_U, _update_V, _update_V_parallel, cpu_count


################################################################################
# sparsePCA

def dict_learning(Y, n_atoms, alpha, n_iter=100, return_code=True,
                  dict_init=None, callback=None, chunk_size=3, verbose=False,
                  shuffle=True, n_jobs=1, coding_method='lars'):
    """
    Online dictionary learning for sparse coding

    (U^*, V^*) = argmin 0.5 || Y - U V ||_2^2 + alpha * || V ||_1
                  (U,V)
                 with || U_k ||_2 = 1 for all 0 <= k < n_atoms
    """
    t0 = time.time()
    n_samples, n_features = Y.shape
    # Avoid integer division problems
    alpha = float(alpha)

    if n_jobs == -1:
        n_jobs = cpu_count()

    # Init V with SVD of Y
    if dict_init is not None:
        V = dict_init
    else:
        _, S, V = fast_svd(Y, n_atoms)
        V = S[:, np.newaxis] * V
    V = V[:n_atoms, :]
    V = np.ascontiguousarray(V.T)

    if verbose == 1:
        print '[dict_learning]',

    n_batches = floor(float(len(Y)) / chunk_size)
    if shuffle:
        Y_train = Y.copy()
        np.random.shuffle(Y_train)
    else:
        Y_train = Y
    batches = np.array_split(Y_train, n_batches)
    batches = itertools.cycle(batches)

    # The covariance of the dictionary
    A = np.zeros((n_atoms, n_atoms))
    # The data approximation
    B = np.zeros((n_features, n_atoms))

    for ii, this_Y in itertools.izip(xrange(n_iter), batches):
        #this_Y = this_Y.squeeze()
        dt = (time.time() - t0)
        if verbose == 1:
            sys.stdout.write(".")
            sys.stdout.flush()
        elif verbose:
            if verbose > 10 or ii % ceil(100. / verbose) == 0:
                print ("Iteration % 3i (elapsed time: % 3is, % 4.1fmn)" %
                       (ii, dt, dt / 60))

        this_U = _update_V(V, this_Y.T, alpha)

        # Update the auxiliary variables
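        # (Added note) A accumulates the code covariance and B the
        # data/code products; the forgetting factor beta < 1 downweights
        # early mini-batches, whose codes were computed with a poorly
        # estimated dictionary.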
        if ii < chunk_size - 1:
            theta = float((ii + 1) * chunk_size)
        else:
            theta = float(chunk_size ** 2 + ii + 1 - chunk_size)
        beta = (theta + 1 - chunk_size) / (theta + 1)

        A *= beta
        A += np.dot(this_U, this_U.T)
        B *= beta
        B += np.dot(this_Y.T, this_U.T)

        # Update dictionary
        V = _update_U(V, B, A, verbose=verbose)
        # XXX: Can the residuals be of any use?

        # Maybe we need a stopping criterion based on the amount of
        # modification in the dictionary
        if callback is not None:
            callback(locals())

    if return_code:
        if verbose > 1:
            print 'Learning code...',
        elif verbose == 1:
            print '|',
        code = _update_V_parallel(V, Y.T, alpha, n_jobs=n_jobs,
                                  method=coding_method)
        if verbose > 1:
            dt = (time.time() - t0)
            print 'done (total time: % 3is, % 4.1fmn)' % (dt, dt / 60)
        return V.T, code.T

    return V.T
if __name__ == '__main__':

    # Generate toy data
    n_atoms = 3
    n_samples = 20
    img_sz = (30, 30)
    n_features = img_sz[0] * img_sz[1]

    np.random.seed(0)
    U = np.random.randn(n_samples, n_atoms)
    V = np.random.randn(n_atoms, n_features)

    centers = 3 * np.array([(3, 3), (6, 7), (8, 1)])
    sz = 3 * np.array([1, 2, 1])
    for k in range(n_atoms):
        img = np.zeros(img_sz)
        xmin, xmax = centers[k][0] - sz[k], centers[k][0] + sz[k]
        ymin, ymax = centers[k][1] - sz[k], centers[k][1] + sz[k]
        img[xmin:xmax][:, ymin:ymax] = 1.0
        V[k, :] = img.ravel()

    # Y is defined by: Y = UV + noise
    Y = np.dot(U, V)
    Y += 0.1 * np.random.randn(Y.shape[0], Y.shape[1])  # Add noise

    # Estimate U, V
    alpha = .02
    V_estimated, code = dict_learning(Y.T, n_atoms, alpha,
                                      n_iter=1000, return_code=True,
                                      verbose=2, chunk_size=10,
                                      coding_method='cd')

    # View results
    import pylab as pl
    pl.close('all')
    pl.figure(figsize=(3 * n_atoms, 3))
    vmax = max(-code.min(), code.max())
    for k in range(n_atoms):
        pl.subplot(1, n_atoms, k + 1)
        pl.imshow(np.ma.masked_equal(np.reshape(code[:, k], img_sz), 0),
                  vmin=-vmax, vmax=vmax,
                  interpolation='nearest')
        pl.title('Atom %d' % k)
    pl.show()
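    # (Added sanity check, not in the original gist) With Y of shape
    # (n_samples, n_features), dict_learning(Y.T, ...) returns
    # V_estimated with shape (n_atoms, n_samples) and code with shape
    # (n_features, n_atoms), so Y is approximated by their transposes:
    Y_hat = np.dot(V_estimated.T, code.T)
    print 'Relative reconstruction error: %.3f' % (
        np.linalg.norm(Y - Y_hat) / np.linalg.norm(Y))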
SparsePCA.py (the module imported above):
import time
import sys

import numpy as np
from numpy.lib.stride_tricks import as_strided
from math import sqrt
from scipy import linalg
from scikits.learn.linear_model import Lasso, lars_path
from joblib import Parallel, delayed


################################################################################
# Utilities to spread load on CPUs

def _gen_even_slices(n, n_packs):
    """Generator to create n_packs slices going up to n.

    Examples
    ========
    >>> list(_gen_even_slices(10, 1))
    [slice(0, 10, None)]
    >>> list(_gen_even_slices(10, 10))
    [slice(0, 1, None), slice(1, 2, None), slice(2, 3, None), slice(3, 4, None), slice(4, 5, None), slice(5, 6, None), slice(6, 7, None), slice(7, 8, None), slice(8, 9, None), slice(9, 10, None)]
    >>> list(_gen_even_slices(10, 5))
    [slice(0, 2, None), slice(2, 4, None), slice(4, 6, None), slice(6, 8, None), slice(8, 10, None)]
    >>> list(_gen_even_slices(10, 3))
    [slice(0, 4, None), slice(4, 7, None), slice(7, 10, None)]
    """
    start = 0
    for pack_num in range(n_packs):
        this_n = n // n_packs
        if pack_num < n % n_packs:
            this_n += 1
        if this_n > 0:
            end = start + this_n
            yield slice(start, end, None)
            start = end


def cpu_count():
    """ Return the number of CPUs.
    """
    # XXX: should be in joblib
    try:
        import multiprocessing
    except ImportError:
        return 1
    return multiprocessing.cpu_count()


################################################################################
# sparsePCA

def _update_V(U, Y, alpha, V=None, Gram=None, method='lars'):
    """ Update V in sparse_pca loop.

    Parameters
    ===========
    V: array, optional
        Initial value of the dictionary, for warm restart
    """
    n_features = Y.shape[1]
    n_atoms = U.shape[1]
    coef = np.empty((n_atoms, n_features))
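    # (Added note) Each column of coef solves an independent Lasso
    # problem in U; the Gram matrix U^T U and the correlations U^T Y can
    # be shared across all n_features problems, so the LARS branch
    # computes them once up front.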
    if method == 'lars':
        if Gram is None:
            Gram = np.dot(U.T, U)
        err_mgt = np.seterr()
        np.seterr(all='ignore')
        XY = np.dot(U.T, Y)
        for k in xrange(n_features):
            # A huge amount of time is spent in this loop. It needs to be
            # tight.
            _, _, coef_path_ = lars_path(U, Y[:, k], Xy=XY[:, k], Gram=Gram,
                                         alpha_min=alpha, method='lasso')
            coef[:, k] = coef_path_[:, -1]
        np.seterr(**err_mgt)
    else:
        clf = Lasso(alpha=alpha, fit_intercept=False)
        for k in range(n_features):
            # A huge amount of time is spent in this loop. It needs to be
            # tight.
            if V is not None:
                clf.coef_ = V[:, k]  # Init with previous value of Vk
            clf.fit(U, Y[:, k], max_iter=1000, tol=1e-8)
            coef[:, k] = clf.coef_
    return coef


def _update_V_parallel(U, Y, alpha, V=None, Gram=None, method='lars', n_jobs=1):
    n_samples, n_features = Y.shape
    if Gram is None:
        Gram = np.dot(U.T, U)
    if n_jobs == 1:
        return _update_V(U, Y, alpha, V=V, Gram=Gram, method=method)
    n_atoms = U.shape[1]
    if V is None:
        V = np.empty((n_atoms, n_features))
    slices = list(_gen_even_slices(n_features, n_jobs))
    V_views = Parallel(n_jobs=n_jobs)(
                delayed(_update_V)(U, Y[:, this_slice], V=V[:, this_slice],
                                   alpha=alpha, Gram=Gram, method=method)
                for this_slice in slices)
    for this_slice, this_view in zip(slices, V_views):
        V[:, this_slice] = this_view
    return V


def _update_U(U, Y, V, verbose=False, return_r2=False):
    """ Update U in sparse_pca loop. This function modifies in-place U
        and V.
    """
    n_atoms = len(V)
    n_samples = Y.shape[0]
    R = -np.dot(U, V)  # Residuals, computed 'in-place' for efficiency
    R += Y
    # Fortran order, as it makes ger faster
    R = np.asfortranarray(R)
    ger, = linalg.get_blas_funcs(('ger',), (U, V))
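    # (Added note) Block coordinate descent over atoms: for each atom k,
    # its rank-one contribution U_k V_k^T is added back into the residual
    # R via a BLAS ger update, U_k is re-estimated from R and rescaled to
    # unit norm, then its contribution is subtracted again.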
    for k in xrange(n_atoms):
        # R <- 1.0 * U_k * V_k^T + R
        R = ger(1.0, U[:, k], V[k, :], a=R, overwrite_a=True)
        U[:, k] = np.dot(R, V[k, :].T)
        # Scale U_k
        norm_square_U = np.dot(U[:, k], U[:, k])
        if norm_square_U < 1e-20:
            if verbose == 1:
                sys.stdout.write("+")
                sys.stdout.flush()
            elif verbose:
                print "Adding new random atom"
            U[:, k] = np.random.randn(n_samples)
            # Setting corresponding coefs to 0
            V[k, :] = 0.0
            U[:, k] /= sqrt(np.dot(U[:, k], U[:, k]))
        else:
            U[:, k] /= sqrt(norm_square_U)
        # R <- -1.0 * U_k * V_k^T + R
        R = ger(-1.0, U[:, k], V[k, :], a=R, overwrite_a=True)
    if return_r2:
        R **= 2
        # R is fortran-ordered. For numpy version < 1.6, sum does not
        # follow the quick striding first, and is thus inefficient on
        # fortran ordered data. We take a flat view of the data with no
        # striding
        R = as_strided(R, shape=(R.size, ), strides=(R.dtype.itemsize,))
        R = np.sum(R)
        #R = np.sum(R, axis=0).sum()
        return U, R
    return U


def sparse_pca(Y, n_atoms, alpha, maxit=100, tol=1e-8, method='lars',
               n_jobs=1, U_init=None, V_init=None, callback=None, verbose=False):
    """
    Compute sparse PCA with n_atoms components

    (U^*, V^*) = argmin 0.5 || Y - U V ||_2^2 + alpha * || V ||_1
                  (U,V)
                 with || U_k ||_2 = 1 for all 0 <= k < n_atoms

    Notes
    ======
    For better efficiency, Y should be fortran-ordered (10 to 20 percent
    difference in execution time on large data).
    """
    t0 = time.time()
    # Avoid integer division problems
    alpha = float(alpha)
    if n_jobs == -1:
        n_jobs = cpu_count()
    n_samples, n_features = Y.shape

    # Init U and V with SVD of Y
    # XXX: Should allow passing in only V_init or U_init
    if U_init is not None and V_init is not None:
        U = np.array(U_init, order='F')
        # Don't copy V, it will happen below
        V = V_init
    else:
        U, S, V = linalg.svd(Y, full_matrices=False)
        V = S[:, np.newaxis] * V
    U = U[:, :n_atoms]
    V = V[:n_atoms, :]

    # Fortran-order V, as we are going to access its row vectors
    V = np.array(V, order='F')

    residuals = 0

    def cost_function():
        return 0.5 * residuals + alpha * np.sum(np.abs(V))

    E = []
    current_cost = np.nan

    if verbose == 1:
        print '[sparse_pca]',

    for ii in xrange(maxit):
        dt = (time.time() - t0)
        if verbose == 1:
            sys.stdout.write(".")
            sys.stdout.flush()
        elif verbose:
            print ("Iteration % 3i "
                   "(elapsed time: % 3is, % 4.1fmn, current cost % 7.3f)" %
                   (ii, dt, dt / 60, current_cost))

        # Update V
        V = _update_V_parallel(U, Y, alpha / n_samples, V=V, method='lars',
                               n_jobs=n_jobs)

        # Update U
        U, residuals = _update_U(U, Y, V, verbose=verbose, return_r2=True)

        current_cost = cost_function()
        E.append(current_cost)

        if ii > 0:
            dE = E[-2] - E[-1]
            assert(dE >= -tol * E[-1])
            if dE < tol * E[-1]:
                if verbose == 1:
                    # A line return
                    print ""
                elif verbose:
                    print "--- Convergence reached after %d iterations" % ii
                break

        if ii % 5 == 0 and callback is not None:
            callback(locals())

    return U, V, E


if __name__ == '__main__':

    # Generate toy data
    n_atoms = 3
    n_samples = 5
    img_sz = (10, 10)
    n_features = img_sz[0] * img_sz[1]

    np.random.seed(0)
    U = np.random.randn(n_samples, n_atoms)
    V = np.random.randn(n_atoms, n_features)

    centers = [(3, 3), (6, 7), (8, 1)]
    sz = [1, 2, 1]
    for k in range(n_atoms):
        img = np.zeros(img_sz)
        xmin, xmax = centers[k][0] - sz[k], centers[k][0] + sz[k]
        ymin, ymax = centers[k][1] - sz[k], centers[k][1] + sz[k]
        img[xmin:xmax][:, ymin:ymax] = 1.0
        V[k, :] = img.ravel()

    # Y is defined by: Y = UV + noise
    Y = np.dot(U, V)
    Y += 0.1 * np.random.randn(Y.shape[0], Y.shape[1])  # Add noise

    # Estimate U, V
    alpha = 0.5
    U_estimated, V_estimated, E = sparse_pca(Y, n_atoms, alpha, maxit=100,
                                             method='lars', n_jobs=1,
                                             verbose=2)

    # View results
    import pylab as pl
    pl.close('all')
    pl.figure(figsize=(3 * n_atoms, 3))
    vmax = max(-V_estimated.min(), V_estimated.max())
    for k in range(n_atoms):
        pl.subplot(1, n_atoms, k + 1)
        pl.imshow(np.ma.masked_equal(np.reshape(V_estimated[k, :], img_sz), 0),
                  vmin=-vmax, vmax=vmax,
                  interpolation='nearest')
        pl.title('Atom %d' % k)

    pl.figure()
    pl.plot(E)
    pl.xlabel('Iteration')
    pl.ylabel('Cost function')
    pl.show()
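    # (Added sanity check, not in the original gist) sparse_pca models Y
    # as U V, so the relative residual should be close to the noise level:
    print 'Relative reconstruction error: %.3f' % (
        np.linalg.norm(Y - np.dot(U_estimated, V_estimated)) /
        np.linalg.norm(Y))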
AFAIK in dictionary learning, the dictionary vectors are not sparse but the loadings (a.k.a. the sparse code) are.
Not in my terminology :P. Maybe I am simply confused. Anyhow, it does not matter that much.
I think it is a question of transposing the data. We might have used the wrong term.
Actually, I tend to consider that the sparse vectors (which I call the dictionary elements) are the ones we want to keep, and I would relearn U (the loadings) given new data. I might even relearn them by least squares, i.e. a projection. That would give me a reduced-dimensionality representation of the data.
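A minimal sketch of that last idea (a hypothetical helper, not part of the gist): with the dictionary elements V held fixed, the loadings for new data are the solution of an ordinary least-squares problem.

import numpy as np

def project_loadings(Y_new, V):
    # Solve min_U || Y_new - U V ||_2^2 with the dictionary V held fixed,
    # i.e. the least-squares projection mentioned above.
    # Y_new: (n_samples, n_features), V: (n_atoms, n_features).
    U, _, _, _ = np.linalg.lstsq(V.T, Y_new.T)
    return U.T  # loadings, shape (n_samples, n_atoms)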