Skip to content

Instantly share code, notes, and snippets.

@marcovivero
Created July 14, 2015 23:11
Show Gist options
  • Save marcovivero/d33c05967e17d698db9e to your computer and use it in GitHub Desktop.
Save marcovivero/d33c05967e17d698db9e to your computer and use it in GitHub Desktop.
/**
 * Computes a per-entry G statistic (log-likelihood ratio test of homogeneity) for a sparse
 * matrix of counts given as (row, column, count) triples.
 *
 * For each entry (i, j, count(i, j)) the matrix is collapsed into a 2 x 2 table:
 *
 *   k(1, 1) = count(i, j)                 k(1, 2) = rowSums(i) - count(i, j)
 *   k(2, 1) = colSums(j) - count(i, j)    k(2, 2) = N - rowSums(i) - colSums(j) + count(i, j)
 *
 * where N is the sum of all counts. The statistic returned for the entry is
 *
 *   G = 2 * sum over the four cells of observed * [log(observed) - log(expected)]
 *
 * with the expected values derived from the column marginals of the 2 x 2 table. Cells whose
 * observed value is not strictly positive contribute 0 (the usual 0 * log(0) = 0 convention),
 * so empty cells no longer turn the whole statistic into NaN.
 *
 * Side effects / cost: `input` is marked as cached because it is traversed several times, and
 * the row sums and column sums are collected to the driver and captured by the final closure.
 *
 * NOTE(review): the formulas assume non-negative counts and at most one triple per (i, j)
 * pair; neither is checked here -- confirm against the caller.
 *
 * @param input matrix entries as (row index i, column index j, count(i, j))
 * @return one (i, j, G) triple per input triple; an empty RDD for empty input
 */
def aggHomogeneity (input : RDD[(Int, Int, Double)]) : RDD[(Int, Int, Double)] = {
  // The input is scanned once per marginal, once for the total and once for the final map.
  input.cache

  // Row sums: total count per row index. reduceByKey pre-aggregates on each partition
  // instead of shuffling every triple the way groupBy does.
  val rowSums : HashMap[Int, Double] = HashMap(
    input.map(e => (e._1, e._3)).reduceByKey(_ + _).collect : _*
  )

  // Column sums: total count per column index.
  val colSums : HashMap[Int, Double] = HashMap(
    input.map(e => (e._2, e._3)).reduceByKey(_ + _).collect : _*
  )

  // Full matrix sum, call it N. fold (unlike reduce) is defined for an empty RDD.
  val N : Double = input.map(_._3).fold(0.0)(_ + _)

  // e = (i, j, count(i, j))
  input.map(e => {
    val rowSum : Double = rowSums(e._1)
    val colSum : Double = colSums(e._2)

    // Observed 2 x 2 table, flattened row-major: [k(1, 1), k(1, 2), k(2, 1), k(2, 2)]
    val observed : Array[Double] = Array(
      e._3,                      // count(i, j) =: k(1, 1)
      rowSum - e._3,             // rowSums(i) - count(i, j) =: k(1, 2)
      colSum - e._3,             // colSums(j) - count(i, j) =: k(2, 1)
      N - rowSum - colSum + e._3 // N - rowSums(i) - colSums(j) + count(i, j) =: k(2, 2)
    )

    // Column probabilities under the null hypothesis (both rows share the same distribution).
    val nullprobs : Array[Double] = Array(
      (observed(0) + observed(2)) / N, // p(1) := [k(1, 1) + k(2, 1)] / N
      (observed(1) + observed(3)) / N  // p(2) := [k(1, 2) + k(2, 2)] / N
    )

    // Expected cell value = null column probability * row total of the 2 x 2 table.
    val expectedValues : Array[Double] = Array(
      nullprobs(0) * (observed(0) + observed(1)), // E(1, 1) := p(1) * [k(1, 1) + k(1, 2)]
      nullprobs(1) * (observed(0) + observed(1)), // E(1, 2) := p(2) * [k(1, 1) + k(1, 2)]
      nullprobs(0) * (observed(2) + observed(3)), // E(2, 1) := p(1) * [k(2, 1) + k(2, 2)]
      nullprobs(1) * (observed(2) + observed(3))  // E(2, 2) := p(2) * [k(2, 1) + k(2, 2)]
    )

    // Pairs up as:
    // [(k(1, 1), E(1, 1)), (k(1, 2), E(1, 2)), (k(2, 1), E(2, 1)), (k(2, 2), E(2, 2))]
    val g : Double = observed.zip(expectedValues).map(counts => {
      if (counts._1 > 0.0) {
        // 2 * observed * log(observed / expected) = 2 * observed * [log(observed) - log(expected)]
        2 * counts._1 * (log(counts._1) - log(counts._2))
      } else {
        // Empty cell (or a tiny negative produced by floating-point cancellation):
        // log would give -Infinity / NaN, while the limit of x * log(x) as x -> 0 is 0.
        0.0
      }
    }).sum

    (e._1, e._2, g)
  })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment