Created April 1, 2013 17:49
Movie Similarities with Spark
import spark.SparkContext
import SparkContext._

/**
 * A port of [[]]
 * to Spark.
 * Uses movie ratings data from MovieLens 100k dataset found at [[]]
 */
object MovieSimilarities {

  def main(args: Array[String]): Unit = {

    /**
     * Parameters to regularize correlation.
     */
    val PRIOR_COUNT = 10
    val PRIOR_CORRELATION = 0 // assumed value (shrink toward zero correlation); this definition is missing from the extracted text

    val TRAIN_FILENAME = "ua.base"
    val TEST_FILENAME = "ua.test" // not used below
    val MOVIES_FILENAME = "u.item"

    /**
     * Spark programs require a SparkContext to be initialized.
     */
    val master = args(0)
    val sc = new SparkContext(master, "MovieSimilarities")

    // get movie names keyed on id
    // u.item is pipe-delimited; String.split takes a regex, so "|" must be escaped
    val movies = sc.textFile(MOVIES_FILENAME)
      .map(line => {
        val fields = line.split("\\|")
        (fields(0).toInt, fields(1))
      })
    val movieNames = movies.collectAsMap() // for local use to map id -> movie name for pretty-printing

    // extract (userid, movieid, rating) from ratings data
    val ratings = sc.textFile(TRAIN_FILENAME)
      .map(line => {
        val fields = line.split("\t")
        (fields(0).toInt, fields(1).toInt, fields(2).toInt)
      })

    // get num raters per movie, keyed on movie id
    val numRatersPerMovie = ratings
      .groupBy(tup => tup._2)
      .map(grouped => (grouped._1, grouped._2.size))

    // join ratings with num raters on movie id
    val ratingsWithSize = ratings
      .groupBy(tup => tup._2)
      .join(numRatersPerMovie)
      .flatMap(joined => {
        joined._2._1.map(f => (f._1, f._2, f._3, joined._2._2))
      })

    // ratingsWithSize now contains the following fields: (user, movie, rating, numRaters).

    // dummy copy of ratings for self join
    val ratings2 = ratingsWithSize.keyBy(tup => tup._1)

    // join on userid and filter movie pairs such that we don't double-count and exclude self-pairs
    val ratingPairs =
      ratingsWithSize
      .keyBy(tup => tup._1)
      .join(ratings2)
      .filter(f => f._2._1._2 < f._2._2._2)

    // compute raw inputs to similarity metrics for each movie pair
    val vectorCalcs =
      ratingPairs
      .map(data => {
        val key = (data._2._1._2, data._2._2._2)
        val stats =
          (data._2._1._3 * data._2._2._3, // rating 1 * rating 2
            data._2._1._3,                // rating movie 1
            data._2._2._3,                // rating movie 2
            math.pow(data._2._1._3, 2),   // square of rating movie 1
            math.pow(data._2._2._3, 2),   // square of rating movie 2
            data._2._1._4,                // number of raters movie 1
            data._2._2._4)                // number of raters movie 2
        (key, stats)
      })
      .groupByKey()
      .map(data => {
        val key = data._1
        val vals = data._2
        val size = vals.size
        val dotProduct = vals.map(f => f._1).sum
        val ratingSum = vals.map(f => f._2).sum
        val rating2Sum = vals.map(f => f._3).sum
        val ratingSq = vals.map(f => f._4).sum
        val rating2Sq = vals.map(f => f._5).sum
        val numRaters = vals.map(f => f._6).max
        val numRaters2 = vals.map(f => f._7).max
        (key, (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2))
      })

    // compute similarity metrics for each movie pair
    val similarities =
      vectorCalcs
      .map(fields => {
        val key = fields._1
        val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2
        val corr = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
        val regCorr = regularizedCorrelation(size, dotProduct, ratingSum, rating2Sum,
          ratingNormSq, rating2NormSq, PRIOR_COUNT, PRIOR_CORRELATION)
        val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq))
        val jaccard = jaccardSimilarity(size, numRaters, numRaters2)
        (key, (corr, regCorr, cosSim, jaccard))
      })

    // test a few movies out (substitute the contains call with the relevant movie name)
    val sample = similarities.filter(m => {
      val movies = m._1
      (movieNames(movies._1).contains("Star Wars (1977)"))
    })

    // collect results, excluding NaNs if applicable
    val result = sample.map(v => {
      val m1 = v._1._1
      val m2 = v._1._2
      val corr = v._2._1
      val rcorr = v._2._2
      val cos = v._2._3
      val j = v._2._4
      (movieNames(m1), movieNames(m2), corr, rcorr, cos, j)
    }).collect().filter(e => !e._4.isNaN) // NaN never compares == to anything, so test with isNaN
      .sortBy(elem => elem._4).take(10)

    // print the first 10 (ascending regularized correlation, i.e. least similar pairs first)
    result.foreach(r => println(r._1 + " | " + r._2 + " | " + r._3.formatted("%2.4f") + " | " + r._4.formatted("%2.4f")
      + " | " + r._5.formatted("%2.4f") + " | " + r._6.formatted("%2.4f")))
  }

  // *************************
  // * SIMILARITY MEASURES
  // *************************

  /**
   * The correlation between two vectors A, B is
   *   cov(A, B) / (stdDev(A) * stdDev(B))
   *
   * This is equivalent to
   *   [n * dotProduct(A, B) - sum(A) * sum(B)] /
   *     sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
   */
  def correlation(size : Double, dotProduct : Double, ratingSum : Double,
                  rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {

    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

    numerator / denominator
  }

  /**
   * Regularize correlation by adding virtual pseudocounts over a prior:
   *   RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
   *
   * where w = # actualPairs / (# actualPairs + # virtualPairs).
   */
  def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double,
                             rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double,
                             virtualCount : Double, priorCorrelation : Double) = {

    val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
    val w = size / (size + virtualCount)

    w * unregularizedCorrelation + (1 - w) * priorCorrelation
  }

  /**
   * The cosine similarity between two vectors A, B is
   *   dotProduct(A, B) / (norm(A) * norm(B))
   */
  def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
    dotProduct / (ratingNorm * rating2Norm)
  }

  /**
   * The Jaccard Similarity between two sets A, B is
   *   |Intersection(A, B)| / |Union(A, B)|
   */
  def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }
}
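
The same steps can be checked without a Spark cluster by running them on plain Scala collections. This is a local approximation, not the gist's code. The object `LocalMovieSimilarities`, the helper `pairStats` and the seven ratings are all made up for illustration, and only the Pearson correlation and Jaccard measures are recomputed. It follows the Spark job's structure: count raters per movie, self-join each user's ratings, keep pairs with movie1 < movie2, then aggregate the sufficient statistics per pair.

```scala
// Hypothetical local re-implementation of the Spark pipeline on plain Scala collections.
object LocalMovieSimilarities {

  // (user, movie, rating) triples; a tiny made-up sample, not MovieLens data
  val ratings: Seq[(Int, Int, Int)] = Seq(
    (1, 10, 5), (1, 20, 4), (1, 30, 1),
    (2, 10, 2), (2, 20, 1),
    (3, 10, 1), (3, 30, 5)
  )

  // Same formula as the correlation function in the gist
  def correlation(n: Double, dot: Double, sum1: Double, sum2: Double, sq1: Double, sq2: Double): Double =
    (n * dot - sum1 * sum2) /
      (math.sqrt(n * sq1 - sum1 * sum1) * math.sqrt(n * sq2 - sum2 * sum2))

  // Returns (movie1, movie2) -> (number of co-raters, correlation, jaccard)
  def pairStats(rs: Seq[(Int, Int, Int)]): Map[(Int, Int), (Int, Double, Double)] = {
    // number of raters per movie
    val numRaters: Map[Int, Int] = rs.groupBy(_._2).map { case (m, xs) => (m, xs.size) }

    // self-join on user, keeping each unordered pair once (movie1 < movie2)
    val byUser = rs.groupBy(_._1).values
    val pairs = for {
      userRatings <- byUser.toSeq
      (_, m1, r1) <- userRatings
      (_, m2, r2) <- userRatings
      if m1 < m2
    } yield ((m1, m2), (r1, r2))

    // aggregate sufficient statistics per movie pair
    pairs.groupBy(_._1).map { case (key, xs) =>
      val rr = xs.map(_._2)
      val n = rr.size.toDouble
      val dot = rr.map { case (a, b) => a * b }.sum.toDouble
      val s1 = rr.map(_._1).sum.toDouble
      val s2 = rr.map(_._2).sum.toDouble
      val q1 = rr.map { case (a, _) => a * a }.sum.toDouble
      val q2 = rr.map { case (_, b) => b * b }.sum.toDouble
      val union = numRaters(key._1) + numRaters(key._2) - n
      (key, (rr.size, correlation(n, dot, s1, s2, q1, q2), n / union))
    }
  }

  def main(args: Array[String]): Unit =
    pairStats(ratings).toSeq.sortBy(_._1).foreach(println)
}
```

Here (10, 20) and (10, 30) each have two co-raters, so their correlations come out as exactly 1.0 and -1.0. The pair (20, 30) has a single co-rater, which gives 0/0 = NaN. That is why the Spark version drops NaNs before sorting. With so few co-raters the raw correlation is very unstable, and the regularized variant is meant to damp exactly that.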

Thank you. This program lists dissimilar movies. What has to be changed to get similar movies?


rajshah4 commented Sep 5, 2015

Nice script, but yes, it does list the movies that are most dissimilar. To get the most similar ones, you just need to reverse the sort on result by negating the key in sortBy. Change line 129 to:
.sortBy(elem => -elem._4).take(10)
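
Negating the key works because the key is a plain Double. Another option is to state the descending comparison directly with sortWith. The sketch below compares both approaches on a few hypothetical rows shaped like result, i.e. (movie1, movie2, corr, regCorr, cos, jaccard). The object and method names are made up for illustration.

```scala
// Hypothetical rows with the same shape as `result` in the gist
object SortDescendingDemo {
  val rows = Array(
    ("A", "B", 0.9, 0.30, 0.95, 0.20),
    ("A", "C", -0.5, -0.10, 0.70, 0.10),
    ("A", "D", 0.4, 0.25, 0.90, 0.40)
  )

  // Negating the key sorts by regularized correlation (field _4), highest first
  def topByNegation(n: Int) = rows.sortBy(elem => -elem._4).take(n)

  // Same ordering, written as an explicit "greater than" comparison
  def topBySortWith(n: Int) = rows.sortWith((a, b) => a._4 > b._4).take(n)
}
```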


Just had a doubt about this step:

val sample = similarities.filter(m => {
  val movies = m._1
  (movieNames(movies._1).contains("Star Wars (1977)"))
})

Don't we miss a few recommendations if we only search on m._1?
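
The concern looks valid. ratingPairs keeps only pairs with movie1 < movie2 (the filter f._2._1._2 < f._2._2._2). So a given movie shows up in the first slot only when it is paired with a movie that has a larger id. A minimal sketch on plain collections shows the effect and a filter that checks both slots, always reporting the other movie. The ids, scores and names here are hypothetical.

```scala
// Hypothetical similarity results keyed on (movie1, movie2) with movie1 < movie2
object BothSlotsFilterDemo {
  val similarities: Seq[((Int, Int), Double)] = Seq(
    ((3, 50), 0.8),  // target 50 is in the second slot
    ((50, 72), 0.6), // target 50 is in the first slot
    ((10, 20), 0.9)  // unrelated pair
  )

  // Filtering only on the first slot misses pairs where the target has the larger id
  def firstSlotOnly(target: Int): Seq[(Int, Double)] =
    similarities.collect { case ((m1, m2), s) if m1 == target => (m2, s) }

  // Check both slots and always report the other movie of the pair
  def eitherSlot(target: Int): Seq[(Int, Double)] =
    similarities.collect {
      case ((m1, m2), s) if m1 == target => (m2, s)
      case ((m1, m2), s) if m2 == target => (m1, s)
    }
}
```

In the Spark job the same idea means matching the movie name against either element of the key, and swapping the pair when the match is in the second slot.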


Can anyone tell me the source of the regularizedCorrelation formula, and how it should be understood?
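
The doc comment on regularizedCorrelation describes the idea as shrinkage toward a prior. The weight is w = n / (n + k), where n is the number of users who rated both movies and k is the number of virtual pairs (PRIOR_COUNT = 10 in the code). A small numeric sketch follows. It assumes a prior correlation of 0, and the object name is hypothetical.

```scala
// Hypothetical helper illustrating the shrinkage in regularizedCorrelation
object ShrinkageDemo {
  // Weight on the observed correlation: n real co-rater pairs vs. k virtual pairs
  def weight(n: Double, k: Double): Double = n / (n + k)

  // w * observed + (1 - w) * prior, as in the gist's doc comment
  def regularized(observed: Double, n: Double, k: Double, prior: Double): Double = {
    val w = weight(n, k)
    w * observed + (1 - w) * prior
  }
}
```

A perfect correlation backed by only 2 co-raters is pulled down to about 0.17 (w = 2/12). The same correlation backed by 90 co-raters keeps 90% of its value (w = 90/100). Pairs with very few co-raters therefore cannot dominate the ranking.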


smasish commented May 21, 2018

Compute the listed vertex-based similarity measures for all pairs of nodes in the label
data file. These similarity measures are computed between two nodes using the
neighborhood and/or node information of both nodes:
Common neighbors
Jaccard coefficient
Can anyone tell me how to solve this problem using Scala?


Sir, I am having a problem creating a weighted bipartite graph using Spark GraphX.
I want to create it from a CSV file.
The data has three columns: user, product and weight. user is a string, product is a string and weight is an integer. Kindly suggest an approach.
