Created
April 5, 2016 14:26
-
-
Save JSantosP/38883bf8efbb15b6f4b1eec972c64b1d to your computer and use it in GitHub Desktop.
New Boot for the twitter-stream project. With this snippet we can measure, in near real time (NRT), the number of tweets mentioning Android versus iOS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalera.twitter | |
import org.apache.spark.streaming.dstream.DStream | |
import twitter4j.Status | |
/**
 * Streaming job that keeps a running count of tweets mentioning Android
 * and iOS and logs the totals after every batch.
 *
 * `ssc`, `filter`, `stream`, `logger` and `listen` are not defined here;
 * presumably they are supplied by [[Analytics]] (streaming context, keyword
 * filter registration, the tweet stream, a logger and the start hook) --
 * verify against that trait.
 */
object Boot extends Analytics {

  // Set checkpoint dir. Spark requires checkpointing to be enabled for
  // stateful transformations such as the updateStateByKey used below.
  ssc.checkpoint("/tmp")

  // Add filters ...
  val Android = "android"
  val IOS = "ios"

  filter(
    Android,
    IOS
  )

  // We group by key: every tweet is emitted once per keyword it mentions,
  // so a tweet naming both platforms contributes to both counters.
  val groupedTweets = stream.flatMap { content =>
    // Match case-insensitively: the keys are lowercase, while tweets usually
    // spell them "Android" / "iOS". Locale.ROOT keeps the lowercasing
    // independent of the JVM default locale (e.g. Turkish dotless i), and
    // Option(...) guards against a null tweet text.
    val text = Option(content.getText).fold("")(_.toLowerCase(java.util.Locale.ROOT))
    List(Android, IOS).collect {
      case key if text.contains(key) => key -> content
    }
  }

  // We apply the aggregation state function: the state for each key is the
  // total number of tweets seen so far for that key.
  val aggregatedTweets: DStream[(String, Long)] =
    groupedTweets.updateStateByKey { (newTweets, previousState) =>
      // No previous state means this is the first batch for the key.
      Some(previousState.getOrElse(0L) + newTweets.size)
    }

  // And add actions to perform (like printing the aggregatedTweets) ...
  aggregatedTweets.foreachRDD { results =>
    // The RDD holds at most one entry per keyword, so collecting is cheap.
    // Collecting first makes the logging run on the driver instead of inside
    // an executor-side closure, where the output would land in executor logs.
    results.collect().foreach {
      case (platform, amount) => logger.info(s">>>>>>>>>>>>>>>> $platform : $amount")
    }
  }

  // ... and begin listening
  listen()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment