Skip to content

Instantly share code, notes, and snippets.

@suyash
Created March 20, 2016 18:49
Show Gist options
  • Save suyash/8bb5d5ca947d094550a0 to your computer and use it in GitHub Desktop.
Save suyash/8bb5d5ca947d094550a0 to your computer and use it in GitHub Desktop.
Spark SQL using static schema
// Project coordinates.
group = 'in.suyash.tests'
version = '1.0-SNAPSHOT'

// 'java' compiles the sources; 'application' consumes mainClassName below.
apply plugin: 'java'
apply plugin: 'application'

// The driver uses a Java 8 lambda, so compile at source level 1.8.
sourceCompatibility = 1.8

repositories { mavenCentral() }

dependencies {
    // Spark core and Spark SQL, Scala 2.10 builds.
    compile(group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.4.0')
    compile(group: 'org.apache.spark', name: 'spark-sql_2.10', version: '1.4.0')

    // DataStax Cassandra connector and its Java API.
    compile(group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.10', version: '1.4.0-M1')
    compile(group: 'com.datastax.spark', name: 'spark-cassandra-connector-java_2.10', version: '1.4.0-M1')

    testCompile(group: 'junit', name: 'junit', version: '4.11')
}

// Add the project root as an extra Java source directory (Main.java sits next to this script).
sourceSets.main.java.srcDir './'

// Entry point used by the 'application' plugin.
mainClassName = 'Main'
// Build a single "fat" jar that also contains the compile-time dependencies.
// http://stackoverflow.com/a/14441628/3673043
jar {
// Register the dependency contents in doFirst, i.e. when the jar task is
// about to execute rather than when this script is evaluated.
doFirst {
from {
// Each entry of the compile configuration is added to the jar:
// directories as they are, archives unpacked through zipTree.
configurations.compile.collect {
it.isDirectory() ? it : zipTree(it)
}
}
}
// Leave out signature files that come with the unpacked dependency jars
// (presumably because they would not match the repackaged archive).
exclude 'META-INF/*.RSA', 'META-INF/*.SF','META-INF/*.DSA'
// Enable the zip64 extension for the archive; see the linked Gradle docs.
// https://docs.gradle.org/2.4/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64
zip64 = true
}
import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
/**
 * Spark driver that loads a Cassandra table through the Spark Cassandra
 * connector, exposes it to Spark SQL as the temporary table {@code lines}
 * using a statically defined bean schema, and prints the result of one query.
 */
public class Main {

    /**
     * Java bean describing one row of the input table. The schema handed to
     * {@code createDataFrame} is derived from this class, so its properties
     * ({@code id}, {@code line}) are the column names used in the SQL query.
     */
    public static class TableTuple implements Serializable {
        private static final long serialVersionUID = 1L;

        private int id;
        private String line;

        /**
         * @param i value of the {@code id} property
         * @param l value of the {@code line} property
         */
        TableTuple (int i, String l) {
            id = i;
            line = l;
        }

        public int getId () {
            return id;
        }

        public void setId (int id) {
            this.id = id;
        }

        public String getLine () {
            return line;
        }

        public void setLine (String line) {
            this.line = line;
        }

        /** Renders the tuple as {@code "<id>: <line>"}. */
        @Override
        public String toString() {
            return id + ": " + line;
        }
    }

    /** Spark master URL passed to the {@link SparkContext} constructor. */
    private static final String HOST = "spark://sparkmaster:7077";
    // private static final String HOST = "local[4]";

    /** Application name passed to the {@link SparkContext} constructor. */
    private static final String APP_NAME = "Cassandra Spark WordCount";

    /** Cassandra keyspace that holds the input table. */
    private static final String CASSANDRA_KEYSPACE = "wordcount";

    /** Cassandra table (column family) that is read. */
    private static final String CASSANDRA_COLUMN_FAMILY = "input";

    /**
     * Runs the job: reads the Cassandra table, maps every row to a
     * {@link TableTuple}, registers the resulting DataFrame as {@code lines}
     * and prints the rows returned by the query.
     *
     * @param args command-line arguments; not used
     */
    public static void main (String... args) {
        SparkConf conf = new SparkConf(true);
        SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
        try {
            SQLContext sqlContext = new SQLContext(sc);

            JavaRDD<CassandraRow> rowrdd =
                    javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

            // Columns are read by position: index 0 becomes id, index 1 becomes line.
            JavaRDD<TableTuple> rdd =
                    rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1)));

            DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class);
            dataFrame.registerTempTable("lines");

            DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1");
            System.out.println(Arrays.asList(resultsFrame.collect()));
        } finally {
            // Always shut the context down, even when the job fails, so the
            // application does not leave the driver side running without an
            // orderly stop.
            sc.stop();
        }
    }
}
# Build the fat jar and submit it to Spark.
# The sub-makes run one after the other, so the jar always exists before `run`.
# $(MAKE) is used instead of a bare `make` so flags given to the outer make
# are passed on to the recursive invocations.
all:
	$(MAKE) build
	$(MAKE) run

# Submit the jar produced by `build`.
# spark.cassandra.input.split.size_in_mb is expressed in megabytes; the former
# value 67108864 was 64 MiB written out in bytes, so it is now 64.
run:
	spark-submit \
		--class Main \
		--conf "spark.driver.memory=512m" \
		--conf "spark.executor.memory=2409m" \
		--conf "spark.network.timeout=600s" \
		--conf "spark.cassandra.connection.host=107.108.214.154" \
		--conf "spark.cassandra.input.split.size_in_mb=64" \
		./build/libs/CassandraSparkSQL-1.0-SNAPSHOT.jar

# Rebuild from scratch: wipe old output, then assemble the jar with Gradle.
build: clean
	gradle jar

# Remove the Gradle output directory.
clean:
	rm -rf build

# None of the targets is a file. `build` in particular shares its name with
# the build/ output directory, so it has to be declared phony.
.PHONY: all run build clean
// Name of the root project. The spark-submit command expects the jar at
// ./build/libs/CassandraSparkSQL-1.0-SNAPSHOT.jar, so this name has to stay
// in sync with that path (presumably the jar name is derived from it).
rootProject.name = 'CassandraSparkSQL'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment