@sadikovi
Created November 4, 2018 09:46
Parquet MR: read a file and list all of its records
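The snippet below opens a Parquet file directly with parquet-mr's ParquetRecordReader, using a minimal ReadSupport/RecordMaterializer pair that buffers each record and prints it as a string. It relies on Spark's DateTimeUtils to decode INT96 timestamps and DATE-annotated INT32 columns, so the easiest way to run it is from spark-shell: paste the ReadSupport classes in the second half first, then run the read block.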
////////////////////////////////////////////////////////////////
// == Parquet read ==
////////////////////////////////////////////////////////////////
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.ParquetInputSplit
import org.apache.parquet.hadoop.ParquetRecordReader
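// Build a split covering the whole file and iterate over its records.
// SampleReadSupport is defined further down; in spark-shell it must be pasted before this block.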
val conf = new Configuration(false)
val path = new Path("./parquet-rs/data.rs.bk/timestamps.parquet")
val status = path.getFileSystem(conf).getFileStatus(path)
val split = new FileSplit(status.getPath, 0, status.getLen, Array.empty)
val parquetSplit = new ParquetInputSplit(split.getPath, split.getStart,
  split.getStart + split.getLength, split.getLength, split.getLocations, null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val context = new TaskAttemptContextImpl(conf, attemptId)
val reader = new ParquetRecordReader[String](new SampleReadSupport())
reader.initialize(parquetSplit, context)
while (reader.nextKeyValue) {
  println(s"Record: ${reader.getCurrentKey} - ${reader.getCurrentValue}")
}
// release file handles once all records have been read
reader.close()
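// The key is always null (ParquetRecordReader is a RecordReader[Void, T]); each value
// prints roughly as "BufferRecordContainer(buffer=[...])" with one entry per column.

////////////////////////////////////////////////////////////////
// == Sample ReadSupport implementation ==
////////////////////////////////////////////////////////////////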
import java.nio.ByteOrder
import java.sql.{Date => SQLDate, Timestamp => SQLTimestamp}
import java.util.{Map => JMap}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.schema._
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.io.api._
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.spark.sql.catalyst.util.DateTimeUtils
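// Minimal mutable record abstraction: converters write decoded values into it by column ordinal.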
abstract class RecordContainer {
  def setParquetBinary(ordinal: Int, field: PrimitiveType, value: Binary): Unit
  def setParquetInteger(ordinal: Int, field: PrimitiveType, value: Int): Unit
  def setString(ordinal: Int, value: String): Unit
  def setBoolean(ordinal: Int, value: Boolean): Unit
  def setDouble(ordinal: Int, value: Double): Unit
  def setInt(ordinal: Int, value: Int): Unit
  def setLong(ordinal: Int, value: Long): Unit
  def setDate(ordinal: Int, value: SQLDate): Unit
  def setTimestamp(ordinal: Int, value: SQLTimestamp): Unit
  def getByIndex(ordinal: Int): Any
  def init(numFields: Int): Unit
  def close(): Unit
}
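// Array-backed container: INT96 binaries become java.sql.Timestamp, DATE-annotated INT32 values
// become java.sql.Date, other binaries are read as UTF8 strings, remaining primitives are stored as-is.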
class BufferRecordContainer extends RecordContainer {
  // buffer to store values in container, column index - value
  private var buffer: Array[Any] = null

  override def setParquetBinary(ordinal: Int, field: PrimitiveType, value: Binary): Unit = {
    field.getPrimitiveTypeName match {
      case INT96 =>
        assert(value.length() == 12,
          "Timestamps (with nanoseconds) are expected to be stored " +
          s"in 12-byte long binaries, but got a ${value.length()}-byte binary")
        val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
        val timeOfDayNanos = buf.getLong
        val julianDay = buf.getInt
        // microseconds since epoch
        val micros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
        setTimestamp(ordinal, DateTimeUtils.toJavaTimestamp(micros))
      // all other types are treated as UTF8 string
      case _ =>
        setString(ordinal, value.toStringUsingUTF8)
    }
  }

  override def setParquetInteger(ordinal: Int, field: PrimitiveType, value: Int): Unit = {
    field.getPrimitiveTypeName match {
      case INT32 =>
        field.getOriginalType match {
          case DATE =>
            setDate(ordinal, DateTimeUtils.toJavaDate(value))
          // all other values are parsed as signed int32
          case _ => setInt(ordinal, value)
        }
      case _ =>
        throw new IllegalArgumentException(
          s"Field $field with value $value at position $ordinal " +
          "cannot be parsed as Parquet Integer")
    }
  }

  override def setString(ordinal: Int, value: String): Unit = buffer(ordinal) = value
  override def setBoolean(ordinal: Int, value: Boolean): Unit = buffer(ordinal) = value
  override def setDouble(ordinal: Int, value: Double): Unit = buffer(ordinal) = value
  override def setInt(ordinal: Int, value: Int): Unit = buffer(ordinal) = value
  override def setLong(ordinal: Int, value: Long): Unit = buffer(ordinal) = value
  override def setDate(ordinal: Int, value: SQLDate): Unit = buffer(ordinal) = value
  override def setTimestamp(ordinal: Int, value: SQLTimestamp): Unit = buffer(ordinal) = value
  override def getByIndex(ordinal: Int): Any = buffer(ordinal)
  // Reset the buffer before every record: this gives access to the current record's values
  // and provides cleanup for every scan.
  override def init(numFields: Int): Unit = {
    if (buffer == null || numFields != buffer.length) {
      buffer = new Array[Any](numFields)
    } else {
      for (i <- 0 until buffer.length) {
        buffer(i) = null
      }
    }
  }

  override def close(): Unit = { }

  override def toString(): String = {
    val str = if (buffer == null) "null" else buffer.mkString("[", ", ", "]")
    s"${getClass.getSimpleName}(buffer=$str)"
  }
}
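// Writes each decoded primitive into the container slot for this converter's column.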
class SamplePrimitiveConverter(
    val ordinal: Int,
    val field: PrimitiveType,
    val updater: RecordContainer)
  extends PrimitiveConverter {

  override def addBinary(value: Binary): Unit = updater.setParquetBinary(ordinal, field, value)
  override def addInt(value: Int): Unit = updater.setParquetInteger(ordinal, field, value)
  override def addLong(value: Long): Unit = updater.setLong(ordinal, value)
  override def addDouble(value: Double): Unit = updater.setDouble(ordinal, value)
  override def addBoolean(value: Boolean): Unit = updater.setBoolean(ordinal, value)
}
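// Root converter: one primitive converter per column; resets the container at the start of each record.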
class SampleGroupConverter(
    private val updater: RecordContainer,
    private val schema: GroupType)
  extends GroupConverter {

  private val converters = prepareConverters(schema)

  private def prepareConverters(schema: GroupType): Array[Converter] = {
    val arr = new Array[Converter](schema.getFieldCount)
    for (i <- 0 until arr.length) {
      val tpe = schema.getType(i)
      assert(tpe.isPrimitive, s"Only primitive types are supported, found schema $schema")
      arr(i) = new SamplePrimitiveConverter(i, tpe.asPrimitiveType, updater)
    }
    arr
  }

  override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)

  // Initialize container with known fields in advance, so we can resize it
  override def start(): Unit = updater.init(schema.getFieldCount)
  override def end(): Unit = updater.close()
}
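// Materializes every record as the string representation of the shared container.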
class SampleRecordMaterializer(
    private val schema: MessageType)
  extends RecordMaterializer[String] {

  private val updater = new BufferRecordContainer()

  override def getCurrentRecord(): String = updater.toString()

  override def getRootConverter(): GroupConverter = {
    new SampleGroupConverter(updater, schema)
  }
}
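// ReadSupport that requests the full file schema and wires up the sample materializer.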
class SampleReadSupport extends ReadSupport[String] {
  override def init(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType): ReadContext = {
    new ReadContext(fileSchema)
  }

  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType,
      context: ReadContext): RecordMaterializer[String] = {
    new SampleRecordMaterializer(context.getRequestedSchema)
  }
}
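To produce a small file the read block above can point at, the data can be written with Spark itself. This is a minimal sketch rather than the original test data: the output directory, column name, and values are assumptions, and it assumes spark-shell provides a SparkSession named spark (Spark 2.x writes timestamps as INT96 by default, which is the case the INT96 branch above handles).

import java.sql.Timestamp
import spark.implicits._

// Hypothetical test data: one timestamp column, a couple of rows.
val df = Seq(
  Timestamp.valueOf("2018-11-04 09:46:00"),
  Timestamp.valueOf("2018-11-04 10:00:00")
).toDF("ts")

// coalesce(1) keeps the output to a single part file; point `path` in the read block
// at the part-*.parquet file inside the output directory.
df.coalesce(1).write.mode("overwrite").parquet("/tmp/timestamps")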