import com.datastax.bdp.spark.DseSparkConfHelper;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
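
/**
 * Demonstrates turning a Spark SQL JavaSchemaRDD into a JavaRDD of plain
 * JavaBeans and writing that RDD back to Cassandra with the Spark Cassandra
 * Connector. JavaSchemaRDD and JavaHiveContext come from the pre-DataFrame
 * Spark SQL API (Spark 1.2 era) bundled with DSE at the time. As an
 * assumption beyond the original gist: a job like this would normally be
 * packaged into a jar and launched with `dse spark-submit` against a DSE
 * node running in Spark mode.
 */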
public class SchemaRDDtoNormalRDD
{
    public static void main(String[] args)
    {
        /**
         * Insert some junk data with the Java driver
         */
        Cluster c = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session s = c.connect();

        System.out.println("Creating some keyspaces");
        s.execute("CREATE KEYSPACE IF NOT EXISTS ascii_ks WITH replication = {" +
                  " 'class': 'SimpleStrategy'," +
                  " 'replication_factor': '1'" +
                  "}");
        s.execute("USE ascii_ks");
        s.execute("DROP TABLE IF EXISTS ascii_cs");
        s.execute("DROP TABLE IF EXISTS ascii_cs_copy");
        s.execute("CREATE TABLE IF NOT EXISTS ascii_cs " +
                  "(pkey ascii, ckey1 ascii, data1 ascii, PRIMARY KEY (pkey, ckey1))");
        s.execute("CREATE TABLE IF NOT EXISTS ascii_cs_copy " +
                  "(pkey ascii, ckey1 ascii, data1 ascii, PRIMARY KEY (pkey, ckey1))");

        System.out.println("Inserting some garbage data");
        PreparedStatement ps = s.prepare("INSERT INTO ascii_ks.ascii_cs (pkey, ckey1, data1) VALUES (?, ?, ?)");
        for (int x = 0; x < 10; x++)
            for (int y = 0; y < 1000; y++)
            {
                s.execute(ps.bind(Integer.toString(x), Integer.toString(y), Integer.toString(y)));
            }

        /**
         * Create our Spark context
         */
        SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf())
                .set("spark.cassandra.auth.conf.factory", "com.datastax.bdp.spark.DseAuthConfFactory")
                .setAppName("My application");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaHiveContext sqlContext = new JavaHiveContext(jsc);

        /**
         * Convert our SchemaRDD to JavaBean-like classes
         */
        JavaSchemaRDD sRDD = sqlContext.sql("SELECT * FROM ascii_ks.ascii_cs");
        JavaRDD<AsciiRow> asciiRDD = sRDD.map(new Function<Row, AsciiRow>()
        {
            @Override
            public AsciiRow call(Row r)
            {
                return new AsciiRow(r.getString(0), r.getString(1), r.getString(2));
            }
        });

        /**
         * Now that we have an RDD[AsciiRow] we can use the AsciiRow mapper to save the data to C*
         */
        javaFunctions(asciiRDD)
                .writerBuilder("ascii_ks", "ascii_cs_copy", mapToRow(AsciiRow.class))
                .saveToCassandra();

        /**
         * Read back what we copied
         */
        JavaRDD<AsciiRow> copyRDD = javaFunctions(jsc)
                .cassandraTable("ascii_ks", "ascii_cs_copy", mapRowTo(AsciiRow.class));
        List<AsciiRow> rows = copyRDD.collect();
        for (AsciiRow row : rows)
        {
            System.out.printf("Pkey %s, Ckey %s, Data %s\n", row.getPkey(), row.getCkey1(), row.getData1());
        }

        // Close the driver session and cluster, then stop Spark, before exiting
        s.close();
        c.close();
        jsc.stop();
        System.exit(0);
    }
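
    /**
     * Not part of the original gist: a minimal sketch of the AsciiRow bean the
     * code above assumes, added so the file compiles on its own. mapToRow and
     * mapRowTo map columns by JavaBean convention, so the class needs a no-arg
     * constructor plus getters and setters matching the pkey, ckey1, and data1
     * columns, and it must be Serializable so Spark can ship it to executors.
     */
    public static class AsciiRow implements java.io.Serializable
    {
        private String pkey;
        private String ckey1;
        private String data1;

        public AsciiRow() { }

        public AsciiRow(String pkey, String ckey1, String data1)
        {
            this.pkey = pkey;
            this.ckey1 = ckey1;
            this.data1 = data1;
        }

        public String getPkey()            { return pkey; }
        public void setPkey(String pkey)   { this.pkey = pkey; }

        public String getCkey1()           { return ckey1; }
        public void setCkey1(String ckey1) { this.ckey1 = ckey1; }

        public String getData1()           { return data1; }
        public void setData1(String data1) { this.data1 = data1; }
    }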
}