package com.example.parquet.writing

import java.lang.Exception
import java.util
import java.util.{Date, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.collection.JavaConversions._
import scala.util.Try

import com.datastax.driver.core.{ColumnDefinitions, DataType}
import com.trax.platform.shared.Cassandra
import com.typesafe.scalalogging.LazyLogging

import org.apache.parquet.column.ParquetProperties.WriterVersion
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.ParquetEncodingException
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
/**
 *
 * create external table qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd (
 *   appname string,
 *   email string,
 *   groupname string,
 *   grouporgid string,
 *   grouporgname string,
 *   homeorgid string,
 *   homeorgname string,
 *   id int,
 *   isadmin int,
 *   isgrouphomeorg int,
 *   istraxuser int,
 *   legacy_id string,
 *   name string
 * )
 * STORED AS PARQUET
 * LOCATION 's3://trax-spark-dev/qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd'
 * tblproperties ("parquet.compress"="GZIP");
 */
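/**
 * Standalone export job: streams every row of a Cassandra table into a single Parquet file
 * on S3, which an external Hive table like the one sketched in the comment above can then
 * query in place.
 */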
object Main extends LazyLogging {

  private val NumberOfRecords = 100 * 1000
  private val rnd = new scala.util.Random
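  /**
   * Builds a ParquetWriter that streams records to `path` over the s3a filesystem.
   * The Parquet message schema is derived from the supplied Cassandra ColumnDefinitions:
   * each CQL type maps to an OPTIONAL Parquet primitive (text types -> binary/UTF8,
   * numeric types -> int32/int64/float/double, timestamps -> int64 TIMESTAMP_MILLIS).
   */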
  def createS3ParquetWriter(path: String, accessKey: String, secretKey: String, table: String,
                            row: ColumnDefinitions, compressionCodecName: CompressionCodecName,
                            blockSize: Int, pageSize: Int) = {
    import org.apache.parquet.schema.MessageTypeParser

    val schemaString = s"message $table {\n " + row.map { col =>
      col.getType.getName match {
        case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII => s"OPTIONAL binary ${col.getName} (UTF8)"
        case DataType.Name.DECIMAL => s"OPTIONAL DOUBLE ${col.getName}"
        case DataType.Name.BOOLEAN => s"OPTIONAL BOOLEAN ${col.getName}"
        case DataType.Name.INT => s"OPTIONAL int32 ${col.getName}"
        case DataType.Name.FLOAT => s"OPTIONAL FLOAT ${col.getName}"
        case DataType.Name.DOUBLE => s"OPTIONAL DOUBLE ${col.getName}"
        case DataType.Name.BIGINT => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.COUNTER => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.TIMESTAMP => s"OPTIONAL int64 ${col.getName} (TIMESTAMP_MILLIS)"
        case DataType.Name.BLOB => s"OPTIONAL binary ${col.getName}"
        case DataType.Name.VARINT => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.UUID | DataType.Name.TIMEUUID => s"OPTIONAL binary ${col.getName} (UTF8)"
        case DataType.Name.DATE => s"OPTIONAL int64 ${col.getName} (TIMESTAMP_MILLIS)" // values are written as epoch millis; Parquet's DATE annotation only applies to int32
        case other => throw new IllegalArgumentException("Unsupported parquet column type: " + other) // fail fast instead of emitting an unparsable schema fragment
      }
    }.mkString(";\n") + ";\n}"
    System.out.println(schemaString)
    //val schemaString = new StringBuilder("message sample { required binary city; }")
    val schema = MessageTypeParser.parseMessageType(schemaString)
    val writeSupport = new CassandraWriteSupport(schema)
    val outputPath = new Path(path)

    val conf = new Configuration
    conf.set("fs.s3a.access.key", accessKey)
    conf.set("fs.s3a.secret.key", secretKey)

    new ParquetWriter[Map[String, Any]](outputPath,
      writeSupport, compressionCodecName, blockSize, pageSize, pageSize,
      ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
      ParquetWriter.DEFAULT_WRITER_VERSION,
      conf
    )
  }
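  /**
   * Minimal WriteSupport implementation that accepts one record per Cassandra row as a
   * Map[column name -> value] and emits it against the parsed message schema.
   * Null or absent columns are simply skipped, which is why every field is OPTIONAL.
   */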
  class CassandraWriteSupport(schema: MessageType) extends WriteSupport[Map[String, Any]] {

    import PrimitiveTypeName._
    import org.apache.parquet.io.api.Binary

    var recordConsumer: RecordConsumer = null

    override def init(configuration: Configuration): WriteSupport.WriteContext =
      new WriteSupport.WriteContext(schema, new util.HashMap[String, String]())

    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
      this.recordConsumer = recordConsumer
    }
    // Encode values as UTF-8 binary; Binary.fromString avoids the platform-default charset
    // that value.toString.getBytes would otherwise use.
    def stringToBinary(value: Any) = Binary.fromString(value.toString)
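    /**
     * Walks the schema's columns in order; for each column present (and non-null) in the
     * record map, starts the field, dispatches on the Parquet primitive type to the matching
     * typed add* call, and ends the field. Failures are logged with column/type/value context
     * before being rethrown so a single bad cell is easy to locate.
     */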
    override def write(record: Map[String, Any]): Unit = {
      recordConsumer.startMessage
      var i = 0
      schema.getColumns.foreach { desc =>
        val theType = desc.getType
        val fieldName = desc.getPath()(0)
        record.get(fieldName).foreach { v =>
          try {
            if (v != null) {
              recordConsumer.startField(fieldName, i)
              theType match {
                case BOOLEAN => recordConsumer.addBoolean(v.asInstanceOf[Boolean])
                case BINARY => recordConsumer.addBinary(stringToBinary(v))
                case INT32 => recordConsumer.addInteger(v.asInstanceOf[Int])
                case DOUBLE =>
                  v match {
                    case x: java.math.BigDecimal => recordConsumer.addDouble(x.doubleValue())
                    case x: Double => recordConsumer.addDouble(x)
                  }
                case FLOAT => recordConsumer.addFloat(v.asInstanceOf[Float])
                case INT64 =>
                  v match {
                    case x: java.math.BigDecimal => recordConsumer.addLong(x.longValue()) // decimals mapped onto int64 are truncated to whole numbers
                    case x: java.math.BigInteger => recordConsumer.addLong(x.longValue()) // varint columns arrive from the driver as BigInteger
                    case x: Date => recordConsumer.addLong(x.getTime)
                    case x: Long => recordConsumer.addLong(x)
                  }
                case _ => throw new ParquetEncodingException(
                  "Unsupported column type: " + v.getClass.getName)
              }
              recordConsumer.endField(fieldName, i)
            }
          } catch {
            case e: Exception =>
              val value = Option(v).getOrElse("null")
              System.out.println(s"failed on col:$fieldName type:$theType value:${value.toString}: " + e.getMessage)
              recordConsumer.endField(fieldName, i)
              throw e
          }
        }
        i = i + 1
      }
      recordConsumer.endMessage
    }
  }
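  /**
   * Runs `select * from <table>` against Cassandra and streams the result set straight into
   * Parquet on S3. The writer (and therefore the schema) is created lazily from the column
   * definitions of the first row; each row is then converted to a Map of its non-null column
   * values using the driver's typed getters and handed to the writer.
   */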
  private def writeToS3(path: String, accessKey: String, secretKey: String, casUser: String, casPwd: String,
                        seeds: Seq[String], keyspace: String, table: String,
                        compressionCodecName: CompressionCodecName, blockSize: Int, pageSize: Int) = {
    System.out.println("starting...")
    val session = Cassandra.session(keyspace, seeds, None, Option(casUser), Option(casPwd))
    val stmt = s"select * from $table"
    System.out.println("connected...")

    var parquetWriter: ParquetWriter[Map[String, Any]] = null
    var i = 0
    val rs = session.execute(stmt)
    rs.iterator().foreach { row =>
      if (i == 0) {
        parquetWriter = createS3ParquetWriter(path, accessKey, secretKey, table, row.getColumnDefinitions,
          compressionCodecName, blockSize, pageSize)
      }
      i = i + 1

      val data: Map[String, Any] =
        row.getColumnDefinitions
          .filter(cdef => !row.isNull(cdef.getName))
          .map { cdef =>
            val colName = cdef.getName
            cdef.getType.getName match {
              case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII =>
                colName -> row.getString(colName)
              case DataType.Name.DECIMAL =>
                colName -> row.getDecimal(colName)
              case DataType.Name.BOOLEAN =>
                colName -> row.getBool(colName)
              case DataType.Name.INT =>
                colName -> row.getInt(colName)
              case DataType.Name.FLOAT =>
                colName -> row.getFloat(colName)
              case DataType.Name.DOUBLE =>
                colName -> row.getDouble(colName)
              case DataType.Name.BIGINT =>
                colName -> row.getLong(colName)
              case DataType.Name.COUNTER =>
                colName -> row.getLong(colName)
              case DataType.Name.TIMESTAMP =>
                colName -> row.getTimestamp(colName)
              case DataType.Name.BLOB =>
                colName -> row.getString(colName)
              case DataType.Name.VARINT =>
                colName -> row.getVarint(colName)
              case DataType.Name.UUID | DataType.Name.TIMEUUID =>
                colName -> Option(row.getUUID(colName)).fold(null: String)(_.toString)
              case DataType.Name.DATE =>
                colName -> Try(new Date(row.getDate(colName).getMillisSinceEpoch)).toOption.orNull
              case _ =>
                colName -> row.getString(colName)
            }
          }.toMap

      parquetWriter.write(data)
      if (i % 1000 == 0) System.out.println(s"wrote $i rows")
    }
    if (parquetWriter != null) parquetWriter.close()
  }
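  /**
   * Entry point. The credentials, seed IPs, and quickview id below are dev placeholders;
   * block size (256 MB row groups) and page size (1 MB) are the knobs that most affect
   * the output file layout and memory use while writing.
   */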
  def main(args: Array[String]) {
    val accessKey = "ABC"
    val secretKey = "*"

    //val table = "qv_efafa77b_dd46_5bf9_87f0_240b715f0797"
    val qv_id = "2deadfea-4367-5ba9-9499-a61a044d10a5"
    val table = "qv_" + qv_id.replace("-", "_")
    val s3Path = s"s3a://trax-spark-dev/test/$qv_id"

    val keyspace = "quickviews"
    val seeds = Seq("172.24.0.7", "172.24.0.8", "172.24.0.9")
    val casUser = "johnbush"
    val casPwd = "*"

    val compressionCodecName = CompressionCodecName.SNAPPY
    //val compressionCodecName = CompressionCodecName.GZIP
    val blockSize = 256 * 1024 * 1024
    val pageSize = 1 * 1024 * 1024

    val start = new Date().getTime
    try {
      if (s3Path != null) {
        writeToS3(s3Path, accessKey, secretKey, casUser, casPwd, seeds, keyspace, table, compressionCodecName, blockSize, pageSize)
      } else {
        println("S3_PATH is empty")
      }
    } finally {
      Cassandra.close()
    }
    val end = new Date().getTime
    println(s"took ${end - start} ms to load table:$qv_id to $s3Path")
  }
}
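/*
 * Rough build dependencies this code appears to assume (coordinates only, versions left
 * out on purpose; com.trax.platform.shared.Cassandra looks like an internal helper that
 * just hands back a driver Session and closes it):
 *
 *   "org.apache.parquet"         %  "parquet-hadoop"        % <version>
 *   "org.apache.hadoop"          %  "hadoop-aws"            % <version>   // provides the s3a filesystem
 *   "com.datastax.cassandra"     %  "cassandra-driver-core" % <version>
 *   "com.typesafe.scala-logging" %% "scala-logging"         % <version>
 */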