Skip to content

Instantly share code, notes, and snippets.

@rjhall
Created August 6, 2013 15:39
Show Gist options
  • Save rjhall/6165629 to your computer and use it in GitHub Desktop.
Save rjhall/6165629 to your computer and use it in GitHub Desktop.
Demo.scala
import com.twitter.scalding._
/**
 * Scalding job that builds a pipe of random Gaussian vectors of length `N` and, for every
 * index pair (i, j), sums the products v(i) * v(j) over all vectors in the pipe.
 *
 * The per-pair totals are written as ('i, 'j, 'val) tuples to the sequence file "demo/b".
 * The totals are raw sums of products: no mean is subtracted and no division by the number
 * of vectors is performed here.
 */
class Demo(args : Args) extends Job(args) {

  // Layers one extra setting on top of the inherited job configuration.
  // NOTE(review): presumably this tunes Cascading's AggregateBy cache size — confirm.
  override def config(implicit mode : Mode) =
    super.config ++ Map("cascading.aggregateby.threshold" -> "1000000")

  import com.twitter.scalding.TDsl._

  // best way to compute covariance matrix?

  // Length of every generated vector; the result therefore has N * N (i, j) entries.
  val N = 1000

  // The upstream source is intentionally elided in this snippet. Each incoming tuple is
  // replaced by a fresh array of N Gaussian samples stored in the 'vec field.
  val a = // some pipe with a million tuples in it.
    .mapTo(() -> 'vec) { _ : Unit =>
      val rng = new scala.util.Random()
      Array.fill(N)(rng.nextGaussian)
    }
    .toTypedPipe[Array[Double]]('vec)

  // b) "optimized flat map."
  // Emit one (i, j, v(i) * v(j)) triple per index pair of each vector, then add up the
  // products that share the same (i, j) key.
  a .flatMap[(Int, Int, Double)] { vec =>
      for (row <- 0 until N; col <- 0 until N) yield (row, col, vec(row) * vec(col))
    }
    .groupBy[(Int, Int)] { case (row, col, _) => (row, col) }
    // Both operands carry the same (i, j) key, so the left one's indices are kept.
    .reduce { (acc, next) => (acc._1, acc._2, acc._3 + next._3) }
    .values
    .toPipe('i, 'j, 'val)
    .write(SequenceFile("demo/b"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment