Skip to content

Instantly share code, notes, and snippets.

@rjhall
Created October 12, 2013 15:36
Show Gist options
  • Save rjhall/6951358 to your computer and use it in GitHub Desktop.
Save rjhall/6951358 to your computer and use it in GitHub Desktop.
package com.etsy.scalding.jobs
import com.twitter.scalding._
/**
 * Memory experiment: turns every (id, score) pair into (id, percentile), where
 * the percentile is the pair's rank by descending score divided by the total
 * number of pairs (highest score gets 1.0, lowest gets 1/n).
 *
 * The same computation is written five ways. The variants differ in how the
 * single global group is materialised (`toList` followed by `flatMapTo`, versus
 * `mapStream`) and in how the result is produced (in-place array update versus
 * a lazy single-pass iterator). The trailing `// oom` and `// works` notes are
 * the original author's observed outcomes with the 3 GB child heap set below.
 *
 * @param args job arguments, forwarded unchanged to [[Job]]
 */
class TouchTheVoid(args : Args) extends Job(args) {

  // 25M (Long, Double) pairs.
  val scores = SequenceFile("stuff", ('id, 'score, 'stuff))
    .project('id, 'score)

  // 3GB is tons more than 25000000 * 16bytes = 400MB...
  // NOTE(review): that estimate counts only the raw 8 + 8 byte payload per pair;
  // boxed tuples and list cells presumably cost noticeably more - confirm with a heap dump.
  override def config(implicit mode : Mode) =
    super.config ++ Map("mapred.child.java.opts" -> "-Xmx3G")

  // Variant 1: toList, then sort the List and emit lazily.
  // The first output field holds the id (t._1), so it is labelled 'id rather than 'score.
  scores
    .groupAll { _.toList[(Long, Double)](('id, 'score) -> 'list) }
    .flatMapTo('list -> ('id, 'percentile)) {
      l : List[(Long, Double)] =>
        val n = l.size
        // zipWithIndex supplies the 0-based rank without a mutable counter captured
        // by a lazy closure, so the values cannot drift if the result is re-traversed.
        l.sortBy { -_._2 }
          .iterator
          .zipWithIndex
          .map { case ((id, _), rank) => (id, (n - rank) / n.toDouble) }
    }
    .write(Tsv("test/list")) // oom

  // Variant 2: toList, copy to an Array, sort, overwrite each slot in place.
  scores
    .groupAll { _.toList[(Long, Double)](('id, 'score) -> 'list) }
    .flatMapTo('list -> ('id, 'percentile)) {
      l : List[(Long, Double)] =>
        val a = l.toArray
        // Negating the score gives a descending sort.
        scala.util.Sorting.quickSort(a)(Ordering.by[(Long, Double), Double](-_._2))
        val z = a.size.toDouble
        var i = 0
        while (i < a.size) {
          // Replace the score with the percentile; the id is kept.
          a(i) = (a(i)._1, (z - i) / z)
          i += 1
        }
        a
    }
    .write(Tsv("test/list_array")) // oom

  // Variant 3: toList, copy to an Array, sort, emit lazily.
  scores
    .groupAll { _.toList[(Long, Double)](('id, 'score) -> 'list) }
    .flatMapTo('list -> ('id, 'percentile)) {
      l : List[(Long, Double)] =>
        val a = l.toArray
        scala.util.Sorting.quickSort(a)(Ordering.by[(Long, Double), Double](-_._2))
        val n = a.length
        a.iterator
          .zipWithIndex
          .map { case ((id, _), rank) => (id, (n - rank) / n.toDouble) }
    }
    .write(Tsv("test/list_array_nice")) // oom

  // Variant 4: mapStream hands over the group's iterator directly (no toList step);
  // copy to an Array, sort, overwrite each slot in place.
  scores
    .groupAll {
      _.mapStream[(Long, Double), (Long, Double)](('id, 'score) -> ('id_, 'percentile)) {
        it : Iterator[(Long, Double)] =>
          val a = it.toArray
          scala.util.Sorting.quickSort(a)(Ordering.by[(Long, Double), Double](-_._2))
          val z = a.size.toDouble
          var i = 0
          while (i < a.size) {
            a(i) = (a(i)._1, (z - i) / z)
            i += 1
          }
          a
      }
    }
    .project('id_, 'percentile)
    .write(Tsv("test/stream_array")) // works

  // Variant 5: mapStream, copy to an Array, sort, emit lazily.
  scores
    .groupAll {
      _.mapStream[(Long, Double), (Long, Double)](('id, 'score) -> ('id_, 'percentile)) {
        it : Iterator[(Long, Double)] =>
          val a = it.toArray
          scala.util.Sorting.quickSort(a)(Ordering.by[(Long, Double), Double](-_._2))
          val n = a.length
          a.iterator
            .zipWithIndex
            .map { case ((id, _), rank) => (id, (n - rank) / n.toDouble) }
      }
    }
    .project('id_, 'percentile)
    .write(Tsv("test/stream_array_nice")) // works
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment