Last active
April 1, 2016 22:42
-
-
Save imrenagi/8e3ca16e6b778a0cd7cb393b163a6d16 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A single search event in the stream.
  *
  * @param userID    id of the user performing the search
  * @param timestamp epoch millis at which the search happened
  * @param query     the search query text
  */
case class SearchStream(userID: Long, timestamp: Long, query: String)
/** ScalaCheck generator for [[SearchStream]] events.
  *
  * Seeds the query vocabulary from the second column of `blog_tags.csv`
  * (surrounding quotes stripped, trimmed, lower-cased, first 20 entries).
  */
object SearchGen {
  // NOTE(review): read eagerly at object initialization — fails fast if the
  // CSV is missing, and the file handle is never closed; consider Using/close.
  val lines = Source.fromFile("blog_tags.csv").getLines.toList
  // `val` instead of the original `var`: the vocabulary is never reassigned.
  // Lambda parameter renamed from `lines` to `line` to stop shadowing the val above.
  val words = lines.map(line => line.split(",")(1).replace("\"", "")).take(20).map(_.trim.toLowerCase)

  /** Generator producing a search with a random user id in [0, 100], a query
    * drawn uniformly from `words`, and a timestamp within the last 7 days.
    */
  def searchStream(): Gen[SearchStream] =
    for {
      userID    <- Gen.choose(0, 100)
      query     <- Gen.oneOf(words)
      timestamp <- Gen.choose(new DateTime().minusDays(7).getMillis, new DateTime().getMillis)
    } yield SearchStream(userID, timestamp, query)
}
/** Stream transformations over [[SearchStream]] DStreams. */
object SearchOps {

  /** Projects each search event to its query string. */
  def getQuery(search: DStream[SearchStream]): DStream[String] = {
    search.map(_.query)
  }

  /** Counts occurrences of each query over a sliding window spanning
    * `windowSize` batches, sliding forward one batch at a time.
    *
    * Uses the invertible form of `reduceByKeyAndWindow` (`_ + _` with inverse
    * `_ - _`) so window counts are updated incrementally; this form requires
    * checkpointing to be enabled on the streaming context.
    */
  def countSearch(search: DStream[SearchStream], batchInterval: Duration, windowSize: Int): DStream[(String, Int)] = {
    val (windowDuration, windowInterval) = (batchInterval * windowSize, batchInterval)
    // NOTE(review): without a filter function, keys whose count drops to 0 stay
    // in state with value 0; consider the reduceByKeyAndWindow overload that
    // takes a filterFunc to evict them.
    search.map(stream => (stream.query, 1)).reduceByKeyAndWindow(_ + _, _ - _, windowDuration, windowInterval)
  }

  /** Emits, per micro-batch, the single most frequent query in the current
    * window — at most one element, and empty until data arrives.
    */
  def topSearch(search: DStream[SearchStream], batchInterval: Duration, windowSize: Int): DStream[String] = {
    val counts = countSearch(search, batchInterval, windowSize)
    counts.map { case (query, count) => (count, query) }.transform { rdd =>
      // Sort by count descending, keep only the top query.
      val sorted = rdd.sortByKey(ascending = false)
      rdd.sparkContext.parallelize(sorted.take(1).map(_._2))
    }
  }
}
/** Property-based specification for [[SearchOps.topSearch]], expressed with
  * sscheck's temporal-logic properties over generated DStream batches.
  */
@RunWith(classOf[JUnitRunner])
class SearchStreamingSpec
  extends Specification
  with DStreamTLProperty with ResultMatchers with ScalaCheck with Serializable {

  // Spark configuration
  override def sparkMaster: String = "local[5]"
  val batchInterval = Duration(1000)
  override def batchDuration = batchInterval
  override def defaultParallelism = 3
  // Required because countSearch uses the invertible reduceByKeyAndWindow.
  override def enableCheckpointing = true

  def is =
    sequential ^ s2"""
    - where only one top search ${onlyOneTopSearch}
    """

  /** Always, for `numBatches` batches, the derived top-search batch contains
    * at most one element.
    */
  def onlyOneTopSearch = {
    type U = (RDD[SearchStream], RDD[String])
    val topQueryBatch = (_: U)._2
    val numBatches = 5
    // 10 search events per batch, repeated for every one of numBatches batches.
    val searches = BatchGen.ofN(10, SearchGen.searchStream())
    val gen = BatchGen.always(searches, numBatches)
    val formula: Formula[U] = always {
      at(topQueryBatch) { query =>
        query.count <= 1
      }
    } during numBatches
    forAllDStream(gen)(SearchOps.topSearch(_, batchInterval, 2))(formula)
  }.set(minTestsOk = 10).verbose
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment