Clone the repository, import it as a Gradle project named CassandraSparkMapReduce, and run `make`.

group 'in.suyash.tests'
// Gradle build for the Cassandra + Spark word-count job.
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Scala 2.10 builds of Spark core and the DataStax Cassandra connector (plus its Java API).
dependencies {
    compile 'org.apache.spark:spark-core_2.10:1.4.0'
    compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1'
    compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.4.0-M1'
    testCompile 'junit:junit:4.11'
}

// The Java sources are picked up from the project root rather than src/main/java.
sourceSets.main.java.srcDir './'

mainClassName = 'Main'

// Build a self-contained ("fat") jar by unpacking every compile dependency into it.
// Recipe from http://stackoverflow.com/a/14441628/3673043
jar {
    doFirst {
        from {
            configurations.compile.collect { dep ->
                dep.isDirectory() ? dep : zipTree(dep)
            }
        }
    }
    // Signature files copied from dependency jars are left out of the merged jar
    // (presumably because they would no longer match its contents).
    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
}
| import java.util.Arrays; | |
| import java.util.Map; | |
| import static com.datastax.spark.connector.japi.CassandraJavaUtil.*; | |
| import com.datastax.spark.connector.japi.CassandraRow; | |
| import com.datastax.spark.connector.japi.SparkContextJavaFunctions; | |
| import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD; | |
| import org.apache.spark.SparkConf; | |
| import org.apache.spark.SparkContext; | |
| import org.apache.spark.api.java.JavaRDD; | |
| public class Main { | |
| private static final String HOST = "spark://sparkmaster:7077"; | |
| // private static final String HOST = "local[4]"; | |
| private static final String APP_NAME = "Cassandra Spark WordCount"; | |
| private static final String CASSANDRA_KEYSPACE = "wordcount"; | |
| private static final String CASSANDRA_COLUMN_FAMILY = "input"; | |
| public static void main (String... args) { | |
| SparkConf conf = new SparkConf(true); | |
| SparkContext sc = new SparkContext(HOST, APP_NAME, conf); | |
| SparkContextJavaFunctions context = javaFunctions(sc); | |
| CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); | |
| JavaRDD<String> words = rdd.flatMap(row -> Arrays.asList(row.getString("line").split(" "))); | |
| // JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1)); | |
| // Map<String, Integer> counts = pairs.reduceByKey((c1, c2) -> c1 + c2).collectAsMap(); | |
| Map<String, Long> counts = words.countByValue(); | |
| System.out.println(counts); | |
| } | |
| } |
# Build the fat jar with Gradle and submit it to Spark.
#
#   make          build, then run
#   make build    clean and rebuild the jar
#   make run      submit the already-built jar with spark-submit
#   make clean    remove the Gradle build directory

# Cassandra contact point handed to the connector; override with
# `make run CASSANDRA_HOST=<address>`.
CASSANDRA_HOST ?= 107.108.214.154

# $(MAKE) instead of a literal `make` so command-line flags and the jobserver
# propagate to the sub-makes; two separate invocations keep build strictly
# before run.
all:
	$(MAKE) build
	$(MAKE) run

# NOTE(review): spark.cassandra.input.split.size_in_mb is set to 67108864,
# which is exactly 64 * 1024 * 1024 and so looks like a byte count rather than
# megabytes. Left unchanged because it may be deliberate for this connector
# version -- confirm against the connector's configuration reference.
run:
	spark-submit \
		--class Main \
		--conf "spark.driver.memory=512m" \
		--conf "spark.executor.memory=2409m" \
		--conf "spark.network.timeout=600s" \
		--conf "spark.cassandra.connection.host=$(CASSANDRA_HOST)" \
		--conf "spark.cassandra.input.split.size_in_mb=67108864" \
		./build/libs/CassandraSparkMapReduce-1.0-SNAPSHOT.jar

build: clean
	gradle jar

clean:
	rm -rf build

# None of these targets produce a file of the same name. `build` in particular
# collides with the build/ directory the jar is written to, so every target is
# declared phony.
.PHONY: all run build clean
// Root project name. The jar path submitted by the Makefile
// (build/libs/CassandraSparkMapReduce-1.0-SNAPSHOT.jar) starts with this name,
// so the two presumably have to be kept in sync.
rootProject.name = 'CassandraSparkMapReduce'