Scylla Migrator
val connector =
  new CassandraConnector(CassandraConnectorConf(sparkContext.getConf))

import com.datastax.spark.connector.cql.TableDef

case class TableDef(
  keyspaceName: String,
  tableName: String,
  partitionKey: Seq[ColumnDef],
  clusteringColumns: Seq[ColumnDef],
  regularColumns: Seq[ColumnDef],
  indexes: Seq[IndexDef] = Seq.empty,
  isView: Boolean = false
)

import com.datastax.spark.connector.cql.Schema

val tableDef: TableDef = Schema.tableFromCassandra(connector, "keyspace", "table")

case class StructType(fields: Array[StructField])

case class StructField(
  name: String,
  dataType: DataType,
  nullable: Boolean = true,
  metadata: Metadata = Metadata.empty)

def toStructField(column: ColumnDef): StructField =
  StructField(column.columnName, catalystDataType(column.columnType, nullable = true))

val schema = StructType(tableDef.columns.map(DataTypeConverter.toStructField))

def select(columns: ColumnRef*): Self

val refs: Seq[ColumnRef] = tableDef.allColumns.map(_.ref)

import com.datastax.spark.connector.{TTL, WriteTime}

val projection: Seq[ColumnRef] =
  tableDef.allColumns.flatMap { columnDef =>
    val colName = columnDef.columnName
    List(
      columnDef.ref,
      TTL(colName).as(s"${colName}_ttl"),
      WriteTime(colName).as(s"${colName}_writetime")
    )
  }

val projection: Seq[ColumnRef] =
  tableDef.partitionKey.map(_.ref) ++
  tableDef.clusteringColumns.map(_.ref) ++
  tableDef.regularColumns.flatMap { columnDef =>
    val colName = columnDef.columnName
    List(
      columnDef.ref,
      TTL(colName).as(s"${colName}_ttl"),
      WriteTime(colName).as(s"${colName}_writetime")
    )
  }

val rdd: CassandraRDD[CassandraSQLRow] =
  spark.sparkContext
    .cassandraTable[CassandraSQLRow](source.keyspace, source.table)
    .select(projection: _*)

val modifiedSchema =
  StructType(
    for {
      origField <- schema.fields
      isRegular = tableDef.regularColumns.exists(_.ref.columnName == origField.name)
      field <- if (isRegular)
                 List(
                   origField,
                   StructField(s"${origField.name}_ttl", LongType, true),
                   StructField(s"${origField.name}_writetime", LongType, true))
               else List(origField)
    } yield field
  )

val dataframe =
  spark.createDataset(rdd)(RowEncoder(modifiedSchema))

INSERT INTO table (key1, key2, regular1, regular2)
VALUES ('a', 1, 'reg1', 'reg2')
USING TTL 86400;

- key1: "a"
- key2: 1
- regular1: "reg1", TTL: 10, WRITETIME: 1000
- regular2: "reg2", TTL: 10, WRITETIME: 2000
- regular3: "reg3", TTL: 20, WRITETIME: 3000
- regular4: "reg4", TTL: 20, WRITETIME: 3000

INSERT INTO table (key1, key2, regular1)
VALUES ('a', 1, 'reg1')
USING TTL 10 AND TIMESTAMP 1000;

INSERT INTO table (key1, key2, regular2)
VALUES ('a', 1, 'reg2')
USING TTL 10 AND TIMESTAMP 2000;

INSERT INTO table (key1, key2, regular3, regular4)
VALUES ('a', 1, 'reg3', 'reg4')
USING TTL 20 AND TIMESTAMP 3000;

sealed trait CassandraOption[+A] extends Product with Serializable

object CassandraOption {
  case class Value[+A](value: A) extends CassandraOption[A]
  case object Unset extends CassandraOption[Nothing]
  case object Null extends CassandraOption[Nothing]
}

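As a minimal illustration (not part of the migrator itself), this is how a single column of an exploded row might be mapped onto the three CassandraOption cases; the helper name toCassandraOption and its parameters are hypothetical:

import org.apache.spark.sql.Row

// Hypothetical helper: decide how one column of an exploded row should be written.
def toCassandraOption(row: Row, ordinal: Int, belongsToThisGroup: Boolean): CassandraOption[Any] =
  if (!belongsToThisGroup) CassandraOption.Unset        // leave the column untouched on write
  else if (row.isNullAt(ordinal)) CassandraOption.Null  // write a tombstone (delete the value)
  else CassandraOption.Value(row.get(ordinal))          // write the value as-is
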
def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]

def indexFields(fieldNames: List[String],
                tableDef: TableDef) = {
  val fieldIndices = fieldNames.zipWithIndex.toMap

  val primaryKeyIndices: Map[String, Int] =
    (for {
      fieldName <- fieldNames
      if tableDef.primaryKey.exists(_.ref.columnName == fieldName)
      index <- fieldIndices.get(fieldName)
    } yield fieldName -> index).toMap

  val regularKeyIndices: Map[String, (Int, Int, Int)] =
    (for {
      fieldName <- fieldNames
      if tableDef.regularColumns.exists(_.ref.columnName == fieldName)
      fieldIndex <- fieldIndices.get(fieldName)
      ttlIndex <- fieldIndices.get(s"${fieldName}_ttl")
      writetimeIndex <- fieldIndices.get(s"${fieldName}_writetime")
    } yield fieldName -> (fieldIndex, ttlIndex, writetimeIndex)).toMap

  (primaryKeyIndices, regularKeyIndices)
}

def explodeRow(row: Row,
               schema: StructType,
               primaryKeyIndices: Map[String, Int],
               regularKeyIndices: Map[String, (Int, Int, Int)]) =
  if (regularKeyIndices.isEmpty) List(row)
  else
    regularKeyIndices
      .map {
        case (fieldName, (ordinal, ttlOrdinal, writetimeOrdinal)) =>
          (fieldName,
           if (row.isNullAt(ordinal)) CassandraOption.Null
           else CassandraOption.Value(row.get(ordinal)),
           if (row.isNullAt(ttlOrdinal)) None
           else Some(row.getLong(ttlOrdinal)),
           row.getLong(writetimeOrdinal))
      }
      .groupBy {
        case (_, _, ttl, writetime) => (ttl, writetime)
      }
      .mapValues { fieldGroups =>
        fieldGroups
          .map {
            case (fieldName, value, _, _) => fieldName -> value
          }
          .toMap
      }
      .map {
        case ((ttl, writetime), fields) =>
          val newValues = schema.fields.map { field =>
            primaryKeyIndices
              .get(field.name)
              .flatMap { ord =>
                if (row.isNullAt(ord)) None
                else Some(row.get(ord))
              }
              .getOrElse(fields.getOrElse(field.name, CassandraOption.Unset))
          } ++ Seq(ttl.getOrElse(0L), writetime)

          Row(newValues: _*)
      }

val (primaryKeyOrdinals, regularKeyOrdinals) = indexFields(
  df.schema.fields.map(_.name).toList,
  tableDef)

val broadcastPrimaryKeyOrdinals = spark.sparkContext.broadcast(primaryKeyOrdinals)
val broadcastRegularKeyOrdinals = spark.sparkContext.broadcast(regularKeyOrdinals)
val broadcastSchema = spark.sparkContext.broadcast(origSchema)

df.flatMap {
  explodeRow(
    _,
    broadcastSchema.value,
    broadcastPrimaryKeyOrdinals.value,
    broadcastRegularKeyOrdinals.value)
}

val colSelector: ColumnSelector =
  SomeColumns(origSchema.fields.map(x => x.name: ColumnRef): _*)

val writeConf =
  WriteConf.fromSparkConf(spark.sparkContext.getConf)
    .copy(
      // "ttl" and "writetime" name the extra per-row columns that carry the TTL and timestamp values
      ttl = TTLOption.perRow("ttl"),
      timestamp = TimestampOption.perRow("writetime")
    )

df.rdd.saveToCassandra(
  keyspaceName,
  tableName,
  colSelector,
  writeConf
)

abstract class AccumulatorV2[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
}

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

import com.datastax.spark.connector.rdd.partitioner.CqlTokenRange
import org.apache.spark.util.AccumulatorV2

class TokenRangeAccumulator(acc: AtomicReference[Set[CqlTokenRange[_, _]]])
    extends AccumulatorV2[Set[CqlTokenRange[_, _]], Set[CqlTokenRange[_, _]]] {

  // isZero, copy, reset and merge omitted for brevity

  override def add(v: Set[CqlTokenRange[_, _]]): Unit =
    acc.getAndUpdate(
      new UnaryOperator[Set[CqlTokenRange[_, _]]] {
        override def apply(t: Set[CqlTokenRange[_, _]]): Set[CqlTokenRange[_, _]] =
          t ++ v
      }
    )

  override def value: Set[CqlTokenRange[_, _]] = acc.get()
}

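A sketch of how such an accumulator could be wired up on the driver before the read job runs; the variable names are assumptions, while SparkContext.register is the standard way to register a custom AccumulatorV2:

// Assumed wiring: wrap an empty set and register the accumulator so executors can
// record the token ranges they have fully processed.
val tokenRangeAcc = new TokenRangeAccumulator(new AtomicReference(Set.empty[CqlTokenRange[_, _]]))
spark.sparkContext.register(tokenRangeAcc, "processedTokenRanges")
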
def extractTokenRange(partitionId: Int): Iterable[CqlTokenRange[_, _]] =
  partitions.lift(partitionId) match {
    case Some(CassandraPartition(_, _, ranges, _)) => ranges
    case _ => List()
  }

tokenRangeAcc.foreach(_.add(tokenRanges.toSet))

def startSavepointSchedule(svc: ScheduledThreadPoolExecutor,
                           config: MigratorConfig,
                           acc: TokenRangeAccumulator): Unit = {
  val runnable = new Runnable {
    override def run(): Unit =
      try dumpAccumulatorState(config, acc, "schedule")
      catch {
        case e: Throwable =>
          log.error("Could not create the savepoint. This will be retried.", e)
      }
  }

  log.info(
    s"Starting savepoint schedule; will write a savepoint every ${config.savepoints.intervalSeconds} seconds")

  svc.scheduleAtFixedRate(runnable, 0, config.savepoints.intervalSeconds, TimeUnit.SECONDS)
}

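For context, a sketch of how the schedule might be started from the driver; migratorConfig and tokenRangeAcc are assumed to exist, and the single-threaded executor is an assumption:

import java.util.concurrent.ScheduledThreadPoolExecutor

// Assumed usage: one scheduler thread is enough, since savepoint dumps are short and infrequent.
val scheduler = new ScheduledThreadPoolExecutor(1)
startSavepointSchedule(scheduler, migratorConfig, tokenRangeAcc)
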
val tokenRanges =
  partition.tokenRanges.filter { cqlRange =>
    val (start, end) = (cqlRange.range.start.value, cqlRange.range.end.value) match {
      case (s: Long, e: Long) => (s, e)
      case _ =>
        throw new Exception(
          "Encountered TokenRanges that use tokens of a type that isn't Long. " +
          "This probably means that the server is using the RandomPartitioner, which is currently " +
          s"unsupported. Range: ${cqlRange.range}")
    }

    tokenRangeFilter(start, end)
  }

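tokenRangeFilter itself is not shown above; as a rough sketch, it could be a predicate built from the token ranges recorded in a previous savepoint. The skippedRanges value and how it is loaded are assumptions:

// Assumed shape of the filter: skip any (start, end) pair that an earlier run already migrated.
val skippedRanges: Set[(Long, Long)] = Set.empty // in practice, loaded from the savepoint file

def tokenRangeFilter(start: Long, end: Long): Boolean =
  !skippedRanges.contains((start, end))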