Created
March 20, 2016 18:49
-
-
Save suyash/8bb5d5ca947d094550a0 to your computer and use it in GitHub Desktop.
Spark SQL using static schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Project coordinates.
group = 'in.suyash.tests'
version = '1.0-SNAPSHOT'

// 'java' compiles the sources; 'application' supplies the mainClassName setting.
apply plugin: 'java'
apply plugin: 'application'

// The Java sources use lambdas, so they need Java 8 language level.
sourceCompatibility = 1.8

// Resolve every dependency from Maven Central.
repositories {
    mavenCentral()
}
dependencies {
    // Spark core and Spark SQL, built for Scala 2.10.
    compile 'org.apache.spark:spark-core_2.10:1.4.0'
    compile 'org.apache.spark:spark-sql_2.10:1.4.0'

    // DataStax Cassandra connector and its Java API wrapper.
    compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1'
    compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.4.0-M1'

    testCompile 'junit:junit:4.11'
}
// Add the project root itself as a Java source directory for the main source set.
sourceSets.main.java.srcDir './'

// Entry-point class for the 'application' plugin.
mainClassName = 'Main'
// Bundle every compile-time dependency into the application jar.
// Approach taken from http://stackoverflow.com/a/14441628/3673043
jar {
    // Registered inside doFirst, so the dependency set is only resolved when the task runs.
    doFirst {
        from {
            configurations.compile.collect { entry ->
                // Directories are copied as-is; archives are unpacked into the jar.
                entry.directory ? entry : zipTree(entry)
            }
        }
    }

    // Leave out the signature-related files found under META-INF
    // (presumably those copied in from signed dependency jars).
    exclude 'META-INF/*.RSA'
    exclude 'META-INF/*.SF'
    exclude 'META-INF/*.DSA'

    // https://docs.gradle.org/2.4/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64
    zip64 = true
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Serializable; | |
import java.util.Arrays; | |
import org.apache.spark.SparkConf; | |
import org.apache.spark.SparkContext; | |
import org.apache.spark.api.java.JavaRDD; | |
import org.apache.spark.sql.DataFrame; | |
import org.apache.spark.sql.SQLContext; | |
import com.datastax.spark.connector.japi.CassandraRow; | |
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; | |
public class Main { | |
public static class TableTuple implements Serializable { | |
private int id; | |
private String line; | |
TableTuple (int i, String l) { | |
id = i; | |
line = l; | |
} | |
public int getId () { | |
return id; | |
} | |
public void setId (int id) { | |
this.id = id; | |
} | |
public String getLine () { | |
return line; | |
} | |
public void setLine (String line) { | |
this.line = line; | |
} | |
@Override | |
public String toString() { | |
return id + ": " + line; | |
} | |
} | |
private static final String HOST = "spark://sparkmaster:7077"; | |
// private static final String HOST = "local[4]"; | |
private static final String APP_NAME = "Cassandra Spark WordCount"; | |
private static final String CASSANDRA_KEYSPACE = "wordcount"; | |
private static final String CASSANDRA_COLUMN_FAMILY = "input"; | |
public static void main (String... args) { | |
SparkConf conf = new SparkConf(true); | |
SparkContext sc = new SparkContext(HOST, APP_NAME, conf); | |
SQLContext sqlContext = new SQLContext(sc); | |
JavaRDD<CassandraRow> rowrdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY); | |
JavaRDD<TableTuple> rdd = rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1))); | |
DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class); | |
dataFrame.registerTempTable("lines"); | |
DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1"); | |
System.out.println(Arrays.asList(resultsFrame.collect())); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Default target: build the jar, then submit it to Spark.
# The sub-makes go through $(MAKE) rather than a literal `make`, so the same
# make program is used and command-line flags and variable overrides are
# passed down to them.
all:
	$(MAKE) build
	$(MAKE) run
# Jar that is submitted to Spark.
APP_JAR := ./build/libs/CassandraSparkSQL-1.0-SNAPSHOT.jar

# Settings handed to spark-submit as --conf key=value pairs.
# NOTE(review): spark.cassandra.input.split.size_in_mb is set to 67108864,
# which equals 64 * 1024 * 1024 -- confirm whether the connector version in
# use reads this value as megabytes or as bytes.
SPARK_CONF := \
    --conf "spark.driver.memory=512m" \
    --conf "spark.executor.memory=2409m" \
    --conf "spark.network.timeout=600s" \
    --conf "spark.cassandra.connection.host=107.108.214.154" \
    --conf "spark.cassandra.input.split.size_in_mb=67108864"

# Submit the jar with Main as the entry class.
run:
	spark-submit --class Main $(SPARK_CONF) $(APP_JAR)
# Rebuild from scratch: `clean` runs first, then Gradle assembles the jar.
build: clean
	gradle jar

# Remove the build output directory.
clean:
	rm -rf build

# Every target here is a command, not a file. `run` and `build` are listed as
# well: `build` is also the name of the output directory, and a stray file
# called `run` would otherwise make `make run` report "up to date".
.PHONY: all run build clean
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Root project name; the jar that the Makefile submits is named
// CassandraSparkSQL-1.0-SNAPSHOT.jar, i.e. this name plus the project version.
rootProject.name = 'CassandraSparkSQL'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment