Created
September 15, 2015 22:17
-
-
Save RussellSpitzer/40d6aa6c1f0ff29c033d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.datastax.spark.connector.rdd | |
import com.datastax.driver.core.Session | |
import com.datastax.spark.connector.{PartitionKeyColumns, AllColumns, ColumnSelector} | |
import com.datastax.spark.connector.cql.CassandraConnector | |
import org.apache.spark.rdd.RDD | |
import com.datastax.spark.connector.writer.{BoundStatementBuilder, RowWriter} | |
import com.datastax.spark.connector.rdd.reader.{PrefetchingResultSetIterator, RowReader} | |
/**
 * Left-outer-join variant of `CassandraJoinRDD`: every element of `left` is used to
 * query the Cassandra table, and elements that match no row are kept and paired with
 * `None` instead of being dropped.
 *
 * The class is expressed as a `CassandraJoinRDD[L, Option[R]]`, so the row reader it
 * uses must itself be a `RowReader[Option[R]]`.
 * This most likely will have issues with generating a RowReader for type Option[R].
 *
 * All constructor parameters are forwarded unchanged to `CassandraJoinRDD`.
 *
 * @tparam L element type of the left-hand RDD
 * @tparam R type of the (optional) right-hand value read from Cassandra
 */
class CassandraLeftJoinRDD[L, R](
    left: RDD[L],
    keyspaceName: String,
    tableName: String,
    connector: CassandraConnector,
    columnNames: ColumnSelector = AllColumns,
    joinColumns: ColumnSelector = PartitionKeyColumns,
    where: CqlWhereClause = CqlWhereClause.empty,
    limit: Option[Long] = None,
    clusteringOrder: Option[ClusteringOrder] = None,
    readConf: ReadConf = ReadConf(),
    manualRowReader: Option[RowReader[Option[R]]] = None, // <<--- This is terrible we need to fix this in the grand refactor we have planned
    manualRowWriter: Option[RowWriter[L]] = None)
  extends CassandraJoinRDD[L, Option[R]](
    left,
    keyspaceName,
    tableName,
    connector,
    columnNames,
    joinColumns,
    where,
    limit,
    clusteringOrder,
    readConf,
    manualRowReader,
    manualRowWriter
  ) {

  /**
   * Executes one bound statement per left-hand element and pairs that element with
   * every row read back, or with a single `None` when the query returns no rows.
   *
   * The result type is `Iterator[(L, Option[R])]` because this class extends
   * `CassandraJoinRDD[L, Option[R]]`; the previously declared `Iterator[(L, R)]`
   * did not agree with the values produced here.
   * NOTE(review): the overridden signature lives in `CassandraJoinRDD`, which is not
   * visible from this file - confirm it is `Iterator[(L, R)]` in the parent's own
   * type parameters.
   *
   * @param session session used to execute the per-element queries
   * @param bsb     builder that binds a left-hand element to the join statement
   * @param lastIt  left-hand elements to look up
   * @return one pair per matching row, plus one `(left, None)` pair for every
   *         left-hand element without a match
   */
  override protected def fetchIterator(
      session: Session,
      bsb: BoundStatementBuilder[L],
      lastIt: Iterator[L]): Iterator[(L, Option[R])] = {
    val columnNamesArray = selectedColumnRefs.map(_.selectedAs).toArray
    implicit val pv = protocolVersion(session)

    lastIt.flatMap { leftSide =>
      val rs = session.execute(bsb.bind(leftSide))
      val rows = new PrefetchingResultSetIterator(rs, fetchSize)

      // The reader is a RowReader[Option[R]], so its result is used directly;
      // wrapping it in Some(...) would produce an Option[Option[R]].
      // NOTE(review): a reader that returns None for an existing row is
      // indistinguishable from "no match" - confirm that is acceptable.
      val rightSides: Iterator[Option[R]] =
        if (rows.hasNext) rows.map(row => rowReader.read(row, columnNamesArray))
        else Iterator.single(None) // keep the unmatched left element

      rightSides.map(rightSide => (leftSide, rightSide))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment