Created
September 21, 2016 17:41
-
-
Save cmk/99f8cbcace188a4b6d501fbcce4eaafa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package labAnswers.lecture10a | |
| import com.twitter.summingbird._ | |
| import com.twitter.summingbird.memory._ | |
| import collection.mutable.{ Map => MutableMap } | |
| import org.joda.time.DateTime | |
| import org.joda.time.Months | |
| import com.twitter.algebird.TopCMS | |
| import com.twitter.algebird.TopPctCMS | |
| import com.twitter.algebird.TopPctCMSMonoid | |
| import com.twitter.algebird.CMSHasherImplicits._ | |
| import Tweets._ | |
/**
 * In-memory Summingbird job that buckets tweets by month and sums a Top-% Count-Min Sketch
 * of the words in each bucket.
 *
 * `Tweet`, `tweetIterator` and `firstTime` come from `Tweets._`, which is defined outside this
 * file.
 */
object WordFrequencyByTime {
  val platform = new Memory

  /**
   * Splits a message into lowercase word tokens.
   *
   * Steps:
   *  - Lowercase with `Locale.ROOT`, so the result does not depend on the JVM's default locale.
   *  - Strip every character that is not a letter, combining mark, digit or whitespace. The
   *    character classes are Unicode-aware so non-Latin words survive.
   *  - Split on whitespace runs and drop empty tokens.
   *
   * Without the final filter, leading whitespace or a punctuation-only message yielded an
   * empty-string token that was counted in the sketch as if it were a word.
   */
  private def tokenize(text: String): TraversableOnce[String] =
    text.toLowerCase(java.util.Locale.ROOT)
      .replaceAll("[^\\p{L}\\p{M}\\p{N}\\s]", "")
      .split("\\s+")
      .filter(_.nonEmpty)

  /**
   * Monoid used both to build per-tweet word sketches and to merge them per month.
   *
   * - `eps` and `delta` are the sketch's error parameters passed to `TopPctCMS.monoid`.
   * - `heavyHittersPct = 0.01` presumably means a word is reported as a heavy hitter once it
   *   reaches 1% of the sketch's total count. Confirm against the algebird TopPctCMS docs.
   * - The seed is a fixed constant, so every run builds the sketch with the same seed.
   */
  val topPctCMSMonoid: TopPctCMSMonoid[String] = {
    val eps = 0.001
    val delta = 1E-8
    val seed = 123
    val heavyHittersPct = 0.01
    TopPctCMS.monoid[String](eps, delta, seed, heavyHittersPct)
  }

  // Task 5a: the tweet source. The meaning of tweetIterator's argument (32) is defined in
  // Tweets and is not visible here.
  val tweetProducer: Producer[Memory, Tweet] =
    Memory.toSource(tweetIterator(32))

  /** Backing store for `sumByKey`: month index -> merged word sketch for that month. */
  val store: MutableMap[Int, TopCMS[String]] = MutableMap[Int, TopCMS[String]]()

  // Task 5b: key each tweet by the number of months between `firstTime` and the tweet's date,
  // paired with a sketch of that tweet's words.
  val keyedByMonth: KeyedProducer[Memory, Int, TopCMS[String]] =
    tweetProducer.map { (tweet: Tweet) =>
      val months: Int =
        Months.monthsBetween(firstTime.toLocalDate(), tweet.time.toLocalDate()).getMonths()
      val wordTokens: Seq[String] = tokenize(tweet.message).toSeq
      val wordSketch: TopCMS[String] = topPctCMSMonoid.create(wordTokens)
      (months, wordSketch)
    }

  // Task 5c: merge the per-tweet sketches for each month into `store` using the sketch monoid.
  val summed: Summer[Memory, Int, TopCMS[String]] =
    keyedByMonth.sumByKey(store)(topPctCMSMonoid)

  // Task 5d: plan the job on the in-memory platform.
  // The example runner forces this Stream and then reads `store`, so `store` is presumably
  // populated as the stream is consumed.
  // NOTE(review): each element is (month, (Option[sketch], sketch)). This is presumably
  // (stored value before this update, this tweet's sketch) per Summingbird's sumByKey
  // contract; confirm.
  val planned: Stream[(Int, (Option[TopCMS[String]], TopCMS[String]))] =
    platform.plan(summed)
}
/**
 * Runs the WordFrequencyByTime job and reports the store before and after.
 *
 * 1. Prints the per-month store.
 * 2. Forces `lines` elements of the planned stream.
 * 3. Prints the store again with each month's total count and heavy hitters.
 */
object WordFrequencyByTimeExample extends App {
  import WordFrequencyByTime._

  /**
   * Prints one line per stored month, in ascending month order, formatted by `describe`.
   *
   * Uses a plain tuple pattern instead of the previous `cms: TopCMS[String]` type pattern.
   * That pattern was unchecked: the type argument is erased at runtime, so it only produced
   * a compiler warning.
   */
  private def printStore(describe: (Int, TopCMS[String]) => String): Unit =
    store.toIndexedSeq.sortBy(_._1).foreach { case (month, cms) =>
      println(describe(month, cms))
    }

  println("initial store")
  printStore((month, cms) => s"month: $month heavy hitters: ${cms.heavyHitters}")
  println("-------------------------")

  val lines = 16384
  println(s"Processing $lines lines")
  // `planned` is a lazy Stream; traversing it is what drives the job.
  planned.take(lines).foreach(_ => ())
  println("--------------------------")

  println(s"store after pulling $lines lines from source")
  printStore((month, cms) =>
    s"month: $month total count: ${cms.totalCount} heavy hitters: ${cms.heavyHitters}")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment