Skip to content

Instantly share code, notes, and snippets.

@imrenagi
Last active April 1, 2016 22:42
Show Gist options
  • Save imrenagi/8e3ca16e6b778a0cd7cb393b163a6d16 to your computer and use it in GitHub Desktop.
Save imrenagi/8e3ca16e6b778a0cd7cb393b163a6d16 to your computer and use it in GitHub Desktop.
/**
 * A single search event.
 *
 * @param userID    identifier of the user who issued the search
 * @param timestamp time of the search as a `Long`; the generator in this file
 *                  fills it with epoch milliseconds
 * @param query     the search text
 */
case class SearchStream(
  userID: Long,
  timestamp: Long,
  query: String
)
/**
 * ScalaCheck generators for synthetic [[SearchStream]] events.
 *
 * The query vocabulary is read from `blog_tags.csv` (resolved against the
 * working directory) once, when this object is first initialised.
 */
object SearchGen {

  /** All lines of `blog_tags.csv`. The file is closed as soon as it has been read. */
  val lines: List[String] = {
    val source = Source.fromFile("blog_tags.csv")
    try source.getLines().toList
    finally source.close()
  }

  /**
   * Candidate queries: the second comma-separated column of the first 20
   * usable rows, with double quotes removed, trimmed and lower-cased.
   *
   * Rows that have no second column (for example a blank trailing line) are
   * skipped instead of aborting object initialisation with an
   * `ArrayIndexOutOfBoundsException`.
   *
   * Kept as a `var` only so that existing code that reassigns it still compiles.
   */
  var words: List[String] =
    lines.iterator
      .map(_.split(","))
      .collect { case columns if columns.length > 1 => columns(1) }
      .take(20) // stop early: rows after the first 20 usable ones are never parsed
      .map(_.replace("\"", "").trim.toLowerCase)
      .toList

  /**
   * Generator of random search events.
   *
   * @return a generator whose events carry a user id drawn from
   *         `Gen.choose(0, 100)`, a query drawn from [[words]] and a timestamp
   *         drawn between seven days ago and now (both evaluated when a value
   *         is generated)
   */
  def searchStream(): Gen[SearchStream] =
    for {
      userID    <- Gen.choose(0, 100)
      query     <- Gen.oneOf(words)
      timestamp <- Gen.choose(new DateTime().minusDays(7).getMillis, new DateTime().getMillis)
    } yield SearchStream(userID, timestamp, query)
}
/** Stream transformations over [[SearchStream]] events. */
object SearchOps {

  /**
   * Projects every search event to its query text.
   *
   * @param search stream of search events
   * @return stream of the events' query strings
   */
  def getQuery(search: DStream[SearchStream]): DStream[String] =
    search.map(_.query)

  /**
   * Counts how often each query occurs inside a sliding window.
   *
   * @param search        stream of search events
   * @param batchInterval batch interval of the stream; also used as the slide interval
   * @param windowSize    window length expressed as a number of batch intervals
   * @return `(query, count)` pairs for the queries seen in the current window
   */
  def countSearch(search: DStream[SearchStream], batchInterval: Duration, windowSize: Int): DStream[(String, Int)] = {
    val windowDuration = batchInterval * windowSize
    search
      .map(event => (event.query, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        (a: Int, b: Int) => a - b,
        windowDuration,
        batchInterval,
        // The inverse-reduce form keeps a key after all of its events have left
        // the window, with a count of 0. Drop those expired keys so they are not
        // reported (and cannot win topSearch on an otherwise empty window).
        filterFunc = (entry: (String, Int)) => entry._2 > 0
      )
  }

  /**
   * Emits, per batch, the most frequent query of the current window.
   *
   * @param search        stream of search events
   * @param batchInterval batch interval of the stream; also used as the slide interval
   * @param windowSize    window length expressed as a number of batch intervals
   * @return a stream whose batches hold at most one query: the one with the
   *         highest count (ties are broken by the tuple ordering, i.e. the
   *         lexicographically greatest query), or nothing when the window is empty
   */
  def topSearch(search: DStream[SearchStream], batchInterval: Duration, windowSize: Int): DStream[String] =
    countSearch(search, batchInterval, windowSize)
      .map { case (query, count) => (count, query) }
      .transform { rdd =>
        // top(1) selects the maximum without sorting the whole RDD.
        val best = rdd.top(1)
        rdd.sparkContext.parallelize(best.map(_._2))
      }
}
/**
 * Property-based specification for SearchOps.topSearch: for streams of
 * generated search events, every output batch must contain at most one query.
 */
@RunWith(classOf[JUnitRunner])
class SearchStreamingSpec
extends Specification
with DStreamTLProperty with ResultMatchers with ScalaCheck with Serializable {
// Spark configuration
override def sparkMaster : String = "local[5]"
// Batch interval shared by the streaming context (batchDuration) and by the
// topSearch call below. NOTE(review): presumably Spark Streaming's Duration,
// i.e. 1000 ms — the import is not visible here, confirm.
val batchInterval = Duration(1000)
override def batchDuration = batchInterval
override def defaultParallelism = 3
// NOTE(review): presumably required because SearchOps.countSearch uses the
// inverse-function form of reduceByKeyAndWindow — confirm.
override def enableCheckpointing = true
// Specification body: a single example, run sequentially.
def is =
sequential ^ s2"""
- where only one top search ${onlyOneTopSearch}
"""
// Property: each batch produced by SearchOps.topSearch holds at most one element.
def onlyOneTopSearch = {
// Pair of RDDs the formula is evaluated on; only the second element is
// inspected. NOTE(review): presumably (input batch, output batch) — confirm
// against DStreamTLProperty.
type U = (RDD[SearchStream], RDD[String])
// Selects the second RDD of the pair.
val topQueryBatch = (_:U)._2
// Used both for the generator and as the bound of the formula.
val numBatches = 5
// NOTE(review): presumably batches of 10 generated events each — confirm
// against BatchGen.ofN.
val searches = BatchGen.ofN(10, SearchGen.searchStream())
val gen = BatchGen.always(searches, numBatches)
val formula: Formula[U] = {
always {
at(topQueryBatch){ query =>
// At most one top query per batch (zero is accepted).
query.count <= 1
}
} during numBatches
}
// topSearch is applied with a window size of 2.
forAllDStream(gen)(SearchOps.topSearch(_, batchInterval, 2))(formula)
// minTestsOk = 10 is passed to the property's settings; verbose output is enabled.
}.set(minTestsOk = 10).verbose
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment