@RussellSpitzer
Created August 14, 2015 02:54
package com.datastax.spark.connector.streaming

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector.streaming.StreamingEvent.ReceiverStarted
import com.datastax.spark.connector.testkit._

import org.apache.spark.SparkEnv
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

import scala.reflect.io.Path
import scala.util.Try
class CheckpointStreamSpec extends ActorSpec with CounterFixture with ImplicitSender {

  import com.datastax.spark.connector.testkit.TestEvent._

  // Schema setup runs inline: it does not work in a static before() in the actor test context
  CassandraConnector(SparkTemplate.defaultConf).withSessionDo { session =>
    session.execute("CREATE KEYSPACE IF NOT EXISTS demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    session.execute("CREATE TABLE IF NOT EXISTS demo.streaming_join (word TEXT PRIMARY KEY, count COUNTER)")
    // Pre-populate the join table so every word seen by the stream has a counterpart here
    for (d <- dataSeq; word <- d) {
      session.execute("UPDATE demo.streaming_join SET count = count + 10 WHERE word = ?", word.trim)
    }
    session.execute("CREATE TABLE IF NOT EXISTS demo.checkpoint_wordcount (word TEXT PRIMARY KEY, count COUNTER)")
    session.execute("CREATE TABLE IF NOT EXISTS demo.checkpoint_output (word TEXT PRIMARY KEY, count COUNTER)")
    session.execute("TRUNCATE demo.checkpoint_output")
    session.execute("TRUNCATE demo.checkpoint_wordcount")
  }
"actorStream" must {
"work with JWCTable and RPCassandra Replica with checkpointing" in {
val checkpointDirName = "/tmp/checkpoint-streaming-dir"
val checkpointDir = "file:///tmp/checkpoint-streaming-dir"
Try(Path(checkpointDirName).deleteRecursively())
      /** Builds the StreamingContext for the checkpoint test; passed into
        * withStreamingContext so the context can be recreated from the checkpoint. */
      def getContextForCheckpointTest(): StreamingContext = {
        val ssc = new StreamingContext(sc, Milliseconds(300))
        info("Creating Context")
        val stream = ssc.actorStream[String](Props[TestStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK)
          .flatMap(_.split("\\s+"))
        println(stream.toString)
        stream
          .map(x => (x, 1))
          .reduceByKey(_ + _)
          .saveToCassandra("demo", "checkpoint_wordcount")
        /* Join against the pre-populated replica table; disabled for now:
        val jcRepart = stream
          .map(new Tuple1(_))
          .repartitionByCassandraReplica("demo", "streaming_join")
        jcRepart
          .joinWithCassandraTable("demo", "streaming_join")
          .map(_._2)
          .saveToCassandra("demo", "checkpoint_output")
        */
        ssc.checkpoint(checkpointDir)
        ssc
      }
      val checkpointOptions = Some((getContextForCheckpointTest _, checkpointDir))
      for ((dataSet, index) <- dataSeq.zipWithIndex) {
        withStreamingContext({ ssc =>
          info(s"Iteration $index")
          system.eventStream.subscribe(self, classOf[StreamingEvent.ReceiverStarted])
          println(system)
          ssc.start()
          if (index == 0) {
            // First run: wait for the receiver to register, then feed it the data set
            expectMsgPF(duration) { case ReceiverStarted(receiver) =>
              println(receiver)
              watch(receiver)
              system.actorOf(Props(new TestProducer(dataSet.toArray, receiver)))
            }
          } else {
            // Recovered run: the receiver was restored from the checkpoint, so there is
            // no ReceiverStarted event; look the actor up by path instead
            system.actorSelection(s"akka://sparkDriver/user/Supervisor0/$actorName") ! Identify(1)
            expectMsgPF(duration) {
              case ActorIdentity(1, Some(ref)) =>
                watch(ref)
                system.actorOf(Props(new TestProducer(dataSet.toArray, ref)))
            }
          }
          // The producer terminates the receiver when it runs out of data; once that
          // happens, check what made it into Cassandra
          expectMsgPF(duration) { case Terminated(ref) =>
            val rdd = ssc.cassandraTable[WordCount]("demo", "checkpoint_wordcount")
            rdd.collect.foreach(println)
            //awaitCond(rdd.collect.nonEmpty && rdd.collect.size == dataSet.size * (index + 1))
            //ssc.sparkContext.cassandraTable("demo", "checkpoint_output").collect.size should be(dataSet.size * (index + 1))
          }
        },
        checkpointOptions)
      }
    }
  }
}
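For reference, the recreate-from-checkpoint behavior that the test-kit helper withStreamingContext presumably wraps is the standard Spark Streaming pattern: StreamingContext.getOrCreate either calls the supplied factory (first run) or rebuilds the context, including its DStream graph, from the checkpoint directory (subsequent runs). A minimal sketch using only the stock Spark Streaming API; the object name, master, and app name here are illustrative, not from the spec above:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object CheckpointRecoverySketch {
  val checkpointDir = "file:///tmp/checkpoint-streaming-dir"

  // Factory used only when no checkpoint exists yet; plays the same role as
  // getContextForCheckpointTest in the spec above.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(300))
    // Must be set inside the factory so the checkpoint location itself is checkpointed
    ssc.checkpoint(checkpointDir)
    // ... define DStreams here; they are restored from the checkpoint on recovery ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: createContext() is invoked. After a restart, the context is
    // rebuilt from checkpointDir and createContext() is never called.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

This is why the test's second iteration cannot wait for a ReceiverStarted event: on recovery the DStream graph (and its receiver) comes back from the checkpoint rather than from a fresh actorStream call, so the test locates the restored actor by path instead.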