Skip to content

Instantly share code, notes, and snippets.

@codejitsu
Forked from MLnick/StreamingHLL.scala
Created July 3, 2017 19:23
Show Gist options
  • Save codejitsu/5ec90d6a43d9ab6171c949d8ac32d0b8 to your computer and use it in GitHub Desktop.
Save codejitsu/5ec90d6a43d9ab6171c949d8ac32d0b8 to your computer and use it in GitHub Desktop.
Spark Streaming meets Algebird's HyperLogLog Monoid
import spark.streaming.StreamingContext._
import spark.streaming.{Seconds, StreamingContext}
import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming.examples.twitter.TwitterInputDStream
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird._
/**
* Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
* TwitterInputDStream
*/
object StreamingHLL {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
val stream = new TwitterInputDStream(ssc, username, password, filters,
StorageLevel.MEMORY_ONLY_SER)
ssc.registerInputStream(stream)
val users = stream.map(status => status.getUser.getId)
val globalHll = new HyperLogLogMonoid(12)
var userSet: Set[Long] = Set()
val approxUsers = users.mapPartitions(ids => {
val hll = new HyperLogLogMonoid(12)
ids.map(id => hll(id))
}).reduce(_ + _)
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
var h = globalHll.zero
approxUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
h += partial
println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
}
})
exactUsers.foreach(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
}
})
ssc.start()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment