-
-
Save MLnick/5286475 to your computer and use it in GitHub Desktop.
| import spark.SparkContext | |
| import SparkContext._ | |
| /** | |
| * A port of [[http://blog.echen.me/2012/02/09/movie-recommendations-and-more-via-mapreduce-and-scalding/]] | |
| * to Spark. | |
| * Uses movie ratings data from MovieLens 100k dataset found at [[http://www.grouplens.org/node/73]] | |
| */ | |
| object MovieSimilarities { | |
| def main(args: Array[String]) { | |
| /** | |
| * Parameters to regularize correlation. | |
| */ | |
| val PRIOR_COUNT = 10 | |
| val PRIOR_CORRELATION = 0 | |
| val TRAIN_FILENAME = "ua.base" | |
| val TEST_FIELNAME = "ua.test" | |
| val MOVIES_FILENAME = "u.item" | |
| /** | |
| * Spark programs require a SparkContext to be initialized | |
| */ | |
| val master = args(0) | |
| val sc = new SparkContext(master, "MovieSimilarities") | |
| // get movie names keyed on id | |
| val movies = sc.textFile(MOVIES_FILENAME) | |
| .map(line => { | |
| val fields = line.split("\\|") | |
| (fields(0).toInt, fields(1)) | |
| }) | |
| val movieNames = movies.collectAsMap() // for local use to map id <-> movie name for pretty-printing | |
| // extract (userid, movieid, rating) from ratings data | |
| val ratings = sc.textFile(TRAIN_FILENAME) | |
| .map(line => { | |
| val fields = line.split("\t") | |
| (fields(0).toInt, fields(1).toInt, fields(2).toInt) | |
| }) | |
| // get num raters per movie, keyed on movie id | |
| val numRatersPerMovie = ratings | |
| .groupBy(tup => tup._2) | |
| .map(grouped => (grouped._1, grouped._2.size)) | |
| // join ratings with num raters on movie id | |
| val ratingsWithSize = ratings | |
| .groupBy(tup => tup._2) | |
| .join(numRatersPerMovie) | |
| .flatMap(joined => { | |
| joined._2._1.map(f => (f._1, f._2, f._3, joined._2._2)) | |
| }) | |
| // ratingsWithSize now contains the following fields: (user, movie, rating, numRaters). | |
| // dummy copy of ratings for self join | |
| val ratings2 = ratingsWithSize.keyBy(tup => tup._1) | |
| // join on userid and filter movie pairs such that we don't double-count and exclude self-pairs | |
| val ratingPairs = | |
| ratingsWithSize | |
| .keyBy(tup => tup._1) | |
| .join(ratings2) | |
| .filter(f => f._2._1._2 < f._2._2._2) | |
| // compute raw inputs to similarity metrics for each movie pair | |
| val vectorCalcs = | |
| ratingPairs | |
| .map(data => { | |
| val key = (data._2._1._2, data._2._2._2) | |
| val stats = | |
| (data._2._1._3 * data._2._2._3, // rating 1 * rating 2 | |
| data._2._1._3, // rating movie 1 | |
| data._2._2._3, // rating movie 2 | |
| math.pow(data._2._1._3, 2), // square of rating movie 1 | |
| math.pow(data._2._2._3, 2), // square of rating movie 2 | |
| data._2._1._4, // number of raters movie 1 | |
| data._2._2._4) // number of raters movie 2 | |
| (key, stats) | |
| }) | |
| .groupByKey() | |
| .map(data => { | |
| val key = data._1 | |
| val vals = data._2 | |
| val size = vals.size | |
| val dotProduct = vals.map(f => f._1).sum | |
| val ratingSum = vals.map(f => f._2).sum | |
| val rating2Sum = vals.map(f => f._3).sum | |
| val ratingSq = vals.map(f => f._4).sum | |
| val rating2Sq = vals.map(f => f._5).sum | |
| val numRaters = vals.map(f => f._6).max | |
| val numRaters2 = vals.map(f => f._7).max | |
| (key, (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2)) | |
| }) | |
| // compute similarity metrics for each movie pair | |
| val similarities = | |
| vectorCalcs | |
| .map(fields => { | |
| val key = fields._1 | |
| val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2 | |
| val corr = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq) | |
| val regCorr = regularizedCorrelation(size, dotProduct, ratingSum, rating2Sum, | |
| ratingNormSq, rating2NormSq, PRIOR_COUNT, PRIOR_CORRELATION) | |
| val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq)) | |
| val jaccard = jaccardSimilarity(size, numRaters, numRaters2) | |
| (key, (corr, regCorr, cosSim, jaccard)) | |
| }) | |
| // test a few movies out (substitute the contains call with the relevant movie name | |
| val sample = similarities.filter(m => { | |
| val movies = m._1 | |
| (movieNames(movies._1).contains("Star Wars (1977)")) | |
| }) | |
| // collect results, excluding NaNs if applicable | |
| val result = sample.map(v => { | |
| val m1 = v._1._1 | |
| val m2 = v._1._2 | |
| val corr = v._2._1 | |
| val rcorr = v._2._2 | |
| val cos = v._2._3 | |
| val j = v._2._4 | |
| (movieNames(m1), movieNames(m2), corr, rcorr, cos, j) | |
| }).collect().filter(e => !(e._4 equals Double.NaN)) // test for NaNs must use equals rather than == | |
| .sortBy(elem => elem._4).take(10) | |
| // print the top 10 out | |
| result.foreach(r => println(r._1 + " | " + r._2 + " | " + r._3.formatted("%2.4f") + " | " + r._4.formatted("%2.4f") | |
| + " | " + r._5.formatted("%2.4f") + " | " + r._6.formatted("%2.4f"))) | |
| } | |
| // ************************* | |
| // * SIMILARITY MEASURES | |
| // ************************* | |
| /** | |
| * The correlation between two vectors A, B is | |
| * cov(A, B) / (stdDev(A) * stdDev(B)) | |
| * | |
| * This is equivalent to | |
| * [n * dotProduct(A, B) - sum(A) * sum(B)] / | |
| * sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] } | |
| */ | |
| def correlation(size : Double, dotProduct : Double, ratingSum : Double, | |
| rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = { | |
| val numerator = size * dotProduct - ratingSum * rating2Sum | |
| val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) * | |
| scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum) | |
| numerator / denominator | |
| } | |
| /** | |
| * Regularize correlation by adding virtual pseudocounts over a prior: | |
| * RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation | |
| * where w = # actualPairs / (# actualPairs + # virtualPairs). | |
| */ | |
| def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double, | |
| rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double, | |
| virtualCount : Double, priorCorrelation : Double) = { | |
| val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq) | |
| val w = size / (size + virtualCount) | |
| w * unregularizedCorrelation + (1 - w) * priorCorrelation | |
| } | |
| /** | |
| * The cosine similarity between two vectors A, B is | |
| * dotProduct(A, B) / (norm(A) * norm(B)) | |
| */ | |
| def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = { | |
| dotProduct / (ratingNorm * rating2Norm) | |
| } | |
| /** | |
| * The Jaccard Similarity between two sets A, B is | |
| * |Intersection(A, B)| / |Union(A, B)| | |
| */ | |
| def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = { | |
| val union = totalUsers1 + totalUsers2 - usersInCommon | |
| usersInCommon / union | |
| } | |
| } |
Nice script, but yes, it does provide a list of movies that are most dissimilar. To get the similar names, just need to change the sort in result. We can do that by adding a minus sign in the sortby, change line 129 to:
.sortBy(elem =>- elem._4).take(10)
Just had a doubt at this step "val sample = similarities.filter(m => {
val movies = m._1
(movieNames(movies._1).contains("Star Wars (1977)"))
})"
Don't we miss few recommendations if we just do a find on m._1 ?
Who can tell me the source of the regularizedCorrelation formula and how to understand the regularizedCorrelation formula?
Compute listed vertex-based similarity measures for all the pairs of nodes in label
data file. These similarity measures are computed between two nodes by utilizing
neighborhood and/or node information of both nodes.
Common neighbors
Jaccard coefficient
Adamic/Adar
---------------------Anyone can tell me how to solve this problem using scala ?
Sir, I am facing problem in creating a weighted bipartite graph using spark graphx.
I want to create from a csv file.
The data has three columns
user product weight. user is a string, product is a string and weight is an integer. Kindly suggest.
Thank you.This program gives dissimilar movie names.What has to be changed to get similar movie names