package org.apache.spark.examples

import java.util.Random

import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator

import com.esotericsoftware.kryo.Kryo
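
/* a single user-item interaction, e.g. one rating event from the input file */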
case class Interaction(val user: Int, val item: Int)

object CooccurrenceAnalysis {
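
  /* register Interaction with Kryo so that its instances are serialized compactly */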
  private class CoocRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[Interaction])
    }
  }
  def main(args: Array[String]) {

    val master = "local"
    val interactionsFile = "/home/ssc/Entwicklung/datasets/movielens1M/ratings.dat"
    val separator = "::"
    val numSlices = 2
    val maxSimilarItemsPerItem = 100
    val maxInteractionsPerUserOrItem = 500
    val seed = 12345

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", classOf[CoocRegistrator].getName)
    System.setProperty("spark.kryo.referenceTracking", "false")
    System.setProperty("spark.kryoserializer.buffer.mb", "8")
    System.setProperty("spark.locality.wait", "10000")

    val sc = new SparkContext(master, "CooccurrenceAnalysis")

    /* parse the interactions file, we only need the user and item columns */
    val rawInteractions = sc.textFile(interactionsFile, numSlices).map { line =>
      val fields = line.split(separator)
      Interaction(fields(0).toInt, fields(1).toInt)
    }

    /* cap the interaction counts of power users and items to bound the cost of the pair emission */
    val interactions = downSample(sc, rawInteractions, maxInteractionsPerUserOrItem, seed)
    interactions.cache()

    val numInteractions = interactions.count()

    /* count the interactions per item and ship the dictionary to the workers as a broadcast variable */
    val numInteractionsPerItem =
      sc.broadcast(countsToDict(interactions.map(interaction => (interaction.item, 1)).reduceByKey(_ + _)))
    /* create the upper triangular half of the cooccurrence matrix:
       emit and count all pairs of items that cooccur in the interaction history of a user */
    val cooccurrences = interactions.groupBy(_.user).flatMap({ case (user, history) =>
      for (interactionA <- history; interactionB <- history
           if interactionA.item > interactionB.item) yield {
        ((interactionA.item, interactionB.item), 1L)
      }
    }).reduceByKey(_ + _)
    /* compute the pairwise loglikelihood similarities for the upper half of the cooccurrence matrix */
    val similarities = cooccurrences.map({ case ((itemA, itemB), count) =>
      /* assemble the 2x2 contingency table of the two items' interactions */
      val interactionsWithAandB = count
      val interactionsWithAnotB = numInteractionsPerItem.value(itemA) - interactionsWithAandB
      val interactionsWithBnotA = numInteractionsPerItem.value(itemB) - interactionsWithAandB
      val interactionsWithNeitherAnorB = numInteractions - numInteractionsPerItem.value(itemA) -
          numInteractionsPerItem.value(itemB) + interactionsWithAandB

      val logLikelihood = LogLikelihood.logLikelihoodRatio(interactionsWithAandB, interactionsWithAnotB,
          interactionsWithBnotA, interactionsWithNeitherAnorB)
      /* map the unbounded ratio to a similarity in [0, 1) */
      val logLikelihoodSimilarity = 1.0 - 1.0 / (1.0 + logLikelihood)

      ((itemA, itemB), logLikelihoodSimilarity)
    })
    /* mirror each pair so that every item sees all items similar to it */
    val bidirectionalSimilarities = similarities.flatMap { case ((itemA, itemB), similarity) =>
      Seq((itemA, (itemB, similarity)), (itemB, (itemA, similarity)))
    }
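
    /* the ordering treats a higher similarity as "smaller", so the head of the priority
       queue in topK is always the weakest candidate retained so far */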
    val order = Ordering.fromLessThan[(Int, Double)]({ case ((itemA, similarityA), (itemB, similarityB)) =>
      similarityA > similarityB
    })
    /* use a fixed-size priority queue to only retain the top similar items per item */
    val topKSimilarities = bidirectionalSimilarities.groupByKey().flatMap({ case (item, candidates) =>
      for ((similarItem, similarity) <- topK(candidates, order, maxSimilarItemsPerItem))
      yield { item + "\t" + similarItem }
    })

    topKSimilarities.saveAsTextFile("/tmp/similarities/")

    sc.stop()
  }
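
  /* retain the k candidates with the highest similarity: the queue is capped at k elements,
     and a new candidate replaces the current head if it beats the weakest retained element */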
  def topK(candidates: Seq[(Int, Double)], order: Ordering[(Int, Double)], k: Int) = {
    val queue = new mutable.PriorityQueue[(Int, Double)]()(order)
    candidates.foreach({ candidate =>
      if (queue.size < k) {
        queue.enqueue(candidate)
      } else if (order.lt(candidate, queue.head)) {
        queue.dequeue()
        queue.enqueue(candidate)
      }
    })
    queue.dequeueAll
  }
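
  /* probabilistically downsample the interactions of users and items with overly long histories:
     an interaction survives with probability min(cap, n) / n, where n is the interaction count
     of its user (or item), whichever rate is lower */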
  def downSample(sc: SparkContext, interactions: RDD[Interaction], maxInteractionsPerUserOrItem: Int,
      seed: Int) = {

    val numInteractionsPerUser =
      sc.broadcast(countsToDict(interactions.map(interaction => (interaction.user, 1)).reduceByKey(_ + _)))
    val numInteractionsPerItem =
      sc.broadcast(countsToDict(interactions.map(interaction => (interaction.item, 1)).reduceByKey(_ + _)))

    /* supplemental hash to decorrelate the per-partition random seeds */
    def hash(x: Int): Int = {
      val r = x ^ (x >>> 20) ^ (x >>> 12)
      r ^ (r >>> 7) ^ (r >>> 4)
    }

    /* apply the filtering on a per-partition basis to ensure repeatability in case of failures by
       incorporating the partition index into the random seed */
    interactions.mapPartitionsWithIndex({ case (index, interactions) =>
      val random = new Random(hash(seed ^ index))
      interactions.filter({ interaction =>
        /* .toDouble avoids integer division, which would assign a sample rate of 0 to all
           users and items above the cap and drop their interactions entirely */
        val perUserSampleRate =
          math.min(maxInteractionsPerUserOrItem, numInteractionsPerUser.value(interaction.user)).toDouble /
            numInteractionsPerUser.value(interaction.user)
        val perItemSampleRate =
          math.min(maxInteractionsPerUserOrItem, numInteractionsPerItem.value(interaction.item)).toDouble /
            numInteractionsPerItem.value(interaction.item)
        random.nextDouble() <= math.min(perUserSampleRate, perItemSampleRate)
      })
    })
  }
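
  /* collect per-key counts into a driver-side map (assumes the number of distinct
     users and items fits into memory) */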
  def countsToDict(tuples: RDD[(Int, Int)]) = {
    tuples.collect().toMap
  }
}
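
/* log-likelihood ratio test for a 2x2 contingency table, following Ted Dunning's
   "Accurate Methods for the Statistics of Surprise and Coincidence"; the computation
   mirrors the LogLikelihood class in Apache Mahout */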
object LogLikelihood {

  /* k11 = both events, k12 = A without B, k21 = B without A, k22 = neither */
  def logLikelihoodRatio(k11: Long, k12: Long, k21: Long, k22: Long) = {
    val rowEntropy: Double = entropy(k11 + k12, k21 + k22)
    val columnEntropy: Double = entropy(k11 + k21, k12 + k22)
    val matrixEntropy: Double = entropy(k11, k12, k21, k22)
    if (rowEntropy + columnEntropy < matrixEntropy) {
      /* guard against rounding errors, the ratio is non-negative in exact arithmetic */
      0.0
    } else {
      2.0 * (rowEntropy + columnEntropy - matrixEntropy)
    }
  }
  private def xLogX(x: Long): Double = {
    if (x == 0) {
      0.0
    } else {
      x * math.log(x)
    }
  }

  private def entropy(a: Long, b: Long): Double = { xLogX(a + b) - xLogX(a) - xLogX(b) }

  private def entropy(elements: Long*): Double = {
    var sum: Long = 0
    var result: Double = 0.0
    for (element <- elements) {
      result += xLogX(element)
      sum += element
    }
    xLogX(sum) - result
  }
}
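
A quick way to sanity-check the similarity computation is to feed a toy contingency table
through LogLikelihood directly. The object below is a hypothetical addition (the counts are
made up for illustration and are not part of the job above):

object LogLikelihoodExample {
  def main(args: Array[String]) {
    /* toy 2x2 table: 10 users interacted with both items, 90 with item A only,
       40 with item B only, 860 with neither */
    val llr = LogLikelihood.logLikelihoodRatio(10, 90, 40, 860)
    val similarity = 1.0 - 1.0 / (1.0 + llr)
    println("loglikelihood ratio: " + llr + ", similarity: " + similarity)
  }
}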