Skip to content

Instantly share code, notes, and snippets.

@RussellSpitzer
Created January 31, 2015 21:22
Show Gist options
  • Save RussellSpitzer/e89f3fde61e38ae63a31 to your computer and use it in GitHub Desktop.
Save RussellSpitzer/e89f3fde61e38ae63a31 to your computer and use it in GitHub Desktop.
package com.datastax.spark.connector
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.rdd.{CassandraPartitionKeyRDD, CassandraRDD, ValidRDDType, SpannedRDD}
import com.datastax.spark.connector.writer._
import com.datastax.spark.connector.rdd.reader._
import com.datastax.spark.connector._
import scala.reflect.ClassTag
/** Provides Cassandra-specific methods on `RDD` */
/** Cassandra-specific operations available on an arbitrary `RDD[T]`.
  *
  * Wraps `rdd` and exposes methods to:
  *  - write its contents to a Cassandra table;
  *  - group runs of adjacent elements;
  *  - build a `CassandraRDD` from its elements.
  *
  * @param rdd the RDD whose elements these operations act on
  * @tparam T  element type of `rdd`
  */
class RDDFunctions[T](rdd: RDD[T]) extends WritableToCassandra[T] with Serializable {

  /** The Spark context that owns the wrapped RDD (taken directly from `rdd`). */
  override val sparkContext: SparkContext = rdd.sparkContext

  /** Writes every element of the wrapped RDD into `keyspaceName.tableName`.
    *
    * A `TableWriter` is built from the connector, target table, column selection and write
    * configuration. Its `write` method is then run over the RDD's partitions via
    * `SparkContext.runJob`.
    *
    * @param keyspaceName target keyspace
    * @param tableName    target table
    * @param columns      columns to write; defaults to `AllColumns`
    * @param writeConf    write settings; defaults to values read from the Spark configuration
    * @param connector    Cassandra connection; defaults to one built from the Spark configuration
    * @param rwf          factory used to turn `T` values into rows
    * @see [[com.datastax.spark.connector.writer.WritableToCassandra]]
    */
  def saveToCassandra(keyspaceName: String,
                      tableName: String,
                      columns: ColumnSelector = AllColumns,
                      writeConf: WriteConf = WriteConf.fromSparkConf(sparkContext.getConf))
                     (implicit connector: CassandraConnector = CassandraConnector(sparkContext.getConf),
                      rwf: RowWriterFactory[T]): Unit = {
    val tableWriter = TableWriter(connector, keyspaceName, tableName, columns, writeConf)
    // `sparkContext` is the same instance as `rdd.sparkContext` (see the val above).
    sparkContext.runJob(rdd, tableWriter.write _)
  }

  /** Groups runs of consecutive elements that map to the same key under `f`.
    *
    * Unlike `groupBy`, only adjacent elements are merged, so elements sharing a key must
    * already be contiguous in the input. Grouping happens independently inside each
    * partition; elements from different partitions never end up in the same group.
    *
    * @param f key function applied to every element
    * @tparam U key type
    * @return an RDD of `(key, elements)` pairs, one pair per run of equal keys
    */
  def spanBy[U](f: T => U): RDD[(U, Iterable[T])] = new SpannedRDD[U, T](rdd, f)

  /** Builds a `CassandraRDD[R]` over `keyspaceName.tableName` from the wrapped RDD.
    *
    * NOTE(review): presumably each element of this RDD acts as a partition key whose
    * matching rows are read as `R`. Confirm against `CassandraPartitionKeyRDD`.
    *
    * @param keyspaceName source keyspace
    * @param tableName    source table
    * @param repartition  when `true` (the default), the result of `partitionByReplica()` is
    *                     returned; when `false`, the un-repartitioned RDD is returned as-is
    * @param connector    Cassandra connection; defaults to one built from the Spark configuration
    * @param newType      class tag for the result row type `R`
    * @param rrf          factory used to read rows as `R`
    * @param ev           evidence that `R` is an acceptable `CassandraRDD` element type
    * @param currentType  class tag for the element type `T` of the wrapped RDD
    * @param rwf          factory used to turn `T` values into rows
    * @tparam R result row type
    * @return the `CassandraRDD[R]`, optionally repartitioned by replica
    */
  def fetchFromCassandra[R](keyspaceName: String, tableName: String, repartition: Boolean = true)
                           (implicit connector: CassandraConnector = CassandraConnector(sparkContext.getConf),
                            newType: ClassTag[R], rrf: RowReaderFactory[R], ev: ValidRDDType[R],
                            currentType: ClassTag[T], rwf: RowWriterFactory[T]): CassandraRDD[R] = {
    val keyedRdd = new CassandraPartitionKeyRDD[T, R](rdd, keyspaceName, tableName, connector)
    // TODO: decide automatically whether repartitioning is needed, e.g. when the parent is
    // already a CassandraRDD and T matches the keys of the keyspace/table.
    if (!repartition) keyedRdd
    else keyedRdd.partitionByReplica()
  }
}
@fashionliner
Copy link

Can you help me with my device, please?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment