Last active
June 16, 2020 20:23
-
-
Save tuan3w/c72cfe8d998e1763351fed1172755dcc to your computer and use it in GitHub Desktop.
Implementation of Biased Matrix Factorization on Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.spark.mllib.recommendation | |
import org.apache.spark.Logging | |
import org.apache.spark.annotation.{DeveloperApi, Since} | |
import org.apache.spark.api.java.JavaRDD | |
import org.apache.spark.ml.recommendation.{ALS2 => NewALS2} | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.storage.StorageLevel | |
/**
 * Alternating Least Squares matrix factorization.
 *
 * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
 * `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
 * The general approach is iterative. During each iteration, one of the factor matrices is held
 * constant, while the other is solved for using least squares. The newly-solved factor matrix is
 * then held constant while solving for the other factor matrix.
 *
 * This is a blocked implementation of the ALS factorization algorithm that groups the two sets
 * of factors (referred to as "users" and "products") into blocks and reduces communication by only
 * sending one copy of each user vector to each product block on each iteration, and only for the
 * product blocks that need that user's feature vector. This is achieved by precomputing some
 * information about the ratings matrix to determine the "out-links" of each user (which blocks of
 * products it will contribute to) and "in-link" information for each product (which of the feature
 * vectors it receives from each user block it will depend on). This allows us to send only an
 * array of feature vectors between each user block and product block, and have the product block
 * find the users' ratings and update the products based on these messages.
 *
 * For implicit preference data, the algorithm used is based on
 * "Collaborative Filtering for Implicit Feedback Datasets", available at
 * [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
 *
 * Essentially instead of finding the low-rank approximations to the rating matrix `R`,
 * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
 * r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
 * indicated user preferences rather than explicit ratings given to items.
 */
@Since("0.8.0")
class ALS2 private (
    private var numUserBlocks: Int,
    private var numProductBlocks: Int,
    private var rank: Int,
    private var iterations: Int,
    private var lambda: Double,
    private var implicitPrefs: Boolean,
    private var alpha: Double,
    private var seed: Long = System.nanoTime()
  ) extends Serializable with Logging {

  /**
   * Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10,
   * lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
   */
  @Since("0.8.0")
  def this() = this(-1, -1, 10, 10, 0.01, false, 1.0)

  /** If true, do alternating nonnegative least squares. */
  private var nonnegative = false

  /** Storage level for the user/product in/out-link RDDs built during training. */
  private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK

  /** Storage level for the final user/product factor RDDs returned in the model. */
  private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK

  /** Period, in iterations, between checkpoints (truncates long RDD lineage). */
  private var checkpointInterval: Int = 10

  /**
   * Set the number of blocks for both user blocks and product blocks to parallelize the computation
   * into; pass -1 for an auto-configured number of blocks. Default: -1.
   */
  @Since("0.8.0")
  def setBlocks(numBlocks: Int): this.type = {
    this.numUserBlocks = numBlocks
    this.numProductBlocks = numBlocks
    this
  }

  /**
   * Set the number of user blocks to parallelize the computation.
   */
  @Since("1.1.0")
  def setUserBlocks(numUserBlocks: Int): this.type = {
    this.numUserBlocks = numUserBlocks
    this
  }

  /**
   * Set the number of product blocks to parallelize the computation.
   */
  @Since("1.1.0")
  def setProductBlocks(numProductBlocks: Int): this.type = {
    this.numProductBlocks = numProductBlocks
    this
  }

  /** Set the rank of the feature matrices computed (number of features). Default: 10. */
  @Since("0.8.0")
  def setRank(rank: Int): this.type = {
    this.rank = rank
    this
  }

  /** Set the number of iterations to run. Default: 10. */
  @Since("0.8.0")
  def setIterations(iterations: Int): this.type = {
    this.iterations = iterations
    this
  }

  /** Set the regularization parameter, lambda. Default: 0.01. */
  @Since("0.8.0")
  def setLambda(lambda: Double): this.type = {
    this.lambda = lambda
    this
  }

  /** Sets whether to use implicit preference. Default: false. */
  @Since("0.8.1")
  def setImplicitPrefs(implicitPrefs: Boolean): this.type = {
    this.implicitPrefs = implicitPrefs
    this
  }

  /**
   * Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
   */
  @Since("0.8.1")
  def setAlpha(alpha: Double): this.type = {
    this.alpha = alpha
    this
  }

  /** Sets a random seed to have deterministic results. */
  @Since("1.0.0")
  def setSeed(seed: Long): this.type = {
    this.seed = seed
    this
  }

  /**
   * Set whether the least-squares problems solved at each iteration should have
   * nonnegativity constraints.
   */
  @Since("1.1.0")
  def setNonnegative(b: Boolean): this.type = {
    this.nonnegative = b
    this
  }

  /**
   * :: DeveloperApi ::
   * Sets storage level for intermediate RDDs (user/product in/out links). The default value is
   * `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g., `MEMORY_AND_DISK_SER` and
   * set `spark.rdd.compress` to `true` to reduce the space requirement, at the cost of speed.
   */
  @DeveloperApi
  @Since("1.1.0")
  def setIntermediateRDDStorageLevel(storageLevel: StorageLevel): this.type = {
    require(storageLevel != StorageLevel.NONE,
      "ALS is not designed to run without persisting intermediate RDDs.")
    this.intermediateRDDStorageLevel = storageLevel
    this
  }

  /**
   * :: DeveloperApi ::
   * Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default
   * value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g.
   * `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement,
   * at the cost of speed.
   */
  @DeveloperApi
  @Since("1.3.0")
  def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
    this.finalRDDStorageLevel = storageLevel
    this
  }

  /**
   * Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with
   * recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps
   * with eliminating temporary shuffle files on disk, which can be important when there are many
   * ALS iterations. If the checkpoint directory is not set in [[org.apache.spark.SparkContext]],
   * this setting is ignored.
   */
  @DeveloperApi
  @Since("1.4.0")
  def setCheckpointInterval(checkpointInterval: Int): this.type = {
    this.checkpointInterval = checkpointInterval
    this
  }

  /**
   * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
   * Returns a MatrixFactorizationModel with feature vectors for each user and product.
   */
  @Since("0.8.0")
  def run(ratings: RDD[Rating]): MatrixFactorizationModel = {
    val numUserBlocks = resolveNumBlocks(this.numUserBlocks, ratings)
    val numProductBlocks = resolveNumBlocks(this.numProductBlocks, ratings)
    // Delegate to the new ml-package implementation, which works in Float precision; the final
    // factor RDDs are left unpersisted here so we can widen them to Double and persist below.
    val (floatUserFactors, floatProdFactors) = NewALS2.train[Int](
      ratings = ratings.map(r => NewALS2.Rating(r.user, r.product, r.rating.toFloat)),
      rank = rank,
      numUserBlocks = numUserBlocks,
      numItemBlocks = numProductBlocks,
      maxIter = iterations,
      regParam = lambda,
      implicitPrefs = implicitPrefs,
      alpha = alpha,
      nonnegative = nonnegative,
      intermediateRDDStorageLevel = intermediateRDDStorageLevel,
      finalRDDStorageLevel = StorageLevel.NONE,
      checkpointInterval = checkpointInterval,
      seed = seed)
    val userFactors = floatUserFactors
      .mapValues(_.map(_.toDouble))
      .setName("users")
      .persist(finalRDDStorageLevel)
    val prodFactors = floatProdFactors
      .mapValues(_.map(_.toDouble))
      .setName("products")
      .persist(finalRDDStorageLevel)
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      // Materialize the persisted factors eagerly so upstream training RDDs can be released.
      userFactors.count()
      prodFactors.count()
    }
    new MatrixFactorizationModel(rank, userFactors, prodFactors)
  }

  /**
   * Resolves a requested block count, mapping the sentinel value -1 to an auto-configured
   * number of blocks derived from the cluster's default parallelism and the number of input
   * partitions. Shared by the user-block and product-block resolution in [[run]].
   */
  private def resolveNumBlocks(requested: Int, ratings: RDD[Rating]): Int = {
    if (requested == -1) {
      math.max(ratings.context.defaultParallelism, ratings.partitions.length / 2)
    } else {
      requested
    }
  }

  /**
   * Java-friendly version of [[ALS2.run]].
   */
  @Since("1.3.0")
  def run(ratings: JavaRDD[Rating]): MatrixFactorizationModel = run(ratings.rdd)
}
/**
 * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
 */
@Since("0.8.0")
object ALS2 {

  /**
   * Train a matrix factorization model on an RDD of ratings given by users to products, in the
   * form of (userID, productID, rating) pairs. The ratings matrix is approximated as the product
   * of two lower-rank matrices of the given rank (number of features), solved by running the
   * given number of ALS iterations with a level of parallelism given by `blocks`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
   * @param seed random seed
   */
  @Since("0.9.1")
  def train(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      seed: Long
    ): MatrixFactorizationModel = {
    val als = new ALS2(
      numUserBlocks = blocks,
      numProductBlocks = blocks,
      rank = rank,
      iterations = iterations,
      lambda = lambda,
      implicitPrefs = false,
      alpha = 1.0,
      seed = seed)
    als.run(ratings)
  }

  /**
   * Train a matrix factorization model on an RDD of (userID, productID, rating) pairs, using a
   * system-generated random seed. The ratings matrix is approximated as the product of two
   * lower-rank matrices of the given rank, solved by running the given number of ALS iterations
   * with a level of parallelism given by `blocks`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
   */
  @Since("0.8.0")
  def train(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int
    ): MatrixFactorizationModel = {
    // No seed given: the ALS2 constructor default (System.nanoTime()) is used.
    val als = new ALS2(
      numUserBlocks = blocks,
      numProductBlocks = blocks,
      rank = rank,
      iterations = iterations,
      lambda = lambda,
      implicitPrefs = false,
      alpha = 1.0)
    als.run(ratings)
  }

  /**
   * Train a matrix factorization model on an RDD of (userID, productID, rating) pairs with an
   * auto-configured level of parallelism based on the number of partitions in `ratings`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   */
  @Since("0.8.0")
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
    : MatrixFactorizationModel = {
    // blocks = -1 requests auto-configured parallelism.
    train(ratings, rank, iterations, lambda, blocks = -1)
  }

  /**
   * Train a matrix factorization model on an RDD of (userID, productID, rating) pairs with
   * default regularization (lambda = 0.01) and an auto-configured level of parallelism based
   * on the number of partitions in `ratings`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   */
  @Since("0.8.0")
  def train(ratings: RDD[Rating], rank: Int, iterations: Int)
    : MatrixFactorizationModel = {
    train(ratings, rank, iterations, lambda = 0.01, blocks = -1)
  }

  /**
   * Train a matrix factorization model on an RDD of 'implicit preferences', in the form of
   * (userID, productID, preference) pairs. The ratings matrix is approximated as the product of
   * two lower-rank matrices of the given rank, solved by running the given number of ALS
   * iterations with a level of parallelism given by `blocks`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
   * @param alpha confidence parameter
   * @param seed random seed
   */
  @Since("0.8.1")
  def trainImplicit(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      alpha: Double,
      seed: Long
    ): MatrixFactorizationModel = {
    val als = new ALS2(
      numUserBlocks = blocks,
      numProductBlocks = blocks,
      rank = rank,
      iterations = iterations,
      lambda = lambda,
      implicitPrefs = true,
      alpha = alpha,
      seed = seed)
    als.run(ratings)
  }

  /**
   * Train a matrix factorization model on an RDD of 'implicit preferences', in the form of
   * (userID, productID, preference) pairs, using a system-generated random seed and a level of
   * parallelism given by `blocks`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param blocks level of parallelism to split computation into
   * @param alpha confidence parameter
   */
  @Since("0.8.1")
  def trainImplicit(
      ratings: RDD[Rating],
      rank: Int,
      iterations: Int,
      lambda: Double,
      blocks: Int,
      alpha: Double
    ): MatrixFactorizationModel = {
    // No seed given: the ALS2 constructor default (System.nanoTime()) is used.
    val als = new ALS2(
      numUserBlocks = blocks,
      numProductBlocks = blocks,
      rank = rank,
      iterations = iterations,
      lambda = lambda,
      implicitPrefs = true,
      alpha = alpha)
    als.run(ratings)
  }

  /**
   * Train a matrix factorization model on an RDD of 'implicit preferences', in the form of
   * (userID, productID, preference) pairs, with an auto-configured level of parallelism based
   * on the number of partitions in `ratings`.
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   * @param lambda regularization factor (recommended: 0.01)
   * @param alpha confidence parameter
   */
  @Since("0.8.1")
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, lambda, blocks = -1, alpha = alpha)
  }

  /**
   * Train a matrix factorization model on an RDD of 'implicit preferences', in the form of
   * (userID, productID, rating) pairs, with an auto-configured level of parallelism and
   * reasonable defaults for `lambda` (0.01) and `alpha` (1.0).
   *
   * @param ratings RDD of (userID, productID, rating) pairs
   * @param rank number of features to use
   * @param iterations number of iterations of ALS (recommended: 10-20)
   */
  @Since("0.8.1")
  def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
    : MatrixFactorizationModel = {
    trainImplicit(ratings, rank, iterations, lambda = 0.01, blocks = -1, alpha = 1.0)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.spark.ml.recommendation | |
import java.{ util => ju } | |
import java.io.IOException | |
import scala.collection.mutable | |
import scala.reflect.ClassTag | |
import scala.util.Sorting | |
import scala.util.hashing.byteswap64 | |
import com.github.fommil.netlib.BLAS.{ getInstance => blas } | |
import org.apache.hadoop.fs.{ FileSystem, Path } | |
import org.json4s.DefaultFormats | |
import org.json4s.JsonDSL._ | |
import org.apache.spark.{ Logging, Partitioner } | |
import org.apache.spark.annotation.{ Since, DeveloperApi, Experimental } | |
import org.apache.spark.ml.{ Estimator, Model } | |
import org.apache.spark.ml.param._ | |
import org.apache.spark.ml.param.shared._ | |
import org.apache.spark.ml.util._ | |
import org.apache.spark.mllib.linalg.CholeskyDecomposition | |
import org.apache.spark.mllib.optimization.NNLS | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql.DataFrame | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.types.{ DoubleType, FloatType, IntegerType, StructType } | |
import org.apache.spark.storage.StorageLevel | |
import org.apache.spark.util.Utils | |
import org.apache.spark.util.collection.{ OpenHashMap, OpenHashSet, SortDataFormat, Sorter } | |
import org.apache.spark.util.random.XORShiftRandom | |
/**
 * Common params for ALS and ALSModel2.
 *
 * Holds only the column-name params that both the estimator (ALS2) and the fitted model
 * (ALSModel2) need; training-only params live in ALSParams.
 */
private[recommendation] trait ALSModel2Params extends Params with HasPredictionCol {

  /**
   * Param for the column name for user ids.
   * Default: "user" (applied by ALSParams.setDefault)
   * @group param
   */
  val userCol = new Param[String](this, "userCol", "column name for user ids")

  /** @group getParam */
  def getUserCol: String = $(userCol)

  /**
   * Param for the column name for item ids.
   * Default: "item" (applied by ALSParams.setDefault)
   * @group param
   */
  val itemCol = new Param[String](this, "itemCol", "column name for item ids")

  /** @group getParam */
  def getItemCol: String = $(itemCol)
}
/**
 * Common params for ALS.
 *
 * Extends [[ALSModel2Params]] with the training-only params (rank, block counts, implicit
 * preference settings, regularization, etc.) and supplies the defaults for all of them.
 */
private[recommendation] trait ALSParams extends ALSModel2Params with HasMaxIter with HasRegParam
  with HasPredictionCol with HasCheckpointInterval with HasSeed {

  /**
   * Param for rank of the matrix factorization (>= 1).
   * Default: 10
   * @group param
   */
  val rank = new IntParam(this, "rank", "rank of the factorization", ParamValidators.gtEq(1))

  /** @group getParam */
  def getRank: Int = $(rank)

  /**
   * Param for number of user blocks (>= 1).
   * Default: 10
   * @group param
   */
  val numUserBlocks = new IntParam(this, "numUserBlocks", "number of user blocks",
    ParamValidators.gtEq(1))

  /** @group getParam */
  def getNumUserBlocks: Int = $(numUserBlocks)

  /**
   * Param for number of item blocks (>= 1).
   * Default: 10
   * @group param
   */
  val numItemBlocks = new IntParam(this, "numItemBlocks", "number of item blocks",
    ParamValidators.gtEq(1))

  /** @group getParam */
  def getNumItemBlocks: Int = $(numItemBlocks)

  /**
   * Param to decide whether to use implicit preference.
   * Default: false
   * @group param
   */
  val implicitPrefs = new BooleanParam(this, "implicitPrefs", "whether to use implicit preference")

  /** @group getParam */
  def getImplicitPrefs: Boolean = $(implicitPrefs)

  /**
   * Param for the alpha parameter in the implicit preference formulation (>= 0).
   * Default: 1.0
   * @group param
   */
  val alpha = new DoubleParam(this, "alpha", "alpha for implicit preference",
    ParamValidators.gtEq(0))

  /** @group getParam */
  def getAlpha: Double = $(alpha)

  /**
   * Param for the column name for ratings.
   * Default: "rating"
   * @group param
   */
  val ratingCol = new Param[String](this, "ratingCol", "column name for ratings")

  /** @group getParam */
  def getRatingCol: String = $(ratingCol)

  /**
   * Param for whether to apply nonnegativity constraints.
   * Default: false
   * @group param
   */
  val nonnegative = new BooleanParam(
    this, "nonnegative", "whether to use nonnegative constraint for least squares")

  /** @group getParam */
  def getNonnegative: Boolean = $(nonnegative)

  setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10,
    implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item",
    ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10)

  /**
   * Validates and transforms the input schema.
   * @param schema input schema
   * @return output schema
   */
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    SchemaUtils.checkColumnType(schema, $(userCol), IntegerType)
    SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType)
    val ratingType = schema($(ratingCol)).dataType
    // Was a bare require(...) with no message; name the offending column and type on failure.
    require(ratingType == FloatType || ratingType == DoubleType,
      s"Rating column '${$(ratingCol)}' must be of FloatType or DoubleType but was $ratingType.")
    SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
  }
}
/**
 * :: Experimental ::
 * Model fitted by ALS.
 *
 * @param rank rank of the matrix factorization model
 * @param userFactors a DataFrame that stores user factors in two columns: `id` and `features`
 * @param itemFactors a DataFrame that stores item factors in two columns: `id` and `features`
 */
@Experimental
class ALSModel2 private[ml] (
    override val uid: String,
    val rank: Int,
    @transient val userFactors: DataFrame,
    @transient val itemFactors: DataFrame)
  extends Model[ALSModel2] with ALSModel2Params with MLWritable {

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  override def transform(dataset: DataFrame): DataFrame = {
    // Prediction is the dot product of the user and item factor vectors; a left join leaves
    // the factor columns null for unseen ids, which the UDF maps to NaN.
    val predictUdf = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
      if (userFeatures == null || itemFeatures == null) {
        Float.NaN
      } else {
        blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
      }
    }
    val withUserFactors =
      dataset.join(userFactors, dataset($(userCol)) === userFactors("id"), "left")
    val withBothFactors =
      withUserFactors.join(itemFactors, dataset($(itemCol)) === itemFactors("id"), "left")
    withBothFactors.select(
      dataset("*"),
      predictUdf(userFactors("features"), itemFactors("features")).as($(predictionCol)))
  }

  override def transformSchema(schema: StructType): StructType = {
    SchemaUtils.checkColumnType(schema, $(userCol), IntegerType)
    SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType)
    SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
  }

  override def copy(extra: ParamMap): ALSModel2 = {
    val duplicate = new ALSModel2(uid, rank, userFactors, itemFactors)
    copyValues(duplicate, extra).setParent(parent)
  }

  @Since("1.6.0")
  override def write: MLWriter = new ALSModel2.ALSModel2Writer(this)
}
@Since("1.6.0")
object ALSModel2 extends MLReadable[ALSModel2] {

  @Since("1.6.0")
  override def read: MLReader[ALSModel2] = new ALSModel2Reader

  @Since("1.6.0")
  override def load(path: String): ALSModel2 = super.load(path)

  /**
   * [[MLWriter]] for [[ALSModel2]]: persists the param metadata (plus the model's rank)
   * and the two factor DataFrames as Parquet under `path`.
   */
  private[ALSModel2] class ALSModel2Writer(instance: ALSModel2) extends MLWriter {

    override protected def saveImpl(path: String): Unit = {
      // rank is a constructor argument rather than a Param, so it is stored as extra
      // JSON metadata alongside the standard param metadata.
      val extraMetadata = "rank" -> instance.rank
      DefaultParamsWriter.saveMetadata(instance, path, sc, Some(extraMetadata))
      val userPath = new Path(path, "userFactors").toString
      instance.userFactors.write.format("parquet").save(userPath)
      val itemPath = new Path(path, "itemFactors").toString
      instance.itemFactors.write.format("parquet").save(itemPath)
    }
  }

  /** [[MLReader]] for [[ALSModel2]]: the inverse of [[ALSModel2Writer]]. */
  private class ALSModel2Reader extends MLReader[ALSModel2] {

    /** Checked against metadata when loading model */
    private val className = classOf[ALSModel2].getName

    override def load(path: String): ALSModel2 = {
      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
      implicit val format = DefaultFormats
      // "rank" was written as extra metadata by ALSModel2Writer.saveImpl.
      val rank = (metadata.metadata \ "rank").extract[Int]
      val userPath = new Path(path, "userFactors").toString
      val userFactors = sqlContext.read.format("parquet").load(userPath)
      val itemPath = new Path(path, "itemFactors").toString
      val itemFactors = sqlContext.read.format("parquet").load(itemPath)
      val model = new ALSModel2(metadata.uid, rank, userFactors, itemFactors)
      // Restore param values (column names etc.) saved with the metadata.
      DefaultParamsReader.getAndSetParams(model, metadata)
      model
    }
  }
}
/**
 * :: Experimental ::
 * Alternating Least Squares (ALS) matrix factorization.
 *
 * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
 * `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices.
 * The general approach is iterative. During each iteration, one of the factor matrices is held
 * constant, while the other is solved for using least squares. The newly-solved factor matrix is
 * then held constant while solving for the other factor matrix.
 *
 * This is a blocked implementation of the ALS factorization algorithm that groups the two sets
 * of factors (referred to as "users" and "products") into blocks and reduces communication by only
 * sending one copy of each user vector to each product block on each iteration, and only for the
 * product blocks that need that user's feature vector. This is achieved by pre-computing some
 * information about the ratings matrix to determine the "out-links" of each user (which blocks of
 * products it will contribute to) and "in-link" information for each product (which of the feature
 * vectors it receives from each user block it will depend on). This allows us to send only an
 * array of feature vectors between each user block and product block, and have the product block
 * find the users' ratings and update the products based on these messages.
 *
 * For implicit preference data, the algorithm used is based on
 * "Collaborative Filtering for Implicit Feedback Datasets", available at
 * [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here.
 *
 * Essentially instead of finding the low-rank approximations to the rating matrix `R`,
 * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if
 * r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
 * indicated user preferences rather than explicit ratings given to items.
 */
@Experimental
class ALS2(override val uid: String) extends Estimator[ALSModel2] with ALSParams
  with DefaultParamsWritable {

  import org.apache.spark.ml.recommendation.ALS2.Rating

  def this() = this(Identifiable.randomUID("als"))

  // --- Input/output column setters ---

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setRatingCol(value: String): this.type = set(ratingCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  // --- Algorithm setters ---

  /** @group setParam */
  def setRank(value: Int): this.type = set(rank, value)

  /** @group setParam */
  def setNumUserBlocks(value: Int): this.type = set(numUserBlocks, value)

  /** @group setParam */
  def setNumItemBlocks(value: Int): this.type = set(numItemBlocks, value)

  /** @group setParam */
  def setImplicitPrefs(value: Boolean): this.type = set(implicitPrefs, value)

  /** @group setParam */
  def setAlpha(value: Double): this.type = set(alpha, value)

  /** @group setParam */
  def setMaxIter(value: Int): this.type = set(maxIter, value)

  /** @group setParam */
  def setRegParam(value: Double): this.type = set(regParam, value)

  /** @group setParam */
  def setNonnegative(value: Boolean): this.type = set(nonnegative, value)

  /** @group setParam */
  def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)

  /** @group setParam */
  def setSeed(value: Long): this.type = set(seed, value)

  /**
   * Sets both numUserBlocks and numItemBlocks to the specific value.
   * @group setParam
   */
  def setNumBlocks(value: Int): this.type = {
    setNumUserBlocks(value)
    setNumItemBlocks(value)
    this
  }

  override def fit(dataset: DataFrame): ALSModel2 = {
    import dataset.sqlContext.implicits._
    // With an empty ratingCol every (user, item) pair is treated as a constant rating of 1.0f.
    val ratingExpr = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
    val ratings = dataset
      .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), ratingExpr)
      .map { row =>
        Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
      }
    val (userFactors, itemFactors) = ALS2.train(ratings, rank = $(rank),
      numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
      maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
      alpha = $(alpha), nonnegative = $(nonnegative),
      checkpointInterval = $(checkpointInterval), seed = $(seed))
    val userFactorFrame = userFactors.toDF("id", "features")
    val itemFactorFrame = itemFactors.toDF("id", "features")
    val fitted = new ALSModel2(uid, $(rank), userFactorFrame, itemFactorFrame).setParent(this)
    copyValues(fitted)
  }

  override def transformSchema(schema: StructType): StructType = {
    validateAndTransformSchema(schema)
  }

  override def copy(extra: ParamMap): ALS2 = defaultCopy(extra)
}
/** | |
* :: DeveloperApi :: | |
* An implementation of ALS that supports generic ID types, specialized for Int and Long. This is | |
* exposed as a developer API for users who do need other ID types. But it is not recommended | |
* because it increases the shuffle size and memory requirement during training. For simplicity, | |
* users and items must have the same type. The number of distinct users/items should be smaller | |
* than 2 billion. | |
*/ | |
@DeveloperApi | |
object ALS2 extends DefaultParamsReadable[ALS2] with Logging { | |
/**
 * :: DeveloperApi ::
 * Rating class for better code readability.
 *
 * @param user user ID
 * @param item item ID
 * @param rating rating value; with implicit preferences its absolute value is used as a
 *               confidence weight rather than an explicit rating
 */
@DeveloperApi
case class Rating[@specialized(Int, Long) ID](user: ID, item: ID, rating: Float)
// @Since("1.6.0") | |
// override def load(path: String): ALS2 = super.load(path) | |
/** Trait for least squares solvers applied to the normal equation. */
private[recommendation] trait LeastSquaresNESolver extends Serializable {
  /**
   * Solves a least squares problem with regularization (possibly with other constraints).
   *
   * @param ne the accumulated normal equation; implementations reset it after solving
   * @param lambda regularization constant
   * @return the solution x
   */
  def solve(ne: NormalEquation, lambda: Double): Array[Float]
}
/** Cholesky solver for least square problems. */
private[recommendation] class CholeskySolver extends LeastSquaresNESolver {

  /**
   * Solves a least squares problem with L2 regularization:
   *
   *   min norm(A x - b)^2^ + lambda * norm(x)^2^
   *
   * @param ne a [[NormalEquation]] instance that contains AtA, Atb, and n (number of instances)
   * @param lambda regularization constant
   * @return the solution x
   */
  override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
    val k = ne.k
    // Regularize by adding lambda to each diagonal entry of the packed
    // upper-triangular AtA. Diagonal offsets are 0, 2, 5, 9, ...: the step
    // from diagonal d to d + 1 is (d + 2).
    var d = 0
    var diagIndex = 0
    while (diagIndex < ne.triK) {
      ne.ata(diagIndex) += lambda
      diagIndex += d + 2
      d += 1
    }
    // Solves in place; the solution is left in ne.atb in double precision.
    CholeskyDecomposition.solve(ne.ata, ne.atb)
    val solution = Array.tabulate(k)(i => ne.atb(i).toFloat)
    // Clear the accumulators so the equation object can be reused.
    ne.reset()
    solution
  }
}
/** NNLS solver. */
private[recommendation] class NNLSSolver extends LeastSquaresNESolver {
  private var rank: Int = -1
  private var workspace: NNLS.Workspace = _
  private var ata: Array[Double] = _
  private var initialized: Boolean = false

  /** Lazily allocates the workspace for the given rank; the rank must not change afterwards. */
  private def initialize(rank: Int): Unit = {
    if (initialized) {
      require(this.rank == rank)
    } else {
      this.rank = rank
      workspace = NNLS.createWorkspace(rank)
      ata = new Array[Double](rank * rank)
      initialized = true
    }
  }

  /**
   * Solves a nonnegative least squares problem with L2 regularization:
   *
   *   min,,x,, norm(A x - b)^2^ + lambda * n * norm(x)^2^
   *   subject to x >= 0
   */
  override def solve(ne: NormalEquation, lambda: Double): Array[Float] = {
    val k = ne.k
    initialize(k)
    fillAtA(ne.ata, lambda)
    val solution = NNLS.solve(ata, ne.atb, workspace)
    ne.reset()
    solution.map(_.toFloat)
  }

  /**
   * Expands a packed upper-triangular matrix (in the order used by [[NormalEquation]]) into the
   * full symmetric square matrix `ata`, adding `lambda` to every diagonal entry.
   */
  private def fillAtA(triAtA: Array[Double], lambda: Double): Unit = {
    var pos = 0
    var col = 0
    while (col < rank) {
      var row = 0
      while (row <= col) {
        val value = triAtA(pos)
        // Mirror each packed entry into both halves of the square matrix.
        ata(col * rank + row) = value
        ata(row * rank + col) = value
        pos += 1
        row += 1
      }
      ata(col * rank + col) += lambda
      col += 1
    }
  }
}
/**
 * Representing a normal equation to solve the following weighted least squares problem:
 *
 *   minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x.
 *
 * Its normal equation is given by
 *
 *   \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0.
 */
private[recommendation] class NormalEquation(val k: Int) extends Serializable {

  /** Number of entries in the upper triangular part of a k-by-k matrix. */
  val triK = k * (k + 1) / 2
  /** A^T^ * A, stored packed (upper triangle only). */
  val ata = new Array[Double](triK)
  /** A^T^ * b */
  val atb = new Array[Double](k)

  // Scratch buffer holding the current observation promoted to double precision.
  private val da = new Array[Double](k)
  private val upper = "U"

  /** Copies a float observation into the double-precision scratch buffer. */
  private def copyToDouble(a: Array[Float]): Unit = {
    var idx = k - 1
    while (idx >= 0) {
      da(idx) = a(idx)
      idx -= 1
    }
  }

  /** Adds an observation `a` with target `b` and confidence weight `c`. */
  def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
    require(c >= 0.0)
    require(a.length == k, "Check: a = " + a.length + ", k = " + k)
    copyToDouble(a)
    // ata += c * a * a^T (symmetric packed rank-1 update).
    blas.dspr(upper, k, c, da, 1, ata)
    // atb += c * b * a; skipped when b is zero since it would be a no-op.
    if (b != 0.0) {
      blas.daxpy(k, c * b, da, 1, atb, 1)
    }
    this
  }

  /** Merges another normal equation object. */
  def merge(other: NormalEquation): this.type = {
    require(other.k == k)
    blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1)
    blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1)
    this
  }

  /** Resets everything to zero, which should be called after each solve. */
  def reset(): Unit = {
    ju.Arrays.fill(ata, 0.0)
    ju.Arrays.fill(atb, 0.0)
  }
}
/**
 * :: DeveloperApi ::
 * Implementation of the ALS algorithm with per-item bias terms.
 *
 * Factor vectors are allocated with `rank + 1` entries: slot 0 holds the bias and the
 * remaining `rank` entries hold the latent factors. The biases learned in one half-step are
 * subtracted from the ratings in the next half-step (see `computeFactors`), and the returned
 * RDDs expose only the latent factors with the bias slot stripped.
 *
 * Fixes over the previous revision: removed leftover debug sampling that launched two extra
 * Spark jobs (`take(50)`), printed to driver stdout, and cast the generic `ID` via
 * `asInstanceOf[Int]` — which threw `ClassCastException` whenever `ID` was not `Int`,
 * breaking the generic contract documented on this object.
 */
@DeveloperApi
def train[ID: ClassTag]( // scalastyle:ignore
    ratings: RDD[Rating[ID]],
    rank: Int = 10,
    numUserBlocks: Int = 10,
    numItemBlocks: Int = 10,
    maxIter: Int = 10,
    regParam: Double = 1.0,
    implicitPrefs: Boolean = false,
    alpha: Double = 1.0,
    nonnegative: Boolean = false,
    intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    checkpointInterval: Int = 10,
    seed: Long = 0L)(
    implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
  require(intermediateRDDStorageLevel != StorageLevel.NONE,
    "ALS is not designed to run without persisting intermediate RDDs.")
  val sc = ratings.sparkContext
  val userPart = new ALSPartitioner(numUserBlocks)
  val itemPart = new ALSPartitioner(numItemBlocks)
  val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
  val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
  val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
  val blockRatings = partitionRatings(ratings, userPart, itemPart)
    .persist(intermediateRDDStorageLevel)
  val (userInBlocks, userOutBlocks) =
    makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
  // materialize blockRatings and user blocks
  userOutBlocks.count()
  val swappedBlockRatings = blockRatings.map {
    case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
      ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
  }
  val (itemInBlocks, itemOutBlocks) =
    makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
  // materialize item blocks
  itemOutBlocks.count()
  val seedGen = new XORShiftRandom(seed)
  // rank + 1: slot 0 of every factor vector stores the bias term (initialized to zero).
  var userFactors = initialize(userInBlocks, rank + 1, seedGen.nextLong())
  var itemFactors = initialize(itemInBlocks, rank + 1, seedGen.nextLong())
  var previousCheckpointFile: Option[String] = None
  val shouldCheckpoint: Int => Boolean = (iter) =>
    sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
  val deletePreviousCheckpointFile: () => Unit = () =>
    previousCheckpointFile.foreach { file =>
      try {
        FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
      } catch {
        case e: IOException =>
          logWarning(s"Cannot delete checkpoint file $file:", e)
      }
    }
  // Extracts the bias (slot 0) of every factor vector in a factor-block RDD; these are
  // fed back into computeFactors, which subtracts them from the raw ratings.
  def extractBiases(factors: RDD[(Int, FactorBlock)]): RDD[(Int, Array[Float])] =
    factors.map { case (blockId, block) => (blockId, block.map(_(0))) }
  if (implicitPrefs) {
    for (iter <- 1 to maxIter) {
      userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
      val previousItemFactors = itemFactors
      itemFactors = computeFactors(extractBiases(itemFactors), userFactors, userOutBlocks,
        itemInBlocks, rank, regParam, userLocalIndexEncoder, implicitPrefs, alpha, solver)
      previousItemFactors.unpersist()
      itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
      // TODO: Generalize PeriodicGraphCheckpointer and use it here.
      if (shouldCheckpoint(iter)) {
        itemFactors.checkpoint() // itemFactors gets materialized in computeFactors.
      }
      val previousUserFactors = userFactors
      userFactors = computeFactors(extractBiases(userFactors), itemFactors, itemOutBlocks,
        userInBlocks, rank, regParam, itemLocalIndexEncoder, implicitPrefs, alpha, solver)
      if (shouldCheckpoint(iter)) {
        deletePreviousCheckpointFile()
        previousCheckpointFile = itemFactors.getCheckpointFile
      }
      previousUserFactors.unpersist()
    }
  } else {
    for (iter <- 0 until maxIter) {
      itemFactors = computeFactors(extractBiases(itemFactors), userFactors, userOutBlocks,
        itemInBlocks, rank, regParam, userLocalIndexEncoder, solver = solver)
      if (shouldCheckpoint(iter)) {
        itemFactors.checkpoint()
        itemFactors.count() // checkpoint item factors and cut lineage
        deletePreviousCheckpointFile()
        previousCheckpointFile = itemFactors.getCheckpointFile
      }
      userFactors = computeFactors(extractBiases(userFactors), itemFactors, itemOutBlocks,
        userInBlocks, rank, regParam, itemLocalIndexEncoder, solver = solver)
    }
  }
  val userIdAndFactors = userInBlocks
    .mapValues(_.srcIds)
    .join(userFactors)
    .mapPartitions({ items =>
      items.flatMap {
        case (_, (ids, factors)) =>
          ids.view.zip(factors)
      }
    // Preserve the partitioning because IDs are consistent with the partitioners in
    // userInBlocks and userFactors.
    }, preservesPartitioning = true)
    // Drop the bias slot; only the latent factors are exposed to callers.
    .map(v => (v._1, v._2.slice(1, v._2.length)))
    .setName("userFactors")
    .persist(finalRDDStorageLevel)
  val itemIdAndFactors = itemInBlocks
    .mapValues(_.srcIds)
    .join(itemFactors)
    .mapPartitions({ items =>
      items.flatMap {
        case (_, (ids, factors)) =>
          ids.view.zip(factors)
      }
    }, preservesPartitioning = true)
    .map(v => (v._1, v._2.slice(1, v._2.length)))
    .setName("itemFactors")
    .persist(finalRDDStorageLevel)
  if (finalRDDStorageLevel != StorageLevel.NONE) {
    // Materialize the outputs before releasing the intermediate RDDs they depend on.
    userIdAndFactors.count()
    itemFactors.unpersist()
    itemIdAndFactors.count()
    userInBlocks.unpersist()
    userOutBlocks.unpersist()
    itemInBlocks.unpersist()
    itemOutBlocks.unpersist()
    blockRatings.unpersist()
  }
  (userIdAndFactors, itemIdAndFactors)
}
/**
 * Factor block that stores factors (Array[Float]) in an Array.
 */
private type FactorBlock = Array[Array[Float]]
// NOTE(review): FactorBlock2 appears unused in this file — candidate for removal.
private type FactorBlock2 = Array[(Array[Float], Float)]
/**
 * Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
 * send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
 * src factors in this block to send to dst block 0.
 */
private type OutBlock = Array[Array[Int]]
/**
 * In-link block for computing src (user/item) factors. This includes the original src IDs
 * of the elements within this block as well as encoded dst (item/user) indices and corresponding
 * ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
 * dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
 * For example, if we have an in-link record
 *
 *   {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
 *
 * and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
 * is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
 *
 * We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
 * compute src factors one after another using only one normal equation instance.
 *
 * @param srcIds src ids (ordered)
 * @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
 *                ratings are associated with srcIds(i).
 * @param dstEncodedIndices encoded dst indices
 * @param ratings ratings
 *
 * @see [[LocalIndexEncoder]]
 */
private[recommendation] case class InBlock[@specialized(Int, Long) ID: ClassTag](
    srcIds: Array[ID],
    dstPtrs: Array[Int],
    dstEncodedIndices: Array[Int],
    ratings: Array[Float]) {
  /** Size of the block. */
  def size: Int = ratings.length
  // Sanity checks on the CSC layout: one encoded dst index per rating, and one pointer
  // per src ID plus a trailing end pointer.
  require(dstEncodedIndices.length == size)
  require(dstPtrs.length == srcIds.length + 1)
}
/**
 * Initializes factors randomly given the in-link blocks.
 *
 * Each factor vector is a random unit vector from the nonnegative orthant, except that
 * slot 0 (the bias) is reset to zero after normalization.
 *
 * @param inBlocks in-link blocks
 * @param rank rank (here including the bias slot)
 * @param seed random seed
 * @return initialized factor blocks
 */
private def initialize[ID](
    inBlocks: RDD[(Int, InBlock[ID])],
    rank: Int,
    seed: Long): RDD[(Int, FactorBlock)] = {
  // Choose a unit vector uniformly at random from the unit sphere, but from the
  // "first quadrant" where all elements are nonnegative. This can be done by choosing
  // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
  // This appears to create factorizations that have a slightly better reconstruction
  // (<1%) compared picking elements uniformly at random in [0,1].
  inBlocks.map { case (blockId, block) =>
    // Derive a distinct deterministic seed per block.
    val rng = new XORShiftRandom(byteswap64(seed ^ blockId))
    val factors = block.srcIds.map { _ =>
      val vec = Array.fill(rank)(rng.nextGaussian().toFloat)
      val norm = blas.snrm2(rank, vec, 1)
      blas.sscal(rank, 1.0f / norm, vec, 1)
      vec(0) = 0.0f // the bias starts at zero
      vec
    }
    (blockId, factors)
  }
}
/**
 * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays.
 *
 * @param srcIds src (user or item) IDs
 * @param dstIds dst (item or user) IDs, parallel to `srcIds`
 * @param ratings rating values, parallel to `srcIds`
 */
private[recommendation] case class RatingBlock[@specialized(Int, Long) ID: ClassTag](
    srcIds: Array[ID],
    dstIds: Array[ID],
    ratings: Array[Float]) {
  /** Size of the block. */
  def size: Int = srcIds.length
  // The three arrays must stay parallel.
  require(dstIds.length == srcIds.length)
  require(ratings.length == srcIds.length)
}
/**
 * Builder for [[RatingBlock]]. [[mutable.ArrayBuilder]] is used to avoid boxing/unboxing.
 */
private[recommendation] class RatingBlockBuilder[@specialized(Int, Long) ID: ClassTag]
  extends Serializable {

  private val srcBuilder = mutable.ArrayBuilder.make[ID]
  private val dstBuilder = mutable.ArrayBuilder.make[ID]
  private val ratingBuilder = mutable.ArrayBuilder.make[Float]
  // Number of ratings accumulated so far.
  var size = 0

  /** Adds a rating. */
  def add(r: Rating[ID]): this.type = {
    size += 1
    srcBuilder += r.user
    dstBuilder += r.item
    ratingBuilder += r.rating
    this
  }

  /** Merges another [[RatingBlockBuilder]]. */
  def merge(other: RatingBlock[ID]): this.type = {
    size += other.srcIds.length
    srcBuilder ++= other.srcIds
    dstBuilder ++= other.dstIds
    ratingBuilder ++= other.ratings
    this
  }

  /** Builds a [[RatingBlock]]. */
  def build(): RatingBlock[ID] = {
    RatingBlock[ID](srcBuilder.result(), dstBuilder.result(), ratingBuilder.result())
  }
}
/**
 * Partitions raw ratings into blocks.
 *
 * @param ratings raw ratings
 * @param srcPart partitioner for src IDs
 * @param dstPart partitioner for dst IDs
 *
 * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock)
 */
private def partitionRatings[ID: ClassTag](
    ratings: RDD[Rating[ID]],
    srcPart: Partitioner,
    dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
  /* The implementation produces the same result as the following but generates less objects.
  ratings.map { r =>
    ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
  }.aggregateByKey(new RatingBlockBuilder)(
    seqOp = (b, r) => b.add(r),
    combOp = (b0, b1) => b0.merge(b1.build()))
    .mapValues(_.build())
  */
  val numPartitions = srcPart.numPartitions * dstPart.numPartitions
  ratings.mapPartitions { iter =>
    // One builder per (srcBlock, dstBlock) pair, indexed as
    // srcBlockId + numSrcBlocks * dstBlockId.
    val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
    iter.flatMap { r =>
      val srcBlockId = srcPart.getPartition(r.user)
      val dstBlockId = dstPart.getPartition(r.item)
      val idx = srcBlockId + srcPart.numPartitions * dstBlockId
      val builder = builders(idx)
      builder.add(r)
      if (builder.size >= 2048) { // 2048 * (3 * 4) = 24k
        // Flush a full builder eagerly to bound memory; a fresh builder takes its slot.
        builders(idx) = new RatingBlockBuilder
        Iterator.single(((srcBlockId, dstBlockId), builder.build()))
      } else {
        Iterator.empty
      }
    } ++ {
      // Evaluated lazily only after the input iterator is exhausted: flush all
      // partially-filled builders. Do not force this block earlier.
      builders.view.zipWithIndex.filter(_._1.size > 0).map {
        case (block, idx) =>
          val srcBlockId = idx % srcPart.numPartitions
          val dstBlockId = idx / srcPart.numPartitions
          ((srcBlockId, dstBlockId), block.build())
      }
    }
  }.groupByKey().mapValues { blocks =>
    // Merge the partial blocks of each (srcBlock, dstBlock) pair into a single block.
    val builder = new RatingBlockBuilder[ID]
    blocks.foreach(builder.merge)
    builder.build()
  }.setName("ratingBlocks")
}
/**
 * Builder for uncompressed in-blocks of (srcId, dstEncodedIndex, rating) tuples.
 * @param encoder encoder for dst indices
 */
private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) ID: ClassTag](
    encoder: LocalIndexEncoder)(
    implicit ord: Ordering[ID]) {

  private val srcIds = mutable.ArrayBuilder.make[ID]
  private val dstEncodedIndices = mutable.ArrayBuilder.make[Int]
  private val ratings = mutable.ArrayBuilder.make[Float]

  /**
   * Adds a dst block of (srcId, dstLocalIndex, rating) tuples.
   *
   * @param dstBlockId dst block ID
   * @param srcIds original src IDs
   * @param dstLocalIndices dst local indices
   * @param ratings ratings
   */
  def add(
      dstBlockId: Int,
      srcIds: Array[ID],
      dstLocalIndices: Array[Int],
      ratings: Array[Float]): this.type = {
    val count = srcIds.length
    require(dstLocalIndices.length == count)
    require(ratings.length == count)
    this.srcIds ++= srcIds
    this.ratings ++= ratings
    // Encode each (dstBlockId, localIndex) pair into a single Int.
    var idx = 0
    while (idx < count) {
      this.dstEncodedIndices += encoder.encode(dstBlockId, dstLocalIndices(idx))
      idx += 1
    }
    this
  }

  /** Builds a [[UncompressedInBlock]]. */
  def build(): UncompressedInBlock[ID] = {
    new UncompressedInBlock(srcIds.result(), dstEncodedIndices.result(), ratings.result())
  }
}
/**
 * A block of (srcId, dstEncodedIndex, rating) tuples stored in primitive arrays.
 */
private[recommendation] class UncompressedInBlock[@specialized(Int, Long) ID: ClassTag](
    val srcIds: Array[ID],
    val dstEncodedIndices: Array[Int],
    val ratings: Array[Float])(
    implicit ord: Ordering[ID]) {
  /** Size the of block. */
  def length: Int = srcIds.length
  /**
   * Compresses the block into an [[InBlock]]. The algorithm is the same as converting a
   * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
   * Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
   */
  def compress(): InBlock[ID] = {
    val sz = length
    assert(sz > 0, "Empty in-link block should not exist.")
    // Sort all triples by src ID so equal src IDs become contiguous runs.
    sort()
    val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID]
    val dstCountsBuilder = mutable.ArrayBuilder.make[Int]
    var preSrcId = srcIds(0)
    uniqueSrcIdsBuilder += preSrcId
    var curCount = 1
    var i = 1
    var j = 0
    // Run-length encode the sorted src IDs: each distinct ID and the length of its run.
    while (i < sz) {
      val srcId = srcIds(i)
      if (srcId != preSrcId) {
        uniqueSrcIdsBuilder += srcId
        dstCountsBuilder += curCount
        preSrcId = srcId
        j += 1
        curCount = 0
      }
      curCount += 1
      i += 1
    }
    dstCountsBuilder += curCount
    val uniqueSrcIds = uniqueSrcIdsBuilder.result()
    val numUniqueSrdIds = uniqueSrcIds.length
    val dstCounts = dstCountsBuilder.result()
    // Prefix-sum the run lengths into CSC-style pointers: entries for uniqueSrcIds(i)
    // live in [dstPtrs(i), dstPtrs(i + 1)).
    val dstPtrs = new Array[Int](numUniqueSrdIds + 1)
    var sum = 0
    i = 0
    while (i < numUniqueSrdIds) {
      sum += dstCounts(i)
      i += 1
      dstPtrs(i) = sum
    }
    InBlock(uniqueSrcIds, dstPtrs, dstEncodedIndices, ratings)
  }
  /** Sorts the three parallel arrays in place by src ID. */
  private def sort(): Unit = {
    val sz = length
    // Since there might be interleaved log messages, we insert a unique id for easy pairing.
    val sortId = Utils.random.nextInt()
    logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)")
    val start = System.nanoTime()
    val sorter = new Sorter(new UncompressedInBlockSort[ID])
    sorter.sort(this, 0, length, Ordering[KeyWrapper[ID]])
    val duration = (System.nanoTime() - start) / 1e9
    logDebug(s"Sorting took $duration seconds. (sortId = $sortId)")
  }
}
/**
 * A wrapper that holds a primitive key, so a single mutable instance can be reused
 * across comparisons during sorting.
 *
 * @see [[UncompressedInBlockSort]]
 */
private class KeyWrapper[@specialized(Int, Long) ID: ClassTag](
    implicit ord: Ordering[ID]) extends Ordered[KeyWrapper[ID]] {

  var key: ID = _

  /** Stores a new key and returns this wrapper for chaining. */
  def setKey(key: ID): this.type = {
    this.key = key
    this
  }

  override def compare(other: KeyWrapper[ID]): Int = ord.compare(key, other.key)
}
/**
 * [[SortDataFormat]] of [[UncompressedInBlock]] used by [[Sorter]].
 */
private class UncompressedInBlockSort[@specialized(Int, Long) ID: ClassTag](
    implicit ord: Ordering[ID])
  extends SortDataFormat[KeyWrapper[ID], UncompressedInBlock[ID]] {

  override def newKey(): KeyWrapper[ID] = new KeyWrapper()

  override def getKey(
      data: UncompressedInBlock[ID],
      pos: Int,
      reuse: KeyWrapper[ID]): KeyWrapper[ID] = {
    // Reuse the caller-provided wrapper when available to avoid an allocation.
    val wrapper = if (reuse == null) new KeyWrapper[ID]() else reuse
    wrapper.setKey(data.srcIds(pos))
  }

  override def getKey(
      data: UncompressedInBlock[ID],
      pos: Int): KeyWrapper[ID] = {
    getKey(data, pos, null)
  }

  /** Swaps two entries of a primitive array in place. */
  private def exchange[@specialized(Int, Float) T](
      arr: Array[T],
      a: Int,
      b: Int): Unit = {
    val saved = arr(a)
    arr(a) = arr(b)
    arr(b) = saved
  }

  override def swap(data: UncompressedInBlock[ID], pos0: Int, pos1: Int): Unit = {
    // The three arrays are parallel; keep them in sync.
    exchange(data.srcIds, pos0, pos1)
    exchange(data.dstEncodedIndices, pos0, pos1)
    exchange(data.ratings, pos0, pos1)
  }

  override def copyRange(
      src: UncompressedInBlock[ID],
      srcPos: Int,
      dst: UncompressedInBlock[ID],
      dstPos: Int,
      length: Int): Unit = {
    System.arraycopy(src.srcIds, srcPos, dst.srcIds, dstPos, length)
    System.arraycopy(src.dstEncodedIndices, srcPos, dst.dstEncodedIndices, dstPos, length)
    System.arraycopy(src.ratings, srcPos, dst.ratings, dstPos, length)
  }

  override def copyElement(
      src: UncompressedInBlock[ID],
      srcPos: Int,
      dst: UncompressedInBlock[ID],
      dstPos: Int): Unit = {
    dst.srcIds(dstPos) = src.srcIds(srcPos)
    dst.dstEncodedIndices(dstPos) = src.dstEncodedIndices(srcPos)
    dst.ratings(dstPos) = src.ratings(srcPos)
  }

  override def allocate(length: Int): UncompressedInBlock[ID] = {
    new UncompressedInBlock(
      new Array[ID](length), new Array[Int](length), new Array[Float](length))
  }
}
/**
 * Creates in-blocks and out-blocks from rating blocks.
 * @param prefix prefix for in/out-block names
 * @param ratingBlocks rating blocks
 * @param srcPart partitioner for src IDs
 * @param dstPart partitioner for dst IDs
 * @param storageLevel storage level used to persist both in-blocks and out-blocks
 * @return (in-blocks, out-blocks)
 */
private def makeBlocks[ID: ClassTag](
    prefix: String,
    ratingBlocks: RDD[((Int, Int), RatingBlock[ID])],
    srcPart: Partitioner,
    dstPart: Partitioner,
    storageLevel: StorageLevel)(
    implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)]) = {
  val inBlocks = ratingBlocks.map {
    case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) =>
      // The implementation is a faster version of
      // val dstIdToLocalIndex = dstIds.toSet.toSeq.sorted.zipWithIndex.toMap
      val start = System.nanoTime()
      val dstIdSet = new OpenHashSet[ID](1 << 20)
      dstIds.foreach(dstIdSet.add)
      // Copy the distinct dst IDs out of the hash set and sort them, giving a
      // deterministic dst ID -> local index mapping.
      val sortedDstIds = new Array[ID](dstIdSet.size)
      var i = 0
      var pos = dstIdSet.nextPos(0)
      while (pos != -1) {
        sortedDstIds(i) = dstIdSet.getValue(pos)
        pos = dstIdSet.nextPos(pos + 1)
        i += 1
      }
      assert(i == dstIdSet.size)
      Sorting.quickSort(sortedDstIds)
      val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.length)
      i = 0
      while (i < sortedDstIds.length) {
        dstIdToLocalIndex.update(sortedDstIds(i), i)
        i += 1
      }
      logDebug(
        "Converting to local indices took " + (System.nanoTime() - start) / 1e9 + " seconds.")
      val dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply)
      (srcBlockId, (dstBlockId, srcIds, dstLocalIndices, ratings))
  }.groupByKey(new ALSPartitioner(srcPart.numPartitions))
    .mapValues { iter =>
      // Combine all tuples destined for this src block and compress into CSC form.
      val builder =
        new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions))
      iter.foreach {
        case (dstBlockId, srcIds, dstLocalIndices, ratings) =>
          builder.add(dstBlockId, srcIds, dstLocalIndices, ratings)
      }
      builder.build().compress()
    }.setName(prefix + "InBlocks")
    .persist(storageLevel)
  val outBlocks = inBlocks.mapValues {
    case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) =>
      val encoder = new LocalIndexEncoder(dstPart.numPartitions)
      val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])
      var i = 0
      // Tracks, per dst block, whether the current src ID has already been recorded, so
      // each (srcId, dstBlock) pair is emitted at most once.
      val seen = new Array[Boolean](dstPart.numPartitions)
      while (i < srcIds.length) {
        var j = dstPtrs(i)
        ju.Arrays.fill(seen, false)
        while (j < dstPtrs(i + 1)) {
          val dstBlockId = encoder.blockId(dstEncodedIndices(j))
          if (!seen(dstBlockId)) {
            activeIds(dstBlockId) += i // add the local index in this out-block
            seen(dstBlockId) = true
          }
          j += 1
        }
        i += 1
      }
      activeIds.map { x =>
        x.result()
      }
  }.setName(prefix + "OutBlocks")
    .persist(storageLevel)
  (inBlocks, outBlocks)
}
/**
 * Compute dst factors by constructing and solving least square problems.
 *
 * @param biases per-dst-block bias arrays; biases(j) is subtracted from every rating of the
 *               j-th dst entry of the block. NOTE(review): this assumes the array ordering
 *               matches the dst in-block's srcIds ordering from the previous half-step —
 *               verify against how `train` builds these arrays.
 * @param srcFactorBlocks src factors
 * @param srcOutBlocks src out-blocks
 * @param dstInBlocks dst in-blocks
 * @param rank rank (excluding the bias slot; factor vectors carry rank + 1 entries)
 * @param regParam regularization constant
 * @param srcEncoder encoder for src local indices
 * @param implicitPrefs whether to use implicit preference
 * @param alpha the alpha constant in the implicit preference formulation
 * @param solver solver for least squares problems
 *
 * @return dst factors
 */
private def computeFactors[ID](
    biases: RDD[(Int, Array[Float])],
    srcFactorBlocks: RDD[(Int, FactorBlock)],
    srcOutBlocks: RDD[(Int, OutBlock)],
    dstInBlocks: RDD[(Int, InBlock[ID])],
    rank: Int,
    regParam: Double,
    srcEncoder: LocalIndexEncoder,
    implicitPrefs: Boolean = false,
    alpha: Double = 1.0,
    solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)] = {
  val numSrcBlocks = srcFactorBlocks.partitions.length
  // For implicit feedback, YtY is computed over cloned factors whose bias slot is forced to
  // 1.0f, matching how the bias column is set when observations are added below.
  val tmp = srcFactorBlocks.map(x => {
    val f = x._2.map(v => {
      val t = v.clone()
      t(0) = 1.0f
      t
    })
    (x._1, f)
  })
  tmp.cache()
  val YtY = if (implicitPrefs) Some(computeYtY(tmp, rank + 1)) else None
  tmp.unpersist()
  // Ship each src factor block to every dst block that references at least one of its factors.
  val srcOut = srcOutBlocks.join(srcFactorBlocks)
    .flatMap {
      case (srcBlockId, (srcOutBlock, srcFactors)) =>
        srcOutBlock.view.zipWithIndex.map {
          case (activeIndices, dstBlockId) =>
            (dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
        }
    }
  val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length))
  val out = dstInBlocks
    .join(merged)
    .join(biases)
    // Flatten ((inBlock, srcFactors), biases) into a triple.
    .mapValues(x => (x._1._1, x._1._2, x._2))
    .mapValues {
      case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors, biases) =>
        // Index the received src factor blocks by their block ID for O(1) lookup.
        val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks)
        srcFactors.foreach {
          case (srcBlockId, factors) =>
            sortedSrcFactors(srcBlockId) = factors
        }
        val dstFactors = new Array[Array[Float]](dstIds.length)
        var j = 0
        // One (rank + 1)-dimensional normal equation, reused for every dst entry.
        val ls = new NormalEquation(rank + 1)
        while (j < dstIds.length) {
          ls.reset()
          if (implicitPrefs) {
            ls.merge(YtY.get)
          }
          var i = srcPtrs(j)
          var numExplicits = 0
          while (i < srcPtrs(j + 1)) {
            val encoded = srcEncodedIndices(i)
            val blockId = srcEncoder.blockId(encoded)
            val localIndex = srcEncoder.localIndex(encoded)
            val srcFactor = sortedSrcFactors(blockId)(localIndex)
            // Force the bias column to 1.0f so slot 0 of the solved vector acts as a bias.
            srcFactor(0) = 1.0f
            val bias = biases(j)
            // Subtract the dst bias learned in the previous half-step from the raw rating.
            val rating = ratings(i) - bias
            if (implicitPrefs) {
              // Extension to the original paper to handle b < 0. confidence is a function of |b|
              // instead so that it is never negative. c1 is confidence - 1.0.
              val c1 = alpha * math.abs(rating)
              // For rating <= 0, the corresponding preference is 0. So the term below is only added
              // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
              if (rating > 0) {
                numExplicits += 1
                ls.add(srcFactor, (c1 + 1.0) / c1, c1)
              }
            } else {
              ls.add(srcFactor, rating)
              numExplicits += 1
            }
            i += 1
          }
          // Weight lambda by the number of explicit ratings based on the ALS-WR paper.
          dstFactors(j) = solver.solve(ls, numExplicits * regParam)
          j += 1
        }
        dstFactors
    }
  out
}
/** | |
* Computes the Gramian matrix of user or item factors, which is only used in implicit preference. | |
* Caching of the input factors is handled in [[ALS#train]]. | |
*/ | |
private def computeYtY(factorBlocks: RDD[(Int, FactorBlock)], rank: Int): NormalEquation = { | |
factorBlocks.values.aggregate(new NormalEquation(rank))( | |
seqOp = (ne, factors) => { | |
factors.foreach(ne.add(_, 0.0)) | |
ne | |
}, | |
combOp = (ne1, ne2) => ne1.merge(ne2)) | |
} | |
/** | |
* Encoder for storing (blockId, localIndex) into a single integer. | |
* | |
* We use the leading bits (including the sign bit) to store the block id and the rest to store | |
* the local index. This is based on the assumption that users/items are approximately evenly | |
* partitioned. With this assumption, we should be able to encode two billion distinct values. | |
* | |
* @param numBlocks number of blocks | |
*/ | |
private[recommendation] class LocalIndexEncoder(numBlocks: Int) extends Serializable { | |
require(numBlocks > 0, s"numBlocks must be positive but found $numBlocks.") | |
private[this] final val numLocalIndexBits = | |
math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31) | |
private[this] final val localIndexMask = (1 << numLocalIndexBits) - 1 | |
/** Encodes a (blockId, localIndex) into a single integer. */ | |
def encode(blockId: Int, localIndex: Int): Int = { | |
require(blockId < numBlocks) | |
require((localIndex & ~localIndexMask) == 0) | |
(blockId << numLocalIndexBits) | localIndex | |
} | |
/** Gets the block id from an encoded index. */ | |
@inline | |
def blockId(encoded: Int): Int = { | |
encoded >>> numLocalIndexBits | |
} | |
/** Gets the local index from an encoded index. */ | |
@inline | |
def localIndex(encoded: Int): Int = { | |
encoded & localIndexMask | |
} | |
} | |
/** | |
* Partitioner used by ALS. We requires that getPartition is a projection. That is, for any key k, | |
* we have getPartition(getPartition(k)) = getPartition(k). Since the the default HashPartitioner | |
* satisfies this requirement, we simply use a type alias here. | |
*/ | |
private[recommendation]type ALSPartitioner = org.apache.spark.HashPartitioner | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment