Created
April 2, 2015 15:59
-
-
Save noboomu/0e7ca2ec13941a845ec6 to your computer and use it in GitHub Desktop.
Service class to access a Spark CassandraSQLContext from Java. Using DSE in this case.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.cassandra.CassandraSQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* This is simply a wrapper around Typesafe's config */
import utilities.Configuration;
public class SparkService | |
{ | |
private static Logger Logger = LoggerFactory.getLogger(SparkService.class.getCanonicalName()); | |
protected final static AtomicBoolean isRunning = new AtomicBoolean(false); | |
/* ex: 192.168.137.100 */ | |
private final static String CASSANDRA_HOST = Configuration.root().getString("spark.cassandra.connection.host"); | |
/* ex: spark://sparkMaster:7077 */ | |
private final static String SPARK_MASTER = Configuration.root().getString("spark.cassandra.connection.master"); | |
private final static String KEYSPACE_NAME = Configuration.root().getString("spark.cassandra.keyspace");. | |
private static SparkConf SPARK_CONF = null; | |
private static JavaSparkContext SPARK_CONTEXT = null; | |
private static CassandraSQLContext CASSANDRA_SQL_CONTEXT = null; | |
/* Static initializer to setup spark context */ | |
public static void initialize() { | |
try | |
{ | |
SPARK_CONF = new SparkConf().setAppName("SparkOps").set("spark.cassandra.connection.host", CASSANDRA_HOST).setMaster(SPARK_MASTER); | |
SPARK_CONTEXT = new JavaSparkContext(SPARK_CONF); | |
CASSANDRA_SQL_CONTEXT = new CassandraSQLContext( JavaSparkContext.toSparkContext(SPARK_CONTEXT)) ; | |
CASSANDRA_SQL_CONTEXT.setKeyspace(KEYSPACE_NAME); | |
} catch (Exception e) | |
{ | |
Logger.error(e.getMessage(),e); | |
} | |
} | |
public static void start() { | |
if(!isRunning.get()) | |
{ | |
isRunning.set(true); | |
Logger.info(SparkService.class.getSimpleName() + " is starting..."); | |
initialize(); | |
Logger.info(SparkService.class.getSimpleName() + " started..."); | |
} | |
} | |
public static void stop() { | |
if (isRunning.get()) { | |
Logger.info(SparkService.class.getSimpleName() + " is stopping..."); | |
if (SPARK_CONTEXT != null) { | |
SPARK_CONTEXT.stop(); | |
} | |
isRunning.set(false); | |
Logger.info(SparkService.class.getSimpleName() + " is stopped."); | |
} | |
} | |
public static CassandraSQLContext getContext() { | |
if (CASSANDRA_SQL_CONTEXT == null) { | |
start(); | |
} | |
return CASSANDRA_SQL_CONTEXT; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment