
@maasg
Created October 24, 2014 18:59
Example code for a stateful stream processor using Spark Streaming and Cassandra
/**
-- Data model
-- local keyspace
CREATE KEYSPACE example
  WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 };

-- table schema
CREATE TABLE example.words (
  word text PRIMARY KEY,
  count int
);
*/
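Independent of Spark, the per-word merge this job performs (add a batch's count to whatever total is already stored, starting from zero for unseen words) can be sketched as a pure function. This is only an illustration; `mergeCounts` is not part of the gist:

```scala
// Merge one batch's word counts into previously stored totals.
// Words absent from `stored` start at 0 (illustrative, plain Scala).
def mergeCounts(batch: Map[String, Int], stored: Map[String, Int]): Map[String, Int] =
  batch.map { case (word, n) => word -> (n + stored.getOrElse(word, 0)) }
```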
// Run `nc -lk 9876` in a separate shell to provide input to the socketTextStream.
// Paste this job into a spark-shell session started with the spark-cassandra-connector on the classpath.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}, StreamingContext._
import org.apache.spark.storage.StorageLevel
import com.datastax.spark.connector._
case class WordCount(word: String, count: Int)
// Counts already stored in Cassandra. Note: this RDD is defined once, at
// startup, so the join below sees a snapshot of the table, not the totals
// written by later batches.
val seenWords = sc.cassandraTable[WordCount]("example", "words").map(w => (w.word, w.count))
@transient val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_ONLY)
// Word counts for the current batch
val wordStream = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
// leftOuterJoin keeps words not yet in the table (an inner join would silently drop them)
val runningTotal = wordStream.transform(rdd => rdd.leftOuterJoin(seenWords))
  .map { case (word, (batchCount, stored)) => WordCount(word, batchCount + stored.getOrElse(0)) }
runningTotal.foreachRDD(rdd => rdd.saveToCassandra("example", "words"))
wordStream.print()
runningTotal.print()
ssc.start()
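Because the job joins against a snapshot of the Cassandra table taken at startup, totals only accumulate relative to what was stored when the job began. If you want state maintained inside the stream itself, Spark Streaming's `updateStateByKey` is the usual alternative; a sketch under the same spark-shell assumptions (the checkpoint path is illustrative, and checkpointing is required for stateful operations):

```scala
// State requires a checkpoint directory (illustrative path)
ssc.checkpoint("/tmp/streaming-checkpoint")

// Fold each batch's counts for a word into its running state
def updateCount(newCounts: Seq[Int], state: Option[Int]): Option[Int] =
  Some(newCounts.sum + state.getOrElse(0))

val stateful = wordStream.updateStateByKey(updateCount _)
stateful.map { case (word, count) => WordCount(word, count) }
  .foreachRDD(rdd => rdd.saveToCassandra("example", "words"))
```

With this approach the per-word totals live in Spark's state store and Cassandra becomes a pure sink, so the startup snapshot read is no longer needed.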