Skip to content

Instantly share code, notes, and snippets.

@cmk
Created September 21, 2016 17:41
Show Gist options
  • Select an option

  • Save cmk/99f8cbcace188a4b6d501fbcce4eaafa to your computer and use it in GitHub Desktop.

Select an option

Save cmk/99f8cbcace188a4b6d501fbcce4eaafa to your computer and use it in GitHub Desktop.
package labAnswers.lecture10a
import com.twitter.summingbird._
import com.twitter.summingbird.memory._
import collection.mutable.{ Map => MutableMap }
import org.joda.time.DateTime
import org.joda.time.Months
import com.twitter.algebird.TopCMS
import com.twitter.algebird.TopPctCMS
import com.twitter.algebird.TopPctCMSMonoid
import com.twitter.algebird.CMSHasherImplicits._
import Tweets._
/**
 * Summingbird job (in-memory platform) that sketches word frequencies in a
 * stream of tweets, bucketed by month.
 *
 * How the job works:
 *  - Each tweet's message is tokenized into lowercase ASCII-alphanumeric words.
 *  - The words are turned into a `TopCMS` sketch.
 *  - The sketch is keyed by the number of whole months between `firstTime`
 *    and the tweet's date.
 *  - Sketches are summed per key into `store`.
 */
object WordFrequencyByTime {
  /** In-memory Summingbird platform used to plan the job. */
  val platform: Memory = new Memory

  /**
   * Splits free text into lowercase tokens made of `[a-z0-9]` only.
   *
   * Any other character that is not whitespace is removed, including
   * non-ASCII letters. Lowercasing uses `Locale.ROOT` so the result does not
   * depend on the JVM default locale. For example, in a Turkish locale "I"
   * would lowercase to a dotless "ı", which the ASCII filter would then strip.
   * Empty tokens are dropped so they are never counted as a word. `split`
   * produces them for leading whitespace and for text made only of
   * punctuation.
   */
  private def tokenize(text: String): TraversableOnce[String] =
    text.toLowerCase(java.util.Locale.ROOT)
      .replaceAll("[^a-zA-Z0-9\\s]", "")
      .split("\\s+")
      .filter(_.nonEmpty)

  /**
   * Monoid for top-percentage Count-Min Sketches over words.
   *
   * Parameters, per algebird's `TopPctCMS`:
   *  - `eps` bounds the per-item count error relative to the total count.
   *  - `delta` is the probability that this bound fails.
   *  - `seed` seeds the hash functions.
   *  - `heavyHittersPct` is the fraction of the total count an item must reach
   *    to be tracked as a heavy hitter.
   */
  val topPctCMSMonoid: TopPctCMSMonoid[String] = {
    val eps = 0.001
    val delta = 1E-8
    val seed = 123
    val heavyHittersPct = 0.01
    TopPctCMS.monoid[String](eps, delta, seed, heavyHittersPct)
  }

  // Task 5a
  /**
   * Source of tweets.
   * NOTE(review): the meaning of the argument `32` is defined by
   * `Tweets.tweetIterator`; confirm there.
   */
  val tweetProducer: Producer[Memory, Tweet] =
    Memory.toSource(tweetIterator(32))

  /**
   * Backing store for the per-month sums; starts empty and is handed to
   * `sumByKey` below.
   */
  val store: MutableMap[Int, TopCMS[String]] = MutableMap[Int, TopCMS[String]]()

  // Task 5b
  /**
   * Each tweet mapped to `(monthBucket, wordSketch)`.
   *
   * `monthBucket` is the number of whole months from `firstTime` to the
   * tweet's date; it can be negative if a tweet predates `firstTime`.
   */
  val keyedByMonth: KeyedProducer[Memory, Int, TopCMS[String]] =
    tweetProducer.map { (tweet: Tweet) =>
      val monthBucket: Int =
        Months.monthsBetween(firstTime.toLocalDate(), tweet.time.toLocalDate()).getMonths()
      val wordTokens: Seq[String] = tokenize(tweet.message).toSeq
      // An empty token list yields the monoid's zero sketch, so it adds no counts.
      val wordSketch: TopCMS[String] = topPctCMSMonoid.create(wordTokens)
      (monthBucket, wordSketch)
    }

  // Task 5c
  /** Sums the per-tweet sketches per month bucket into `store`. */
  val summed: Summer[Memory, Int, TopCMS[String]] =
    keyedByMonth.sumByKey(store)(topPctCMSMonoid)

  // Task 5d
  /**
   * Planned in-memory execution of `summed`.
   *
   * Each element is `(monthBucket, (Option[TopCMS], TopCMS))`.
   * NOTE(review): per Summingbird's `sumByKey` contract, the `Option` is the
   * value stored before this update and the second sketch is the delta being
   * added. Updates to `store` presumably happen as this `Stream` is forced.
   * Confirm both points against the Summingbird version in use.
   */
  val planned: Stream[(Int, (Option[TopCMS[String]], TopCMS[String]))] =
    platform.plan(summed)
}
/**
 * Demo driver for `WordFrequencyByTime`.
 *
 * Steps:
 *  1. Prints the current contents of `store`.
 *  2. Pulls a fixed number of elements from `planned`.
 *  3. Prints `store` again, this time including each bucket's total count.
 */
object WordFrequencyByTimeExample extends App {
  import WordFrequencyByTime._

  /**
   * Prints one line per month bucket in ascending bucket order.
   *
   * @param withTotals also print each sketch's `totalCount` when true
   */
  private def printStore(withTotals: Boolean): Unit =
    store.toIndexedSeq.sortBy(_._1).foreach { case (monthBucket, cms) =>
      val totals = if (withTotals) s" total count: ${cms.totalCount}" else ""
      println(s"month: $monthBucket$totals heavy hitters: ${cms.heavyHitters}")
    }

  println("initial store")
  printStore(withTotals = false)
  println("-------------------------")

  val lines = 16384
  println(s"Processing $lines lines")
  // Force up to `lines` elements of the planned stream; the values are
  // discarded, only the side effects on `store` matter here.
  planned.take(lines).foreach(_ => ())
  println("--------------------------")

  println(s"store after pulling $lines lines from source")
  printStore(withTotals = true)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment