/*************************************Spark Basics************************************************
**************************************************************************************************/
// A small local collection to contrast filter, map, and flatMap
val someSentences = List("The quick brown fox jumped over the wall", "Another sentence", "Some more")
// filter keeps only the sentences that do not start with "some"
someSentences.filter(sentence => !sentence.toLowerCase.startsWith("some"))
// map produces one Array[String] per sentence
someSentences.map(x => x.split(" "))
// flatMap flattens those arrays into a single list of words
someSentences.flatMap(x => x.split(" "))
Output:
someSentences: List[String] = List(The quick brown fox jumped over the wall, Another sentence, Some more)
res24: List[String] = List(The quick brown fox jumped over the wall, Another sentence)
res26: List[Array[String]] = List(Array(The, quick, brown, fox, jumped, over, the, wall), Array(Another, sentence), Array(Some, more))
res27: List[String] = List(The, quick, brown, fox, jumped, over, the, wall, Another, sentence, Some, more)
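The same chain runs unchanged once the collection is distributed. A minimal sketch, assuming the shell's SparkContext sc; sentencesRDD is a name introduced here, not from the original:
// Distribute the local list as an RDD and apply the same flatMap
val sentencesRDD = sc.parallelize(someSentences)
val distributedWords = sentencesRDD.flatMap(_.split(" "))
distributedWords.collect() // brings the flattened words back to the driver as Array[String]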
// Read every file under the directory and split lines into words
val wordsRDD = sc.textFile("/Users/syedatifakhtar/Notes/wordcount/").flatMap(x => x.split(" "))
// Classic word count: pair each word with 1, then sum the 1s per key
val wordsByCountRDD = wordsRDD
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
Output:
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[492] at flatMap at <console>:45
wordsByCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[494] at reduceByKey at <console>:49
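To see which words dominate, the pair RDD can be ordered by count and sampled on the driver. A sketch; the top-10 cut-off is an arbitrary choice added here:
// Order (word, count) pairs by count, descending, and fetch only ten of them
val topWords = wordsByCountRDD
  .sortBy({ case (_, count) => count }, ascending = false)
  .take(10)
topWords.foreach(println)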
// Re-read the words; collect() materialises the whole RDD on the driver
val someWords = sc.textFile("/Users/syedatifakhtar/Notes/wordcount/").flatMap(x => x.split(" "))
val someOther = someWords.collect()
// Cache the RDD so the two independent filters below reuse it instead of re-reading the files
someWords.persist()
// Drop the articles "the" and "a"
val filteredArticles = someWords.filter(x => !x.toLowerCase.equals("the") && !x.toLowerCase.equals("a"))
// Drop the conjunctions "and" and "or"
val filteredAndOr = someWords.filter(x => !x.toLowerCase.equals("and") && !x.toLowerCase.equals("or"))
filteredArticles.take(20)
filteredAndOr.take(30)
Output:
someWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[594] at flatMap at <console>:45
someOther: Array[String] = Array(The, Project, Gutenberg, EBook, of, The, King, James, Bible, "", This, eBook, is, for, the, use, of, anyone, anywhere, at, no, cost, and, with, almost, no, restrictions, whatsoever., "", You, may, copy, it,, give, it, away, or, re-use, it, under, the, terms, of, the, Project, Gutenberg, License, included, with, this, eBook, or, online, at, www.gutenberg.org, "", "", Title:, The, King, James, Bible, "", Release, Date:, March, 2,, 2011, [EBook, #10], [This, King, James, Bible, was, orginally, posted, by, Project, Gutenberg, in, late, 1989], "", Language:, English, "", "", ***, START, OF, THIS, PROJECT, GUTENBERG, EBOOK, THE, KING, JAMES, BIBLE, ***, "", "", "", "", "", "", "", "", "", "", "", "", "", "", The, Old, Testament, of, the, King, James, Version, ...
someOtherRDD: someWords.type = MapPartitionsRDD[594] at flatMap at <console>:45
filteredArticles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[595] at filter at <console>:47
filteredAndOr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[596] at filter at <console>:47
res51: Array[String] = Array(Project, Gutenberg, EBook, of, King, James, Bible, "", This, eBook, is, for, use, of, anyone, anywhere, at, no, cost, and)
res52: Array[String] = Array(The, Project, Gutenberg, EBook, of, The, King, James, Bible, "", This, eBook, is, for, the, use, of, anyone, anywhere, at, no, cost, with, almost, no, restrictions, whatsoever., "", You, may)
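Since both filtered views have now been materialised with take, the cache created by persist() above can be released. A small cleanup sketch, not in the original:
// Release the cached partitions once no further action will reuse someWords
someWords.unpersist()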
/*************************************Twitter Streaming With Spark SQL****************************
**************************************************************************************************/
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the OAuth credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    // twitter4j reads system properties such as twitter4j.oauth.consumerKey
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = ""
val apiSecret = ""
val accessToken = ""
val accessTokenSecret = ""
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
// 60-second batches, windowed over the last 5 minutes of tweets
val ssc = new StreamingContext(sc, Seconds(60))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(300))
case class Tweet(createdAt: Long, text: String, screenName: String)
// Flatten each status into a Tweet record: epoch seconds, newline-free text, screen name
val mappedTweets = twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText().replace("\n", " ").replace("\r", " "), status.getUser().getScreenName())
)
// Persist each batch to storage. The original call (rdd.saveAsTextfile) had a typo and
// no path argument; the output directory below is a placeholder, not from the original.
mappedTweets.foreachRDD { rdd =>
  rdd.saveAsTextFile("/tmp/tweets/" + System.currentTimeMillis)
}
// Keep only tweets mentioning "happy" (newlines were already stripped in mappedTweets)
val filteredTweets = mappedTweets.filter { tweet => tweet.text.toLowerCase.contains("happy") }
filteredTweets.foreachRDD { rdd =>
  import sqlContext.implicits._ // required for toDF() unless the shell pre-imports it
  rdd.toDF().registerTempTable("filteredTweets")
}
ssc.start()
%sql
select from_unixtime(createdAt) as created, screenName, text from filteredTweets
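The same query can also be issued from the Scala side once the first window has registered the temp table. A sketch, assuming the shell's sqlContext:
// Scala equivalent of the %sql cell above
val recent = sqlContext.sql("select from_unixtime(createdAt) as created, screenName, text from filteredTweets")
recent.show()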
%sql
select minute, count(1) as cnt from
(
  select substr(from_unixtime(createdAt), 0, 16) as minute, screenName, text from filteredTweets
) sub1 group by minute
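The stream keeps producing batches until it is stopped. A minimal shutdown sketch, assuming the SparkContext should stay alive for further %sql work:
// Stop the streaming computation gracefully without killing the shared SparkContext
ssc.stop(stopSparkContext = false, stopGracefully = true)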