@RussellSpitzer
Created August 26, 2015 19:15
import com.datastax.bdp.spark.DseSparkConfHelper;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;
import java.io.Serializable;
import java.util.List;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
public class SchemaRDDtoNormalRDD
{
    public static void main(String[] args)
    {
        /**
         * Insert some junk data with the Java driver
         */
        Cluster c = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session s = c.connect();
        System.out.println("Creating keyspace and tables");
        s.execute("CREATE KEYSPACE IF NOT EXISTS ascii_ks WITH replication = {" +
                " 'class': 'SimpleStrategy'," +
                " 'replication_factor': '1'" +
                "}");
        s.execute("USE ascii_ks");
        s.execute("DROP TABLE IF EXISTS ascii_cs");
        s.execute("DROP TABLE IF EXISTS ascii_cs_copy");
        s.execute("CREATE TABLE IF NOT EXISTS ascii_cs " +
                "(pkey ascii, ckey1 ascii, data1 ascii, PRIMARY KEY (pkey, ckey1))");
        s.execute("CREATE TABLE IF NOT EXISTS ascii_cs_copy " +
                "(pkey ascii, ckey1 ascii, data1 ascii, PRIMARY KEY (pkey, ckey1))");
        System.out.println("Inserting some garbage data");
        PreparedStatement ps = s.prepare("INSERT INTO ascii_ks.ascii_cs (pkey, ckey1, data1) VALUES (?, ?, ?)");
        for (int x = 0; x < 10; x++)
        {
            for (int y = 0; y < 1000; y++)
            {
                s.execute(ps.bind(Integer.toString(x), Integer.toString(y), Integer.toString(y)));
            }
        }
        // The driver connection is no longer needed; everything below goes through Spark
        c.close();
        /**
         * Create our Spark context
         */
        SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf())
                .set("spark.cassandra.auth.conf.factory", "com.datastax.bdp.spark.DseAuthConfFactory")
                .setAppName("My application");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaHiveContext sqlContext = new JavaHiveContext(jsc);
        /**
         * Convert our SchemaRDD into a JavaRDD of JavaBean-style AsciiRow objects
         */
        JavaSchemaRDD sRDD = sqlContext.sql("SELECT * FROM ascii_ks.ascii_cs");
        JavaRDD<AsciiRow> asciiRDD = sRDD.map(new Function<Row, AsciiRow>()
        {
            @Override
            public AsciiRow call(Row r)
            {
                return new AsciiRow(r.getString(0), r.getString(1), r.getString(2));
            }
        });
        /**
         * Now that we have a JavaRDD<AsciiRow> we can use the AsciiRow mapper to save the data to C*
         */
        javaFunctions(asciiRDD)
                .writerBuilder("ascii_ks", "ascii_cs_copy", mapToRow(AsciiRow.class))
                .saveToCassandra();
        /**
         * Read back what we copied
         */
        JavaRDD<AsciiRow> copyRDD = javaFunctions(jsc)
                .cassandraTable("ascii_ks", "ascii_cs_copy", mapRowTo(AsciiRow.class));
        List<AsciiRow> rows = copyRDD.collect();
        for (AsciiRow row : rows)
        {
            System.out.printf("Pkey %s, Ckey %s, Data %s\n", row.getPkey(), row.getCkey1(), row.getData1());
        }
        jsc.stop();
        System.exit(0);
    }
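
    /**
     * AsciiRow is referenced above but its definition is not included in this
     * snippet, so the minimal JavaBean sketch below is an assumption.
     * mapToRow()/mapRowTo() need a Serializable bean with a no-arg constructor
     * and getters/setters matching the Cassandra column names (pkey, ckey1, data1).
     */
    public static class AsciiRow implements Serializable
    {
        private String pkey;
        private String ckey1;
        private String data1;

        public AsciiRow() { }

        public AsciiRow(String pkey, String ckey1, String data1)
        {
            this.pkey = pkey;
            this.ckey1 = ckey1;
            this.data1 = data1;
        }

        public String getPkey() { return pkey; }
        public void setPkey(String pkey) { this.pkey = pkey; }

        public String getCkey1() { return ckey1; }
        public void setCkey1(String ckey1) { this.ckey1 = ckey1; }

        public String getData1() { return data1; }
        public void setData1(String data1) { this.data1 = data1; }
    }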
}
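
To run this against a DSE Analytics node, package it as a jar and submit it through DSE's spark-submit wrapper; the jar name below is just a placeholder for whatever your build produces:

dse spark-submit --class SchemaRDDtoNormalRDD schema-rdd-example.jar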