package com.datastax.spark.connector.streaming

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector.streaming.StreamingEvent.ReceiverStarted
import com.datastax.spark.connector.testkit._
import org.apache.spark.SparkEnv
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

import scala.reflect.io.Path
import scala.util.Try
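
/**
 * Exercises Spark Streaming checkpointing with the Cassandra connector: words from an
 * actor-based stream are counted and saved to the `demo.checkpoint_wordcount` counter
 * table, with the streaming context checkpointed to a local directory so that later
 * iterations over `dataSeq` are intended to exercise recovery from that checkpoint.
 */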
class CheckpointStreamSpec extends ActorSpec with CounterFixture with ImplicitSender {

  import com.datastax.spark.connector.testkit.TestEvent._

  /* Initializations - these do not work in the actor test context in a static before() */
  CassandraConnector(SparkTemplate.defaultConf).withSessionDo { session =>
    session.execute("CREATE KEYSPACE IF NOT EXISTS demo WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
    session.execute("CREATE TABLE IF NOT EXISTS demo.streaming_join (word TEXT PRIMARY KEY, count COUNTER)")
    for (d <- dataSeq; word <- d) {
      session.execute("UPDATE demo.streaming_join SET count = count + 10 WHERE word = ?", word.trim)
    }
    session.execute("CREATE TABLE IF NOT EXISTS demo.checkpoint_wordcount (word TEXT PRIMARY KEY, count COUNTER)")
    session.execute("CREATE TABLE IF NOT EXISTS demo.checkpoint_output (word TEXT PRIMARY KEY, count COUNTER)")
    session.execute("TRUNCATE demo.checkpoint_output")
    session.execute("TRUNCATE demo.checkpoint_wordcount")
  }
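
  // The test below runs once per entry in dataSeq: the first iteration waits for the
  // ReceiverStarted event and feeds the receiver directly, while later iterations
  // re-resolve the already-running receiver actor by path before sending more data.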
"actorStream" must { | |
"work with JWCTable and RPCassandra Replica with checkpointing" in { | |
val checkpointDirName = "/tmp/checkpoint-streaming-dir" | |
val checkpointDir = "file:///tmp/checkpoint-streaming-dir" | |
Try(Path(checkpointDirName).deleteRecursively()) | |
/** Passed into Checkpoint Test**/ | |
def getContextForCheckpointTest(): StreamingContext = { | |
val ssc = new StreamingContext(sc, Milliseconds(300)) | |
info("Creating Context") | |
val stream = ssc.actorStream[String](Props[TestStreamingActor], actorName, StorageLevel.MEMORY_AND_DISK) | |
.flatMap(_.split("\\s+")) | |
println(stream.toString) | |
val wc = stream | |
.map(x => (x, 1)) | |
.reduceByKey(_ + _) | |
.saveToCassandra("demo", "checkpoint_wordcount") | |
/*val jcRepart = stream | |
.map(new Tuple1(_)) | |
.repartitionByCassandraReplica("demo","streaming_join") | |
jcRepart | |
.joinWithCassandraTable("demo","streaming_join") | |
.map(_._2) | |
.saveToCassandra("demo", "checkpoint_output") | |
*/ | |
ssc.checkpoint(checkpointDir) | |
ssc | |
} | |
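
      // Factory and checkpoint directory handed to withStreamingContext; presumably the
      // helper recreates the context from the checkpoint (e.g. via StreamingContext.getOrCreate)
      // on iterations after the first; the exact recovery behaviour lives in the testkit helper.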
      val checkPointOptions = Some((getContextForCheckpointTest: () => StreamingContext, checkpointDir))

      for ((dataSet, index) <- dataSeq.zipWithIndex) {
        withStreamingContext(
          { ssc =>
            info(s"Iteration $index")
            system.eventStream.subscribe(self, classOf[StreamingEvent.ReceiverStarted])
            println(system)
            ssc.start()

            if (index == 0) {
              // First iteration: wait for the receiver to come up, then feed it the data set
              expectMsgPF(duration) { case ReceiverStarted(receiver) =>
                println(receiver)
                watch(receiver)
                system.actorOf(Props(new TestProducer(dataSet.toArray, receiver)))
              }
            } else {
              // Later iterations: the receiver already exists, so resolve it by actor path
              system.actorSelection(s"akka://sparkDriver/user/Supervisor0/$actorName") ! Identify(1)
              expectMsgPF(duration) {
                case ActorIdentity(1, Some(ref)) =>
                  watch(ref)
                  system.actorOf(Props(new TestProducer(dataSet.toArray, ref)))
              }
            }
            // Once the watched receiver terminates, read back the word counts from Cassandra
            expectMsgPF(duration) { case Terminated(ref) =>
              val rdd = ssc.cassandraTable[WordCount]("demo", "checkpoint_wordcount")
              rdd.collect.foreach(println)
              // awaitCond(rdd.collect.nonEmpty && rdd.collect.size == dataSet.size * (index + 1))
              // ssc.sparkContext.cassandraTable("demo", "checkpoint_output").collect.size should be(dataSet.size * (index + 1))
            }
          },
          checkPointOptions)
      }
    }
  }
}