Spark Streaming demo
# concatenate three part files to construct "minitweets"
cat rawtweets/part-0000[1-3] > minitweets

# change log4j properties to WARN to reduce noise during the demo
mv conf/log4j.properties.template conf/log4j.properties
vim conf/log4j.properties   # change the root logger level to WARN

# launch the Spark shell REPL
./bin/spark-shell
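For reference, the edit boils down to one line in conf/log4j.properties (assuming the stock Spark 1.x template, which ships with the root logger at INFO):

log4j.rootCategory=WARN, console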
// load some tweets
val tweets = sc.textFile("minitweets")
tweets.take(5)

// pretty-print the data, which is in JSON format
import com.fasterxml.jackson.databind.ObjectMapper

def prettyPrint(str: String) = {
  val mapper = new ObjectMapper()
  val prettyPrinter = mapper.writerWithDefaultPrettyPrinter()
  val obj = mapper.readValue(str, classOf[java.util.Map[String, Object]])
  println(prettyPrinter.writeValueAsString(obj))
}

tweets.take(5).foreach(prettyPrint)
// sc is an existing SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// create a SchemaRDD from the JSON file
val tweetTable = sqlContext.jsonFile("minitweets")

// let's take a look at the schema
tweetTable.printSchema

// use SQL queries to explore the data
tweetTable.registerAsTable("tweetTable")

sql("SELECT text FROM tweetTable LIMIT 10").collect.foreach(println)
sql("SELECT user.name, text, lang FROM tweetTable LIMIT 10").collect.foreach(println)

// which are the top ten languages represented?
sql("SELECT lang, COUNT(*) AS cnt FROM tweetTable GROUP BY lang ORDER BY cnt DESC LIMIT 10").collect.foreach(println)
// feature engineering, based on a hashed bigram approach
val texts = sql("SELECT text FROM tweetTable").map(_.head.toString)

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

def featurize(s: String): Vector = {
  val n = 1000
  val result = new Array[Double](n)
  // slide a 2-character window over the string to produce bigrams
  val bigrams = s.sliding(2).toArray
  // hashing trick: bucket each bigram into one of n slots, weighting each
  // occurrence by 1 / (number of bigrams); a 2-char String always has a
  // non-negative hashCode, so h stays within [0, n)
  for (h <- bigrams.map(_.hashCode % n)) {
    result(h) += 1.0 / bigrams.length
  }
  // keep only the nonzero entries as a sparse vector
  Vectors.sparse(n, result.zipWithIndex.filter(_._1 != 0).map(_.swap))
}
// test this function
featurize("Hello World!")
// train a model: k = 10 clusters, up to 20 iterations
val vectors = texts.map(featurize).cache()
vectors.count()
val model = KMeans.train(vectors, 10, 20)

// monitor the stages of this unit of work in the Spark UI:
// http://localhost:4040
// take a look into those clusters
val some_tweets = texts.take(100)

for (i <- 0 until 10) {
  println(s"\nCLUSTER $i:")
  some_tweets.foreach { t =>
    if (model.predict(featurize(t)) == i) {
      println(t)
    }
  }
}

// persist the model to disk, so we can use it for streaming
sc.makeRDD(model.clusterCenters, 10).saveAsObjectFile("model")
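For context, here is a minimal sketch of how the streaming side can reload those centers (an assumption-laden sketch, not the actual ClusteringDemo source: it presumes the same featurize function defined above, the spark-streaming-twitter artifact on the classpath, and Twitter credentials already configured):

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// rebuild the model from the persisted cluster centers
val ssc = new StreamingContext(sc, Seconds(5))
val model = new KMeansModel(ssc.sparkContext.objectFile[Vector]("model").collect())
val clusterId = 1  // the cluster of interest, passed as "1" on the command line below

// print only the tweets that fall into the chosen cluster
TwitterUtils.createStream(ssc, None)
  .map(_.getText)
  .filter(t => model.predict(featurize(t)) == clusterId)
  .print()

ssc.start()
ssc.awaitTermination()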
# streaming code example
# https://github.com/aarondav/spark/tree/demo-latest
git clone https://github.com/aarondav/spark.git
mv spark ~/opt/aarondav-spark
cd ~/opt/aarondav-spark
git fetch
git checkout demo-latest
git pull origin demo-latest

# set your Twitter API keys in the code (see the sketch after this block)
# get credentials at dev.twitter.com
vim examples/src/main/scala/org/apache/spark/examples/streaming/ClusteringDemo.scala

# change log4j properties to WARN to reduce noise during the demo
mv conf/log4j.properties.template conf/log4j.properties
vim conf/log4j.properties   # change the root logger level to WARN

sbt/sbt clean assembly

# move the trained model to a local path
mv ~/opt/model .

# note: spark-submit options such as --class must come before the application JAR
./bin/spark-submit --class org.apache.spark.examples.streaming.ClusteringDemo \
  ./examples/target/scala-2.10/spark-examples-1.1.0-SNAPSHOT-hadoop1.0.4.jar model 1
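The credential edit in ClusteringDemo.scala amounts to something along these lines (a hedged sketch: twitter4j reads OAuth credentials from JVM system properties, but the exact code in the demo branch may differ; the placeholder values are yours to fill in):

// hedged sketch: twitter4j picks up OAuth credentials from system properties
System.setProperty("twitter4j.oauth.consumerKey", "YOUR_CONSUMER_KEY")
System.setProperty("twitter4j.oauth.consumerSecret", "YOUR_CONSUMER_SECRET")
System.setProperty("twitter4j.oauth.accessToken", "YOUR_ACCESS_TOKEN")
System.setProperty("twitter4j.oauth.accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET")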