Skip to content

Instantly share code, notes, and snippets.

@mstump
Last active August 29, 2015 14:24
Show Gist options
  • Select an option

  • Save mstump/72dda0a23dcd1dc1eedc to your computer and use it in GitHub Desktop.

Select an option

Save mstump/72dda0a23dcd1dc1eedc to your computer and use it in GitHub Desktop.
/** Builds a [[SchemaRDD]] over the rows of `keyspace.table` matched by `tenantId` and any of the
  * given `segments`.
  *
  * Each segment is paired with `tenantId` to form a `(tenantId, segment)` key. The keys are
  * repartitioned by Cassandra replica and then joined against the table. The result's schema is
  * derived from the table definition of the selected columns, with every field marked nullable.
  *
  * @param sc         Spark context used to parallelize the segment keys
  * @param sqlContext SQL context the derived schema is applied with
  * @param keyspace   Cassandra keyspace containing `table`
  * @param table      Cassandra table to join against
  * @param tenantId   tenant id used as the first component of every join key
  * @param segments   segment ids; one join key is built per element
  * @param columns    columns to select; when empty, the join's default column selection is kept
  * @return a SchemaRDD of the joined Cassandra rows (join keys are dropped)
  */
def getMultiSegmentRdd(
    sc: SparkContext,
    sqlContext: CassandraSQLContext,
    keyspace: String,
    table: String,
    tenantId: Int,
    segments: Array[String],
    columns: Array[String] = Array()): SchemaRDD = {
  // Join keys are (tenantId, segment) tuples.
  // NOTE(review): presumably these match the table's partition-key columns in order; confirm
  // against the table schema.
  val keys = sc.parallelize(segments).map((tenantId, _))

  // NOTE(review): `sparkPartitions` is not defined in this snippet and must come from the
  // enclosing scope. The connector interprets this argument as partitions *per host*.
  val replicaLocalKeys = keys.repartitionByCassandraReplica(keyspace, table, sparkPartitions)

  val fullJoin = replicaLocalKeys.joinWithCassandraTable[CassandraSQLRow](keyspace, table)

  // Narrow the projection only when the caller asked for specific columns. With the default
  // empty `columns`, the join keeps its default selection rather than selecting nothing.
  val joinRdd =
    if (columns.nonEmpty) fullJoin.select(columns.map(c => new ColumnName(c)): _*)
    else fullJoin

  val columnDefsByName = joinRdd.tableDef.columnByName
  val schema = StructType(
    joinRdd.selectedColumnNames.map { name =>
      StructField(
        name,
        ColumnDataType.catalystDataType(columnDefsByName(name).columnType, true),
        nullable = true)
    })

  // Drop the (tenantId, segment) join key and keep only the Cassandra row for the schema.
  sqlContext.applySchema(joinRdd.map(_._2), schema)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment