Skip to content

Instantly share code, notes, and snippets.

@vascoosx
Created May 30, 2016 08:24
Show Gist options
  • Save vascoosx/10d61000defaae7be7ddf6c72db70cac to your computer and use it in GitHub Desktop.
Save vascoosx/10d61000defaae7be7ddf6c72db70cac to your computer and use it in GitHub Desktop.
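Calculate the dot products of every combination (all ordered pairs) of item factor vectors, using blocked matrix multiplication in Spark.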
import scala.collection.mutable
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.rdd.RDD
/**
 * Packs per-id feature vectors into dense column blocks, so that many dot products can later
 * be computed with a single matrix multiplication.
 *
 * Within each partition, consecutive `(id, vector)` pairs are grouped into chunks of at most
 * `blockSize` elements. Each chunk becomes one `(ids, matrix)` pair. `matrix` has `rank` rows
 * and one column per id, and column `j` holds the vector of `ids(j)`.
 *
 * @param rank      length of every feature vector; becomes the number of matrix rows
 * @param features  `(id, vector)` pairs; every vector must contain exactly `rank` entries
 * @param blockSize maximum number of ids (matrix columns) per block; must be positive
 * @return one `(ids, matrix)` pair per chunk of each partition
 * @throws IllegalArgumentException eagerly if `rank` or `blockSize` is not positive; lazily
 *         (when the RDD is evaluated) if a feature vector's length differs from `rank`
 */
def blockify(rank: Int,
             features: RDD[(Int, Array[Double])],
             blockSize: Int = 4096): RDD[(Array[Int], DenseMatrix)] = {
  require(rank > 0, s"rank must be positive but got $rank")
  // Validate here rather than letting Iterator.grouped fail later inside an executor task.
  require(blockSize > 0, s"blockSize must be positive but got $blockSize")
  features.mapPartitions { iter =>
    iter.grouped(blockSize).map { group =>
      val numCols = group.size
      val ids = new Array[Int](numCols)
      // DenseMatrix values are column-major: column j occupies values(j * rank until (j + 1) * rank).
      val values = new Array[Double](numCols * rank)
      group.iterator.zipWithIndex.foreach { case ((id, factor), j) =>
        // Without this check, wrong-length vectors can shift every following column.
        require(factor.length == rank,
          s"feature vector for id $id has length ${factor.length}, expected rank $rank")
        ids(j) = id
        System.arraycopy(factor, 0, values, j * rank, rank)
      }
      (ids, new DenseMatrix(rank, numCols, values))
    }
  }
}
// Tiny demo input: three item ids, each with a rank-3 factor vector.
val itemF = sc.parallelize(Array(
  (1, Array(1.0, 2.0, 3.0)),
  (2, Array(2.0, 4.0, 6.0)),
  (3, Array(1.0, 3.0, 9.0))))
// `bif` feeds both sides of the cartesian product below, and cartesian re-evaluates its
// parents' partitions repeatedly; cache the blocks so blockify is not recomputed each time.
val bif = blockify(rank = 3, features = itemF).cache()
// Every (src block, dst block) pair -- including each block paired with itself -- yields a
// (srcIds.length x dstIds.length) matrix whose entry (i, j) is the dot product of the
// factor vectors of srcIds(i) and dstIds(j).
// NOTE: RDDs are lazy; nothing is computed until an action (e.g. collect) runs on `ratings`.
val ratings = bif.cartesian(bif).map {
  case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
    (srcIds, dstIds, srcFactors.transpose.multiply(dstFactors))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment