Skip to content

Instantly share code, notes, and snippets.

@RussellSpitzer
Created August 14, 2015 02:58
Show Gist options
  • Save RussellSpitzer/07c94b39749e06df5a2d to your computer and use it in GitHub Desktop.
Save RussellSpitzer/07c94b39749e06df5a2d to your computer and use it in GitHub Desktop.
/**
 * Base trait for specs that exercise Spark Streaming code.
 *
 * On initialization it calls `useCassandraConfig` with the
 * `cassandra-default.yaml.template` configuration, and it offers
 * [[withStreamingContext]] as a loan-style fixture that always stops the
 * `StreamingContext` it hands to the test body.
 *
 * NOTE(review): `sc` and `useCassandraConfig` are not defined in this trait;
 * presumably they come from `SparkTemplate` and `SharedEmbeddedCassandra`
 * respectively -- confirm against those traits.
 */
trait StreamingSpec extends AbstractSpec with SharedEmbeddedCassandra with SparkTemplate with BeforeAndAfterAll {
  // `Milliseconds` is needed for the default batch interval below; it is not
  // provided by scala.concurrent.duration._ and must come from Spark Streaming.
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}
  import scala.concurrent.duration._

  /** A ten-second duration made available to implementing specs. */
  val duration: FiniteDuration = 10.seconds

  useCassandraConfig(Seq("cassandra-default.yaml.template"))

  /**
   * Runs `testCode` against a `StreamingContext` and stops that context
   * afterwards, whether or not `testCode` throws.
   *
   * @param testCode     the test body; its result is discarded
   * @param checkpointed when defined, a pair of (context factory, checkpoint
   *                     directory) passed to `StreamingContext.getOrCreate`;
   *                     when `None`, a new context is built on `sc` with a
   *                     200 ms batch interval
   */
  def withStreamingContext(
      testCode: (StreamingContext) => Any,
      checkpointed: Option[(() => StreamingContext, String)] = None): Unit = {
    val ssc = checkpointed match {
      case Some((contextGenerator, checkpointDir)) =>
        StreamingContext.getOrCreate(checkpointDir, contextGenerator)
      case None =>
        new StreamingContext(sc, Milliseconds(200))
    }
    try {
      testCode(ssc)
    } finally {
      // The underlying SparkContext is stopped only on the checkpointed path;
      // on the default path `sc` is left running (presumably because it is
      // shared across specs -- confirm).
      ssc.stop(stopSparkContext = checkpointed.isDefined, stopGracefully = true)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment