Last active
December 20, 2015 22:08
-
-
Save rjhall/6202225 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.commons.math3.linear._ | |
import com.twitter.algebird.Operators._ | |
import com.twitter.scalding._ | |
import cascading.pipe.Pipe | |
import cascading.pipe.joiner.InnerJoin | |
import cascading.tuple.Fields | |
object SVD extends Serializable { | |
import com.twitter.scalding.Dsl._ | |
// based on http://amath.colorado.edu/faculty/martinss/Pubs/2012_halko_dissertation.pdf | |
// page 121. | |
// I just computed an intermediate SVD for Y rather than QR to get the basis, since I knew | |
// how to do this without a ton of messing around. | |
// input: | |
// X :x a sparse (binary) matrix in the form ('row, 'col), for now they should be Longs. | |
// d : number of principle components / singular values to compute | |
// output: | |
// (U, E, V) with | |
// U : pipe of ('row, 'vec) where vec is a RealVector | |
// E : pipe of 'E which is an Array[Double] of singular values. | |
// V : pipe of ('col, 'vec) | |
def apply(X : Pipe, d : Int) : (Pipe, Pipe, Pipe) = { | |
// Sample the columns, into the thin matrix. | |
val XS = X.groupBy('row){_.toList[Long]('col -> 'list).reducers(500)} | |
.map('list -> 'vec){l : List[Long] => | |
val a = new Array[Double](d+2) | |
l.foreach{i => | |
val r = new scala.util.Random(i) | |
(0 until (d+2)).foreach{j => | |
a(j) += r.nextGaussian | |
} | |
} | |
MatrixUtils.createRealVector(a) | |
} | |
.project('row, 'vec) | |
// Multiply by powers of XX' | |
val XXXS = X | |
.joinWithSmaller('row -> 'row_, XS.rename('row -> 'row_), new InnerJoin(), 500) | |
.groupBy('col){_.reduce('vec -> 'vec){(a : RealVector, b : RealVector) => a.add(b)}.spillThreshold(20000000)} | |
.joinWithSmaller('col -> 'col_, X.rename('col -> 'col_), new InnerJoin(), 500) | |
.groupBy('row){_.reduce('vec -> 'vec2){(a : RealVector, b : RealVector) => a.add(b)}.spillThreshold(20000000)} | |
val XXXXXS = X | |
.joinWithSmaller('row -> 'row_, XXXS.rename('row -> 'row_), new InnerJoin(), 500) | |
.groupBy('col){_.reduce('vec2 -> 'vec2){(a : RealVector, b : RealVector) => a.add(b)}.spillThreshold(20000000)} | |
.joinWithSmaller('col -> 'col_, X.rename('col -> 'col_), new InnerJoin(), 500) | |
.groupBy('row){_.reduce('vec2 -> 'vec2){(a : RealVector, b : RealVector) => a.add(b)}.spillThreshold(20000000)} | |
// concat and find basis. | |
val Y = XS | |
.joinWithSmaller('row -> 'row, XXXS, new InnerJoin(), 500) | |
.map(('vec, 'vec2) -> 'vec){x : (RealVector, RealVector) => x._1.append(x._2)} | |
.project('row, 'vec) | |
.joinWithSmaller('row -> 'row, XXXXXS, new InnerJoin(), 500) | |
.map(('vec, 'vec2) -> 'vec){x : (RealVector, RealVector) => x._1.append(x._2)} | |
.project('row, 'vec) | |
// build a matrix which will render orthonormal the columns of Y | |
val YY = Y.mapTo('vec -> 'mat){x : RealVector => x.outerProduct(x)} | |
.groupAll{_.reduce('mat -> 'mat){(a : RealMatrix, b : RealMatrix) => a.add(b)}} | |
.mapTo('mat -> 'mat){m : RealMatrix => | |
val e = new EigenDecomposition(m) | |
e.getV.multiply(MatrixUtils.createRealDiagonalMatrix(e.getRealEigenvalues.map{v => if(v < 0.00000001) 0.0 else 1.0 / math.sqrt(v)})) | |
} | |
// form thin orthonormal basis X. | |
val Q = Y.crossWithTiny(YY) | |
.map(('vec, 'mat) -> 'vec){x : (RealVector, RealMatrix) => x._2.transpose.operate(x._1)} | |
.project('row, 'vec) | |
val B = X.joinWithSmaller('row -> 'row, Q, new InnerJoin(), 500) | |
.groupBy('col){_.reduce('vec -> 'vec){(a : RealVector, b : RealVector) => a.add(b)}} | |
val EB = B.mapTo('vec -> 'mat){x : RealVector => x.outerProduct(x)} | |
.groupAll{_.reduce('mat -> 'mat){(a : RealMatrix, b : RealMatrix) => a.add(b)}} | |
.mapTo('mat -> ('eigs, 'eigmat, 'orthomat)){m : RealMatrix => | |
val e = new EigenDecomposition(m) | |
(e.getRealEigenvalues, | |
e.getVT, | |
MatrixUtils.createRealDiagonalMatrix(e.getRealEigenvalues.map{v => if(v < 0.00000001) 0.0 else 1.0 / math.sqrt(v)}).multiply(e.getV)) | |
} | |
val E = EB.project('eigs).map('eigs -> 'eigs){x : Array[Double] => (0 until d).map{i => math.sqrt(x(i))}.toArray} | |
val U = Q.crossWithTiny(EB.project('eigmat)) | |
.map(('vec, 'eigmat) -> 'vec){x : (RealVector, RealMatrix) => x._2.operate(x._1).getSubVector(0,d)} | |
.project('row, 'vec) | |
val V = B.crossWithTiny(EB.project('orthomat)) | |
.map(('vec, 'orthomat) -> 'vec){x : (RealVector, RealMatrix) => x._2.transpose.operate(x._1).getSubVector(0,d)} | |
.project('col, 'vec) | |
(U, E, V) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Interesting. I am not versed very well in scalding. what is happening here?:
i can tell that Y is cross-joined with Y'Y but I am not sure i can infer the formulation behind it all. Distributed QR/orthogonalization has always been a pretty thorny point of it all. (There's actually a clearer alternative to distributed QR but if you consider adding power iterations, which is a must for this method because of accuracy concerns, the things turn out not that efficient afterwards at all.)
note that v_i = nextGuassian != "random unit vector" and also looks expensive as coded...
thanks -d