CPU RPCA and SVT
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
rpca_cpu

Implementations of RPCA on the CPU for low-rank and sparse
matrix decomposition, as well as a nuclear-norm
minimization routine via singular value thresholding
for matrix completion.
The RPCA implementation is heavily based on:
https://github.com/dganguli/robust-pca

The svt implementation is based on:
https://github.com/tonyduan/matrix-completion

References:
[1] Candès, E. J., Li, X., Ma, Y., & Wright, J. (2011).
    Robust principal component analysis?. Journal of the ACM (JACM),
    58(3), 11.
[2] Cai, J. F., Candès, E. J., & Shen, Z. (2010).
    A singular value thresholding algorithm for matrix completion.
    SIAM Journal on Optimization, 20(4), 1956-1982.

Author: Jacob Reinhold ([email protected])
"""

__all__ = ['RPCA', 'svt']

import numpy as np
import scipy.linalg  # import the submodule explicitly so scipy.linalg.svd is available


class RPCA:
    """ low-rank and sparse matrix decomposition via RPCA [1] """

    def __init__(self, D, mu=None, lmbda=None):
        self.D = D
        self.S = np.zeros_like(self.D)
        self.Y = np.zeros_like(self.D)
        # defaults: mu from a heuristic on the scale of D,
        # lmbda = 1 / sqrt(max(n1, n2)) as suggested in [1]
        self.mu = mu or np.prod(self.D.shape) / (4 * self.norm_p(self.D, 2))
        self.mu_inv = 1 / self.mu
        self.lmbda = lmbda or 1 / np.sqrt(np.max(self.D.shape))

    @staticmethod
    def norm_p(M, p):
        # sum of the entries of M raised to the p-th power
        # (only used with p=2, i.e., the squared Frobenius norm)
        return np.sum(np.power(M, p))

    @staticmethod
    def shrink(M, tau):
        # elementwise soft-thresholding (prox of the l1 norm)
        return np.sign(M) * np.maximum(np.abs(M) - tau, 0)

    def svd_threshold(self, M, tau):
        # soft-threshold the singular values (prox of the nuclear norm)
        U, s, Vt = scipy.linalg.svd(M, full_matrices=False)
        return np.dot(U, np.dot(np.diag(self.shrink(s, tau)), Vt))

    def fit(self, tol=None, max_iter=1000, iter_print=100):
        i, err = 0, np.inf
        Sk, Yk, Lk = self.S, self.Y, np.zeros_like(self.D)
        _tol = tol or 1e-7 * self.norm_p(np.abs(self.D), 2)
        # alternate updates of the low-rank (L), sparse (S), and dual (Y)
        # variables, following the alternating-directions scheme in [1]
        while err > _tol and i < max_iter:
            # low-rank update via singular value thresholding
            Lk = self.svd_threshold(
                self.D - Sk + self.mu_inv * Yk, self.mu_inv)
            # sparse update via elementwise shrinkage
            Sk = self.shrink(
                self.D - Lk + (self.mu_inv * Yk), self.mu_inv * self.lmbda)
            # dual ascent on the constraint D = L + S
            Yk = Yk + self.mu * (self.D - Lk - Sk)
            err = self.norm_p(np.abs(self.D - Lk - Sk), 2) / self.norm_p(self.D, 2)
            i += 1
            if (i % iter_print) == 0 or i == 1 or i > max_iter or err <= _tol:
                print(f'Iteration: {i}; Error: {err:0.4e}')
        self.L, self.S = Lk, Sk
        return Lk, Sk


def svt(X, mask, tau=None, delta=None, eps=1e-2, max_iter=1000, iter_print=5):
    """ matrix completion via singular value thresholding [2] """
    Z = np.zeros_like(X)
    # default threshold and step size follow the recommendations in [2]
    tau = tau or 5 * np.sum(X.shape) / 2
    delta = delta or 1.2 * np.prod(X.shape) / np.sum(mask)
    for i in range(max_iter):
        # shrink the singular values of the auxiliary variable Z
        U, s, Vt = scipy.linalg.svd(Z, full_matrices=False)
        s = np.maximum(s - tau, 0)
        A = U @ np.diag(s) @ Vt
        # take a step toward agreement with X on the observed entries
        Z += delta * mask * (X - A)
        # relative error on the observed entries
        error = np.linalg.norm(mask * (X - A)) / np.linalg.norm(mask * X)
        if i % iter_print == 0:
            print(f'Iteration: {i}; Error: {error:.4e}')
        if error < eps:
            break
    return A
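
Below is a minimal usage sketch: it builds a synthetic low-rank-plus-sparse matrix, separates the two components with RPCA, and completes a partially observed low-rank matrix with svt. The sizes, corruption level, and observation rate are arbitrary illustrative choices, and the snippet assumes the file above has been saved as rpca_cpu.py.

# minimal usage sketch (assumes the gist above is saved as rpca_cpu.py)
import numpy as np
from rpca_cpu import RPCA, svt

rng = np.random.RandomState(0)
n1, n2, r = 100, 80, 5

# synthetic data: a rank-r matrix plus sparse, large-magnitude corruptions
L_true = rng.randn(n1, r) @ rng.randn(r, n2)
S_true = np.zeros((n1, n2))
corrupt = rng.rand(n1, n2) < 0.05
S_true[corrupt] = 10 * rng.randn(corrupt.sum())
D = L_true + S_true

# low-rank / sparse separation: D is split into L_hat and S_hat
rpca = RPCA(D)
L_hat, S_hat = rpca.fit(max_iter=500, iter_print=100)

# matrix completion: observe roughly half of the entries of the low-rank matrix
mask = (rng.rand(n1, n2) < 0.5).astype(float)
A_hat = svt(L_true * mask, mask, max_iter=500, iter_print=50)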