Last active
January 2, 2016 22:49
-
-
Save hgavert/8372675 to your computer and use it in GitHub Desktop.
Scalding and Monoid introduction talk given at the Helsinki Data Science meetup on 2014-01-09. The BloomFilter and HyperLogLog examples were added later.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.algebird._
import com.twitter.algebird.Operators._

// Approximate set membership with a Bloom filter.
// Generate 2 overlapping lists: A = 1..300 and B = 201..400 (they share 201..300).
val A = (1 to 300).toList
val B = (201 to 400).toList

// Generate a Bloom filter monoid.
val NUM_HASHES = 6
val WIDTH = 6000 // bits
val SEED = 1
implicit val bfm = new BloomFilterMonoid(NUM_HASHES, WIDTH, SEED)

// Approximate each set with a Bloom filter: build a single-item filter per element and
// merge them with the monoid's `+`. Keys are encoded with `toString` both here and in
// the lookups below, so insertion and lookup always agree.
val A_bf = A.map(i => bfm.create(i.toString)).reduce(_ + _)
val B_bf = B.map(i => bfm.create(i.toString)).reduce(_ + _)

// Let's check the members.
// A Bloom filter can report false positives, so the counts below are lower bounds;
// the numbers in the comments are the values observed for this data.
A.count(i => A_bf.contains(i.toString).isTrue) // 300
B.count(i => A_bf.contains(i.toString).isTrue) // 100 (the overlap 201..300)
A.count(i => B_bf.contains(i.toString).isTrue) // 100 (the overlap 201..300)
B.count(i => B_bf.contains(i.toString).isTrue) // 200

// Combination: the sum of the two filters approximates the union A ∪ B.
val AuB_bf = A_bf + B_bf
A.count(i => AuB_bf.contains(i.toString).isTrue) // 300
B.count(i => AuB_bf.contains(i.toString).isTrue) // 200
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.algebird._
import com.twitter.algebird.Operators._
// Presumably supplies the implicit Int => Array[Byte] conversion that `hllm(_)` needs below.
import com.twitter.algebird.HyperLogLog._

// Generate 2 overlapping lists: A = 1..300 and B = 201..400 (they share 201..300).
val A = (1 to 300).toList
val B = (201 to 400).toList
// All users from both lists; the overlap 201..300 appears twice here.
val AuB = A ++ B
// Exact number of unique users in the union.
AuB.toSet.size // 400

// Now let's approximate the same with HyperLogLog.
implicit val hllm = new HyperLogLogMonoid(12) // precision: 12 bits
val A_hll = A.map(hllm(_)).reduce(_ + _)
val B_hll = B.map(hllm(_)).reduce(_ + _)

// Now we can ask each HLL for an estimate of its number of unique users.
// These are approximations; the values in the comments were observed for this data.
A_hll.approximateSize.estimate // ~298 (exact: 300)
B_hll.approximateSize.estimate // ~201 (exact: 200)

// Then the magic: we can sum the 2 HyperLogLogs and ask the result for its number of
// unique users. This estimates the number of unique users in the union of A and B,
// without double-counting the users that appear in both.
val AuB_hll = A_hll + B_hll
AuB_hll.approximateSize.estimate // ~402 (exact: 400)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A monoid over `T`: an associative binary operation [[plus]] with an identity element [[zero]].
 *
 * Implementations are expected to satisfy:
 *  - associativity: `plus(a, plus(b, c)) == plus(plus(a, b), c)`
 *  - identity:      `plus(zero, a) == a` and `plus(a, zero) == a`
 *
 * Associativity is what allows partial results to be combined in any grouping, and
 * `zero` makes the combination of an empty input well-defined.
 *
 * @tparam T the type of values being combined
 */
trait Monoid[T] {

  /** The identity element for [[plus]]. */
  def zero: T

  /**
   * Combines two values. Must be associative.
   *
   * @param left  the left operand
   * @param right the right operand
   * @return the combination of `left` and `right`
   */
  def plus(left: T, right: T): T

  /**
   * Combines all `items` from left to right, starting from [[zero]].
   *
   * Unlike `items.reduce(plus)`, this is safe on an empty collection.
   *
   * @param items the values to combine
   * @return the combined value, or [[zero]] when `items` is empty
   */
  def sum(items: Iterable[T]): T =
    items.foldLeft(zero)((acc, item) => plus(acc, item))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sanoma.cda.examples

import com.twitter.scalding._

/**
 * Basic word count example, let's build on this.
 *
 * Reads text lines from the `input` argument, splits every line on runs of whitespace and
 * writes one row per distinct token (the token and its number of occurrences, in the
 * 'size field) to the `output` argument as TSV.
 *
 * Note: splitting on `\s+` yields an empty-string token for blank lines and for lines that
 * start with whitespace; this basic version does not filter those out, so they are counted too.
 */
class WordCount1(args: Args) extends Job(args) {
  // Map stage: emit one tuple per whitespace-separated token of each input line.
  private val tokens = TextLine(args("input"))
    .flatMap('line -> 'word) { text: String => text.split("\\s+") }

  // Reduce stage: count the occurrences of each token and write the result.
  tokens
    .groupBy('word) { group => group.size }
    .write(Tsv(args("output")))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sanoma.cda.examples

import java.util.Locale

import com.twitter.scalding._

/**
 * UDF: move logic from the anonymous function to a separate function for clarity and easy unit testing.
 * Add a filter for empty tokens (produced by blank lines or leading whitespace) - note that this runs
 * together with the flatMap on the mappers, no new MR steps.
 * GroupAll forces all data to one reducer, but is great for the case when you know the output is small.
 * Also, groups are a kind of builder, and all reduce operations are executed together in the same
 * reduce phase.
 */
class WordCount2(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => tokenize(line) }
    .filter('word) { word: String => word != "" }
    .groupBy('word) { _.size }
    .groupAll{ _.sortBy(('size, 'word)).reverse } // this is just for easy results
    .write(Tsv(args("output")))

  /**
   * Splits `text` into lower-cased words.
   *
   * Lower-casing uses `Locale.ROOT` so the result does not depend on the JVM's default locale
   * (e.g. the Turkish dotless-i rule). Every character that is not a Unicode letter, combining
   * mark, number or whitespace is removed. Non-ASCII words such as "päivää" therefore stay intact
   * instead of losing their accented letters, as they would with an ASCII-only `[^a-z0-9]` class.
   *
   * @param text one input line
   * @return the words of `text`; may contain empty strings (blank input or leading whitespace),
   *         which the pipeline filters out
   */
  def tokenize(text: String): Array[String] = {
    text
      .toLowerCase(Locale.ROOT)
      .replaceAll("[^\\p{L}\\p{M}\\p{N}\\s]", "")
      .split("\\s+")
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sanoma.cda.examples

import java.util.Locale

import com.twitter.scalding._
import com.twitter.algebird.Operators._

/**
 * Illustration of how to aggregate with Monoids.
 * Transform each word into a single-entry Map and then use the Algebird operators to sum the maps.
 * This will run out of memory if the map grows too big: it holds one entry per distinct word,
 * and groupAll sums everything on a single reducer.
 */
class WordCount3(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { tokenize }
    .map('word -> 'word) { w: String => Map[String, Int](w -> 1) }
    .groupAll{ _.sum[Map[String, Int]]('word) }
    // We could save the map here, but if we want similar output as in the previous example...
    .flatMap('word -> ('word, 'size)) { words: Map[String, Int] => words.toList }
    .groupAll{ _.sortBy(('size, 'word)).reverse } // this is just for easy results
    .write(Tsv(args("output")))

  /**
   * Splits `text` into non-empty, lower-cased words.
   *
   * Lower-casing uses `Locale.ROOT` so the result does not depend on the JVM's default locale.
   * Every character that is not a Unicode letter, combining mark, number or whitespace is
   * removed, so non-ASCII words (e.g. "päivää") stay intact. Empty tokens are dropped.
   *
   * @param text one input line
   * @return the non-empty words of `text`
   */
  def tokenize(text: String): Array[String] = {
    text
      .toLowerCase(Locale.ROOT)
      .replaceAll("[^\\p{L}\\p{M}\\p{N}\\s]", "")
      .split("\\s+")
      .filter(_ != "")
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sanoma.cda.examples

import java.util.Locale

import com.twitter.scalding._
import com.twitter.algebird._

/**
 * More sensible aggregation with Monoids.
 * Use SketchMap to keep only the top words that we are interested in.
 * SketchMap is a generalization of Algebird's CountMinSketch that also holds a list of the top
 * (heavy-hitter) items. The sketch has a fixed size that does not grow with the number of
 * distinct words, so unlike the Map-based version this will not run out of memory.
 * The counts it reports are approximate.
 */
class WordCount5(args: Args) extends Job(args) {
  // UTF-8 String => Array[Byte] conversion; presumably required implicitly by SketchMapMonoid
  // to hash the String keys.
  implicit def utf8(s: String): Array[Byte] = com.twitter.bijection.Injection.utf8(s)
  // NOTE(review): the arguments are presumably (width, depth, seed, heavy-hitter count);
  // confirm against the Algebird version in use.
  implicit val cmm = new SketchMapMonoid[String, Long](128, 6, 0, 20) // top 20
  type ApproxMap = SketchMap[String, Long]

  TextLine(args("input"))
    .flatMap('line -> 'word) { tokenize }
    .map('word -> 'word) { w: String => cmm.create((w, 1L)) }
    .groupAll{ _.sum[ApproxMap]('word) }
    .flatMap('word -> ('word, 'size)) { words: ApproxMap => words.heavyHitters }
    .write(Tsv(args("output")))

  /**
   * Splits `text` into non-empty, lower-cased words.
   *
   * Lower-casing uses `Locale.ROOT` so the result does not depend on the JVM's default locale.
   * Every character that is not a Unicode letter, combining mark, number or whitespace is
   * removed, so non-ASCII words (e.g. "päivää") stay intact. Empty tokens are dropped.
   *
   * @param text one input line
   * @return the non-empty words of `text`
   */
  def tokenize(text: String): Array[String] = {
    text
      .toLowerCase(Locale.ROOT)
      .replaceAll("[^\\p{L}\\p{M}\\p{N}\\s]", "")
      .split("\\s+")
      .filter(_ != "")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment