Skip to content

Instantly share code, notes, and snippets.

@rjhall
Created August 6, 2013 00:30
Show Gist options
  • Save rjhall/6160945 to your computer and use it in GitHub Desktop.
Save rjhall/6160945 to your computer and use it in GitHub Desktop.
package com.etsy.scalding.jobs
import com.etsy.scalding._
import com.twitter.scalding._
/**
 * Demo job: what is the best way to compute a covariance matrix?
 *
 * Every input tuple is replaced by a random N-dimensional vector `v`, and each
 * variant below sums the products `v(i) * v(j)` over all vectors, writing the
 * result as ('i, 'j, 'val) triples to its own sequence file. The remarks after
 * each variant are the original author's observations from running it.
 */
class Demo(args : Args) extends Job(args) {

  // The dimension of each vector; the summed matrix is N x N.
  val N = 1000

  // Some file with ~1M tuples in it. The incoming fields are dropped and each
  // tuple becomes a freshly drawn array of N Gaussian samples in field 'vec.
  val a = SequenceFile("something", 'something)
    .mapTo(() -> 'vec) { _ : Unit =>
      val rng = new scala.util.Random()
      Array.fill(N)(rng.nextGaussian())
    }

  // Every (i, j, v(i) * v(j)) entry of the outer product of `v` with itself,
  // ordered by i and then by j.
  private def outerProductEntries(v : Array[Double]) : IndexedSeq[(Int, Int, Double)] =
    for (i <- 0 until N; j <- 0 until N) yield (i, j, v(i) * v(j))

  // Every (i, j, m(i)(j)) entry of an N x N matrix, ordered by i and then by j.
  private def matrixEntries(m : Array[Array[Double]]) : IndexedSeq[(Int, Int, Double)] =
    for (i <- 0 until N; j <- 0 until N) yield (i, j, m(i)(j))

  // a) "unoptimized flat map."
  // Emits N * N triples per vector and sums 'val for each (i, j) key.
  a.flatMapTo('vec -> ('i, 'j, 'val)) { v : Array[Double] => outerProductEntries(v) }
    .groupBy('i, 'j) { _.sum('val) }
    .project('i, 'j, 'val)
    .write(SequenceFile("demo/a"))
  // Spends all its time spilling and doesn't do any work.

  // b) "optimized flat map."
  // Same as (a), but with the group's spill threshold raised to N * N.
  a.flatMapTo('vec -> ('i, 'j, 'val)) { v : Array[Double] => outerProductEntries(v) }
    .groupBy('i, 'j) { _.sum('val).spillThreshold(N * N) }
    .project('i, 'j, 'val)
    .write(SequenceFile("demo/b"))
  // Conceptually should work just fine but doesn't. Mappers just bog down and
  // don't get anywhere.

  // c) "non flat map"
  // Builds the full outer-product matrix per vector, reduces all matrices into
  // one, and only then expands the result into (i, j, val) triples.
  a.mapTo('vec -> 'mat) { v : Array[Double] => v.map { x => v.map { y => x * y } } }
    .groupAll {
      _.reduce('mat) { (acc : Array[Array[Double]], next : Array[Array[Double]]) =>
        // Add `next` into `acc` element-wise, in place, and return `acc`.
        for (i <- 0 until N; j <- 0 until N) acc(i)(j) += next(i)(j)
        acc
      }
    }
    .flatMapTo('mat -> ('i, 'j, 'val)) { m : Array[Array[Double]] => matrixEntries(m) }
    .project('i, 'j, 'val)
    .write(SequenceFile("demo/c"))
  // Works fine.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment