-
-
Save codejitsu/5ec90d6a43d9ab6171c949d8ac32d0b8 to your computer and use it in GitHub Desktop.
Spark Streaming meets Algebird's HyperLogLog Monoid
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import spark.streaming.StreamingContext._ | |
import spark.streaming.{Seconds, StreamingContext} | |
import spark.SparkContext._ | |
import spark.storage.StorageLevel | |
import spark.streaming.examples.twitter.TwitterInputDStream | |
import com.twitter.algebird.HyperLogLog._ | |
import com.twitter.algebird._ | |
/**
 * Example of using the HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
 * TwitterInputDStream.
 *
 * For every batch of tweets the program prints the number of distinct tweeting users, computed
 * twice: approximately, by merging HyperLogLog sketches, and exactly, by unioning sets of user
 * ids. Running totals across all batches are kept on the driver and the relative error of the
 * approximate running total is printed alongside the exact one.
 *
 * Usage: StreamingHLL <master> <twitter_username> <twitter_password> [filter1] ... [filter n]
 */
object StreamingHLL {

  /**
   * Program entry point.
   *
   * @param args `<master> <twitter_username> <twitter_password>` followed by any number of
   *             optional filter terms that are handed to the Twitter input stream.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StreamingHLL <master> <twitter_username> <twitter_password>" +
        " [filter1] [filter2] ... [filter n]")
      System.exit(1)
    }

    // Precision handed to every HyperLogLogMonoid below. The per-partition monoids and the
    // global one must be built with the same value so that their sketches can be added together.
    val hllBits = 12

    val Array(master, username, password) = args.slice(0, 3)
    val filters = args.slice(3, args.length)

    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
    val stream = new TwitterInputDStream(ssc, username, password, filters,
      StorageLevel.MEMORY_ONLY_SER)
    ssc.registerInputStream(stream)

    val users = stream.map(status => status.getUser.getId)

    val globalHll = new HyperLogLogMonoid(hllBits)

    // Approximate path: turn every id of a partition into a sketch, then merge all sketches of
    // the batch into a single one.
    val approxUsers = users.mapPartitions(ids => {
      val hll = new HyperLogLogMonoid(hllBits)
      ids.map(id => hll(id))
    }).reduce(_ + _)

    // Exact path: union singleton sets of ids into one set per batch.
    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

    // Running totals across batches; updated only inside the output callbacks below.
    var h = globalHll.zero
    var userSet: Set[Long] = Set()

    approxUsers.foreach(rdd => {
      // take(1) fetches the reduced value (if any) with a single job and yields an empty array
      // for an empty batch, replacing the previous count()-then-first() pair of jobs.
      rdd.take(1).foreach { partial =>
        h += partial
        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
        println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
      }
    })

    exactUsers.foreach(rdd => {
      rdd.take(1).foreach { partial =>
        userSet ++= partial
        println("Exact distinct users this batch: %d".format(partial.size))
        println("Exact distinct users overall: %d".format(userSet.size))
        // NOTE(review): the error rate compares against `h`, so it is only meaningful if the
        // approximate callback above has already processed the same batch; confirm that output
        // operations run in registration order.
        println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
      }
    })

    ssc.start()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment