Skip to content

Instantly share code, notes, and snippets.

@marcovivero
Created July 14, 2015 22:58
Show Gist options
  • Save marcovivero/cc2f252de4e3afb1b504 to your computer and use it in GitHub Desktop.
Save marcovivero/cc2f252de4e3afb1b504 to your computer and use it in GitHub Desktop.
/**
 * Scores every cell of a sparse matrix with a G statistic (2 * sum of O * ln(O / E))
 * computed over the 2x2 contingency table induced by that cell.
 *
 * For an entry (row, col, value) the observed table is:
 *   - value                                   (this row, this column)
 *   - rowSum(row) - value                     (this row, other columns)
 *   - colSum(col) - value                     (other rows, this column)
 *   - matrixSum - rowSum - colSum + value     (other rows, other columns)
 * and the expected table is built from the table's own row totals and column
 * proportions.
 *
 * Row and column sums are collected to the driver, so the number of distinct
 * row and column indices must fit in driver memory.
 *
 * @param input matrix entries as (row index, column index, value) triples;
 *              presumably non-negative counts, since logarithms of the entries are taken
 * @return one (row index, column index, statistic) triple per input triple;
 *         an empty RDD when `input` is empty
 */
def aggHomogeneity(input: RDD[(Int, Int, Double)]): RDD[(Int, Int, Double)] = {
  // `input` is traversed four times below. persist() throws if asked to change a
  // storage level the caller already assigned, so only cache an unpersisted RDD.
  if (input.getStorageLevel == org.apache.spark.storage.StorageLevel.NONE) {
    input.cache()
  }

  // reduceByKey combines partial sums map-side instead of shuffling every triple
  // and materialising whole groups the way groupBy does.
  val rowSums = input
    .map { case (row, _, value) => (row, value) }
    .reduceByKey(_ + _)
    .collect()
    .toMap

  val colSums = input
    .map { case (_, col, value) => (col, value) }
    .reduceByKey(_ + _)
    .collect()
    .toMap

  // fold (unlike reduce) tolerates an empty RDD.
  val matrixSum = input.map(_._3).fold(0.0)(_ + _)

  input.map { case (row, col, value) =>
    val rowSum = rowSums(row)
    val colSum = colSums(col)

    // Observed 2x2 table, flattened row-major.
    val observed = Array(
      value,
      rowSum - value,
      colSum - value,
      matrixSum - rowSum - colSum + value
    )

    val total = observed.sum
    // Column proportions of the 2x2 table.
    val thisColProb = (observed(0) + observed(2)) / total
    val otherColProb = (observed(1) + observed(3)) / total
    // Row totals of the 2x2 table.
    val thisRowTotal = observed(0) + observed(1)
    val otherRowTotal = observed(2) + observed(3)

    val expected = Array(
      thisColProb * thisRowTotal,
      otherColProb * thisRowTotal,
      thisColProb * otherRowTotal,
      otherColProb * otherRowTotal
    )

    val statistic = observed.zip(expected).map { case (o, e) =>
      // x * ln(x) tends to 0 as x tends to 0; evaluating it directly gives
      // 0 * -Infinity = NaN, which would poison the whole sum.
      if (o == 0.0) 0.0 else 2 * o * (math.log(o) - math.log(e))
    }.sum

    (row, col, statistic)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment