@johntbush
Created December 19, 2017 07:04
package com.example.parquet.writing
import java.lang.Exception
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.hadoop.fs.Path
import java.util.{Date, UUID}
import scala.collection.JavaConversions._
import com.datastax.driver.core.{ColumnDefinitions, DataType}
import com.trax.platform.shared.Cassandra
import com.typesafe.scalalogging.LazyLogging
import org.apache.parquet.column.ParquetProperties.WriterVersion
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.ParquetEncodingException
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.MessageType
import scala.util.Try
/**
*
create external table qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd (
appname string,
email string,
groupname string,
grouporgid string,
grouporgname string,
homeorgid string,
homeorgname string,
id int,
isadmin int,
isgrouphomeorg int,
istraxuser int,
legacy_id string,
name string
)
STORED AS PARQUET
LOCATION 's3://trax-spark-dev/qv_015a8671_a4b0_510f_b6d2_a8cb2f5b66fd'
tblproperties ("parquet.compress"="GZIP");
*/
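// Reads every row of a Cassandra table and streams it into a Parquet file on S3 (via the
// s3a filesystem), so the result can be queried through an external table like the DDL above.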
object Main extends LazyLogging {
  private val NumberOfRecords = 100 * 1000
  private val rnd = new scala.util.Random
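
  /**
   * Builds a Parquet MessageType by mapping each Cassandra column type to a Parquet
   * primitive (all fields OPTIONAL), prints the generated schema, and returns a
   * ParquetWriter targeting the given s3a path with the supplied credentials,
   * compression codec, block size and page size.
   */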
  def createS3ParquetWriter(path: String, accessKey: String, secretKey: String, table: String,
                            row: ColumnDefinitions, compressionCodecName: CompressionCodecName,
                            blockSize: Int, pageSize: Int): ParquetWriter[Map[String, Any]] = {
    import org.apache.parquet.schema.MessageTypeParser

    // Map each Cassandra column type to a Parquet primitive field; every field is OPTIONAL.
    val schemaString = s"message $table {\n " + row.map { col =>
      col.getType.getName match {
        case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII => s"OPTIONAL binary ${col.getName} (UTF8)"
        case DataType.Name.DECIMAL => s"OPTIONAL DOUBLE ${col.getName}"
        case DataType.Name.BOOLEAN => s"OPTIONAL BOOLEAN ${col.getName}"
        case DataType.Name.INT => s"OPTIONAL int32 ${col.getName}"
        case DataType.Name.FLOAT => s"OPTIONAL FLOAT ${col.getName}"
        case DataType.Name.DOUBLE => s"OPTIONAL DOUBLE ${col.getName}"
        case DataType.Name.BIGINT => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.COUNTER => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.TIMESTAMP => s"OPTIONAL int64 ${col.getName} (TIMESTAMP_MILLIS)"
        case DataType.Name.BLOB => s"OPTIONAL binary ${col.getName}"
        case DataType.Name.VARINT => s"OPTIONAL int64 ${col.getName}"
        case DataType.Name.UUID | DataType.Name.TIMEUUID => s"OPTIONAL binary ${col.getName} (UTF8)"
        case DataType.Name.DATE => s"OPTIONAL int64 ${col.getName} (DATE)"
        case _ => throw new IllegalArgumentException("Unsupported parquet column type: " + col.getType.getName)
      }
    }.mkString(";\n") + ";\n}"
    System.out.println(schemaString)
    //val schemaString = new StringBuilder("message sample { required binary city; }")

    val schema = MessageTypeParser.parseMessageType(schemaString)
    val writeSupport = new CassandraWriteSupport(schema)
    val outputPath = new Path(path)

    val conf = new Configuration
    conf.set("fs.s3a.access.key", accessKey)
    conf.set("fs.s3a.secret.key", secretKey)

    new ParquetWriter[Map[String, Any]](outputPath,
      writeSupport, compressionCodecName, blockSize, pageSize, pageSize,
      ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
      ParquetWriter.DEFAULT_WRITER_VERSION,
      conf
    )
  }
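
  /**
   * WriteSupport that serialises one Map[String, Any] per Parquet record: for each column
   * in the schema it looks up the value by field name and emits it with the matching
   * RecordConsumer call (addBoolean/addBinary/addInteger/addLong/addDouble/addFloat),
   * skipping fields that are absent or null.
   */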
  class CassandraWriteSupport(schema: MessageType) extends WriteSupport[Map[String, Any]] {
    import PrimitiveTypeName._
    import org.apache.parquet.io.api.Binary

    var recordConsumer: RecordConsumer = null

    override def init(configuration: Configuration): WriteSupport.WriteContext =
      new WriteSupport.WriteContext(schema, new util.HashMap[String, String]())

    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
      this.recordConsumer = recordConsumer
    }

    // Binary.fromString always encodes as UTF-8, matching the (UTF8) annotation in the schema.
    def stringToBinary(value: Any): Binary = Binary.fromString(value.toString)

    override def write(record: Map[String, Any]): Unit = {
      recordConsumer.startMessage()
      var i = 0
      schema.getColumns.foreach { desc =>
        val theType = desc.getType
        record.get(desc.getPath()(0)).foreach { v =>
          try {
            if (v != null) {
              recordConsumer.startField(desc.getPath()(0), i)
              theType match {
                case BOOLEAN => recordConsumer.addBoolean(v.asInstanceOf[Boolean])
                case BINARY => recordConsumer.addBinary(stringToBinary(v))
                case INT32 => recordConsumer.addInteger(v.asInstanceOf[Int])
                case DOUBLE =>
                  v match {
                    case x: java.math.BigDecimal => recordConsumer.addDouble(x.doubleValue())
                    case x: Double => recordConsumer.addDouble(x)
                  }
                case FLOAT => recordConsumer.addFloat(v.asInstanceOf[Float])
                case INT64 =>
                  v match {
                    case x: java.math.BigDecimal => recordConsumer.addLong(x.longValue())
                    case x: java.math.BigInteger => recordConsumer.addLong(x.longValue()) // varint columns arrive as BigInteger
                    case x: Date => recordConsumer.addLong(x.getTime)
                    case x: Long => recordConsumer.addLong(x)
                  }
                case _ => throw new ParquetEncodingException(
                  "Unsupported column type: " + v.getClass.getName)
              }
              recordConsumer.endField(desc.getPath()(0), i)
            }
          } catch {
            case e: Exception =>
              val value = Option(v).getOrElse("null")
              System.out.println(s"failed on col:${desc.getPath()(0)} type:$theType value:${value.toString}: " + e.getMessage)
              recordConsumer.endField(desc.getPath()(0), i)
              throw e
          }
        }
        i = i + 1
      }
      recordConsumer.endMessage()
    }
  }
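
  /**
   * Runs `select * from <table>` against Cassandra and streams every row into the Parquet
   * writer. The writer (and therefore the Parquet schema) is built from the column
   * definitions of the first row returned; each row is converted into a Map keyed by
   * column name, with null columns filtered out so the corresponding OPTIONAL fields are
   * simply not written.
   */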
  private def writeToS3(path: String, accessKey: String, secretKey: String, casUser: String, casPwd: String,
                        seeds: Seq[String], keyspace: String, table: String,
                        compressionCodecName: CompressionCodecName, blockSize: Int, pageSize: Int) = {
    System.out.println("starting...")
    val session = Cassandra.session(keyspace, seeds, None, Option(casUser), Option(casPwd))
    val stmt = s"select * from $table"
    System.out.println("connected...")

    var parquetWriter: ParquetWriter[Map[String, Any]] = null
    var i = 0
    val rs = session.execute(stmt)
    rs.iterator().foreach { row =>
      if (i == 0) {
        parquetWriter = createS3ParquetWriter(path, accessKey, secretKey, table, row.getColumnDefinitions,
          compressionCodecName, blockSize, pageSize)
      }
      i = i + 1
      val data: Map[String, Any] =
        row.getColumnDefinitions
          .filter(cdef => !row.isNull(cdef.getName))
          .map { cdef =>
            val colName = cdef.getName
            cdef.getType.getName match {
              case DataType.Name.VARCHAR | DataType.Name.TEXT | DataType.Name.ASCII =>
                colName -> row.getString(cdef.getName)
              case DataType.Name.DECIMAL =>
                colName -> row.getDecimal(cdef.getName)
              case DataType.Name.BOOLEAN =>
                colName -> row.getBool(cdef.getName)
              case DataType.Name.INT =>
                colName -> row.getInt(cdef.getName)
              case DataType.Name.FLOAT =>
                colName -> row.getFloat(cdef.getName)
              case DataType.Name.DOUBLE =>
                colName -> row.getDouble(cdef.getName)
              case DataType.Name.BIGINT =>
                colName -> row.getLong(cdef.getName)
              case DataType.Name.COUNTER =>
                colName -> row.getLong(cdef.getName)
              case DataType.Name.TIMESTAMP =>
                colName -> row.getTimestamp(cdef.getName)
              case DataType.Name.BLOB =>
                colName -> row.getString(cdef.getName)
              case DataType.Name.VARINT =>
                colName -> row.getVarint(cdef.getName)
              case DataType.Name.UUID | DataType.Name.TIMEUUID =>
                colName -> Option(row.getUUID(cdef.getName)).fold(null: String)(_.toString)
              case DataType.Name.DATE =>
                colName -> Try(new Date(row.getDate(cdef.getName).getMillisSinceEpoch)).toOption.orNull
              case _ =>
                colName -> row.getString(cdef.getName)
            }
          }.toMap
      parquetWriter.write(data)
      if (i % 1000 == 0) System.out.println(s"wrote $i rows")
    }
    if (parquetWriter != null) parquetWriter.close()
  }
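
  /**
   * Entry point with everything hard-coded for a one-off run: the AWS secret key and the
   * Cassandra password are masked ("*") and need to be filled in, and the quickview id,
   * keyspace, seed nodes, compression codec, block size and page size are set inline
   * rather than read from arguments.
   */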
  def main(args: Array[String]): Unit = {
    val accessKey = "ABC"
    val secretKey = "*"
    //val table = "qv_efafa77b_dd46_5bf9_87f0_240b715f0797"
    val qv_id = "2deadfea-4367-5ba9-9499-a61a044d10a5"
    val table = "qv_" + qv_id.replace("-", "_")
    val s3Path = s"s3a://trax-spark-dev/test/$qv_id"
    val keyspace = "quickviews"
    val seeds = Seq("172.24.0.7", "172.24.0.8", "172.24.0.9")
    val casUser = "johnbush"
    val casPwd = "*"

    val compressionCodecName = CompressionCodecName.SNAPPY
    //val compressionCodecName = CompressionCodecName.GZIP
    val blockSize = 256 * 1024 * 1024 // 256 MB row groups
    val pageSize = 1 * 1024 * 1024    // 1 MB pages

    val start = new Date().getTime
    try {
      if (s3Path != null) {
        writeToS3(s3Path, accessKey, secretKey, casUser, casPwd, seeds, keyspace, table, compressionCodecName, blockSize, pageSize)
      } else {
        println("S3_PATH is empty")
      }
    } finally {
      Cassandra.close()
    }
    val end = new Date().getTime
    println(s"took ${end - start} ms to load table:$qv_id to $s3Path")
  }
}
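
// A rough sketch of the sbt dependencies this file appears to rely on. The artifact
// coordinates are real; the versions are assumptions picked to match the late-2017 vintage
// of this gist, and com.trax.platform.shared.Cassandra is an internal helper not covered here.
//
//   libraryDependencies ++= Seq(
//     "org.apache.parquet"         %  "parquet-hadoop"         % "1.9.0",
//     "org.apache.hadoop"          %  "hadoop-aws"             % "2.8.1",
//     "org.apache.hadoop"          %  "hadoop-common"          % "2.8.1",
//     "com.datastax.cassandra"     %  "cassandra-driver-core"  % "3.3.0",
//     "com.typesafe.scala-logging" %% "scala-logging"          % "3.7.2"
//   )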