Skip to content

Instantly share code, notes, and snippets.

@hgavert
Last active January 2, 2016 22:49
Show Gist options
  • Save hgavert/8372675 to your computer and use it in GitHub Desktop.
Scalding and Monoid introduction talk at Helsinki Data Science meetup on 2014-01-09. Added the BloomFilter and HyperLogLog examples later on.
import com.twitter.algebird._
import com.twitter.algebird.Operators._

// Two overlapping integer lists to experiment with.
val A = (1 to 300).toList
val B = (201 to 400).toList

// Bloom filter parameters: 6 hash functions over a 6000-bit array.
val NUM_HASHES = 6
val WIDTH = 6000 // bits
val SEED = 1
implicit val bfm = new BloomFilterMonoid(NUM_HASHES, WIDTH, SEED)

// Approximate each set by creating a one-element filter per value
// and combining them all with the monoid's sum.
val A_bf = bfm.sum(A.map(n => bfm.create(n.toString)))
val B_bf = bfm.sum(B.map(n => bfm.create(n.toString)))

// Membership checks — contains returns an ApproximateBoolean, hence isTrue.
A.count(n => A_bf.contains(n.toString).isTrue) // 300
B.count(n => A_bf.contains(n.toString).isTrue) // 100
A.count(n => B_bf.contains(n.toString).isTrue) // 100
B.count(n => B_bf.contains(n.toString).isTrue) // 200

// Filters built with the same parameters combine with +;
// the result approximates membership in the union of A and B.
val AuB_bf = A_bf + B_bf
A.count(n => AuB_bf.contains(n.toString).isTrue) // 300
B.count(n => AuB_bf.contains(n.toString).isTrue) // 200
import com.twitter.algebird._
import com.twitter.algebird.Operators._
import com.twitter.algebird.HyperLogLog._

// Two overlapping integer lists to experiment with.
val A = (1 to 300).toList
val B = (201 to 400).toList

// All users (with duplicates in the overlap).
val AuB = A ++ B
// Exact number of unique users in the union, for reference.
AuB.toSet.size // 400

// Approximate the same with HyperLogLog, using 12 bits of precision.
implicit val hllm = new HyperLogLogMonoid(12)
val A_hll = hllm.sum(A.map(hllm(_)))
val B_hll = hllm.sum(B.map(hllm(_)))

// Per-set cardinality estimates from the sketches.
A_hll.approximateSize.estimate // = 298
B_hll.approximateSize.estimate // = 201

// Then the magic: summing the two HLL sketches gives an estimate of the
// number of unique users in the union of sets A and B.
val AuB_hll = A_hll + B_hll
AuB_hll.approximateSize.estimate // = 402
/**
 * The Monoid abstraction used throughout these examples:
 * an associative combining operation `plus` together with an
 * identity element `zero`, so that plus(zero, x) == plus(x, zero) == x.
 * NOTE(review): associativity of `plus` is a law implementations must
 * uphold — it is not enforced by this trait.
 */
trait Monoid[T] {
// Identity element of the monoid.
def zero: T
// Associative combination of two values.
def plus(left: T, right: T): T
}
package com.sanoma.cda.examples
import com.twitter.scalding._
/**
 * Minimal word count: split each input line on whitespace, group
 * identical tokens, count them, and write (word, count) rows as TSV.
 * Later examples build on this job.
 */
class WordCount1(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { text: String => text.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}
package com.sanoma.cda.examples
import com.twitter.scalding._
/**
 * Word count, second iteration:
 *  - tokenization is a separate method for clarity and easy unit testing,
 *  - empty tokens are filtered out; this still runs on the mappers, so no
 *    extra MR step is added,
 *  - groupAll funnels everything to one reducer — fine here because the
 *    sorted output is known to be small, and all reduce operations in a
 *    group are executed together in the same reduce phase.
 */
class WordCount2(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { raw: String => tokenize(raw) }
    .filter('word) { w: String => w.nonEmpty }
    .groupBy('word) { _.size }
    .groupAll { _.sortBy(('size, 'word)).reverse } // just for easy-to-read results
    .write(Tsv(args("output")))

  /** Lower-case the text, strip non-alphanumeric characters, split on whitespace. */
  def tokenize(text: String): Array[String] =
    text.toLowerCase.replaceAll("[^a-z0-9\\s]", "").split("\\s+")
}
package com.sanoma.cda.examples
import com.twitter.scalding._
import com.twitter.algebird.Operators._
/**
 * Word count via Monoid aggregation: each word becomes a singleton
 * Map(word -> 1) and the maps are summed with Algebird's Map monoid.
 * Caveat: the aggregated map is held in memory, so a vocabulary that
 * grows too large will run the job out of memory.
 */
class WordCount3(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { tokenize }
    .map('word -> 'word) { w: String => Map[String, Int](w -> 1) }
    .groupAll { _.sum[Map[String, Int]]('word) }
    // We could write the map itself, but explode it back to (word, count)
    // rows to keep the output format of the previous jobs.
    .flatMap('word -> ('word, 'size)) { counts: Map[String, Int] => counts.toList }
    .groupAll { _.sortBy(('size, 'word)).reverse } // just for easy-to-read results
    .write(Tsv(args("output")))

  /** Lower-case, strip non-alphanumerics, split on whitespace, drop empty tokens. */
  def tokenize(text: String): Array[String] =
    text.toLowerCase.replaceAll("[^a-z0-9\\s]", "").split("\\s+").filter(_.nonEmpty)
}
package com.sanoma.cda.examples
import com.twitter.scalding._
import com.twitter.algebird._
/**
 * Word count with a bounded sketch. SketchMap — Algebird's generalization
 * of the CountMinSketch — keeps approximate counts plus a fixed-size list
 * of top items, so memory use stays constant no matter how many distinct
 * words the input contains.
 */
class WordCount5(args: Args) extends Job(args) {
  // SketchMap must serialize its keys; encode strings as UTF-8 bytes.
  implicit def utf8(s: String): Array[Byte] = com.twitter.bijection.Injection.utf8(s)
  // width 128, depth 6, seed 0, keep the top 20 heavy hitters
  implicit val cmm = new SketchMapMonoid[String, Long](128, 6, 0, 20)
  type ApproxMap = SketchMap[String, Long]

  TextLine(args("input"))
    .flatMap('line -> 'word) { tokenize }
    .map('word -> 'word) { w: String => cmm.create((w, 1L)) }
    .groupAll { _.sum[ApproxMap]('word) }
    .flatMap('word -> ('word, 'size)) { sketch: ApproxMap => sketch.heavyHitters }
    .write(Tsv(args("output")))

  /** Lower-case, strip non-alphanumerics, split on whitespace, drop empty tokens. */
  def tokenize(text: String): Array[String] =
    text.toLowerCase.replaceAll("[^a-z0-9\\s]", "").split("\\s+").filter(_.nonEmpty)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment