Parquet MR: read a file and list all of its records

The first snippet drives the read with ParquetRecordReader; the second implements the ReadSupport / RecordMaterializer / converter chain that turns every row into a printable string.
////////////////////////////////////////////////////////////////
// == Parquet read ==
////////////////////////////////////////////////////////////////

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.ParquetInputSplit
import org.apache.parquet.hadoop.ParquetRecordReader

val conf = new Configuration(false)
val path = new Path("./parquet-rs/data.rs.bk/timestamps.parquet")
val status = path.getFileSystem(conf).getFileStatus(path)
val split = new FileSplit(status.getPath, 0, status.getLen, Array.empty)
val parquetSplit = new ParquetInputSplit(split.getPath, split.getStart,
  split.getStart + split.getLength, split.getLength, split.getLocations, null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val context = new TaskAttemptContextImpl(conf, attemptId)

val reader = new ParquetRecordReader[String](new SampleReadSupport())
reader.initialize(parquetSplit, context)
while (reader.nextKeyValue) {
  println(s"Record: ${reader.getCurrentKey} - ${reader.getCurrentValue}")
}
// Release the underlying file handle once all records have been read.
reader.close()
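To compile these snippets outside a Spark shell, the classpath needs parquet-hadoop, a Hadoop client, and Spark's catalyst module (which provides DateTimeUtils). A minimal build.sbt sketch; the version numbers below are assumptions roughly matching the era of this gist, not pinned requirements:

// build.sbt -- hypothetical dependency set; adjust versions to your environment.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.parquet" % "parquet-hadoop" % "1.8.3",
  "org.apache.hadoop"  % "hadoop-client"  % "2.7.3",
  // DateTimeUtils lives in spark-catalyst; spark-sql pulls it in transitively.
  "org.apache.spark"  %% "spark-sql"      % "2.3.2"
)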
import java.nio.ByteOrder
import java.sql.{Date => SQLDate, Timestamp => SQLTimestamp}
import java.util.{Map => JMap}

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.schema._
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.io.api._
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.spark.sql.catalyst.util.DateTimeUtils
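
// RecordContainer is a minimal mutable row: primitive converters write field
// values into it by ordinal, and the materializer reads the row back once a
// record is fully assembled.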
abstract class RecordContainer {
  def setParquetBinary(ordinal: Int, field: PrimitiveType, value: Binary): Unit
  def setParquetInteger(ordinal: Int, field: PrimitiveType, value: Int): Unit
  def setString(ordinal: Int, value: String): Unit
  def setBoolean(ordinal: Int, value: Boolean): Unit
  def setDouble(ordinal: Int, value: Double): Unit
  def setInt(ordinal: Int, value: Int): Unit
  def setLong(ordinal: Int, value: Long): Unit
  def setDate(ordinal: Int, value: SQLDate): Unit
  def setTimestamp(ordinal: Int, value: SQLTimestamp): Unit
  def getByIndex(ordinal: Int): Any
  def init(numFields: Int): Unit
  def close(): Unit
}
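
// Array-backed implementation: a single reusable buffer holds the current
// record's values; each new record overwrites the previous one's slots.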
class BufferRecordContainer extends RecordContainer {
  // buffer to store values in container, column index -> value
  private var buffer: Array[Any] = null

  override def setParquetBinary(ordinal: Int, field: PrimitiveType, value: Binary): Unit = {
    field.getPrimitiveTypeName match {
      case INT96 =>
        assert(value.length() == 12,
          "Timestamps (with nanoseconds) are expected to be stored " +
          s"in 12-byte long binaries, but got a ${value.length()}-byte binary")
        val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
        val timeOfDayNanos = buf.getLong
        val julianDay = buf.getInt
        // microseconds since epoch
        val micros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
        setTimestamp(ordinal, DateTimeUtils.toJavaTimestamp(micros))
      // all other types are treated as UTF8 string
      case _ =>
        setString(ordinal, value.toStringUsingUTF8)
    }
  }

  override def setParquetInteger(ordinal: Int, field: PrimitiveType, value: Int): Unit = {
    field.getPrimitiveTypeName match {
      case INT32 =>
        field.getOriginalType match {
          case DATE =>
            setDate(ordinal, DateTimeUtils.toJavaDate(value))
          // all other values are parsed as signed int32
          case _ => setInt(ordinal, value)
        }
      case _ =>
        throw new IllegalArgumentException(
          s"Field $field with value $value at position $ordinal " +
          "cannot be parsed as Parquet Integer")
    }
  }

  override def setString(ordinal: Int, value: String): Unit = buffer(ordinal) = value
  override def setBoolean(ordinal: Int, value: Boolean): Unit = buffer(ordinal) = value
  override def setDouble(ordinal: Int, value: Double): Unit = buffer(ordinal) = value
  override def setInt(ordinal: Int, value: Int): Unit = buffer(ordinal) = value
  override def setLong(ordinal: Int, value: Long): Unit = buffer(ordinal) = value
  override def setDate(ordinal: Int, value: SQLDate): Unit = buffer(ordinal) = value
  override def setTimestamp(ordinal: Int, value: SQLTimestamp): Unit = buffer(ordinal) = value
  override def getByIndex(ordinal: Int): Any = buffer(ordinal)

  // Initialize the buffer before every read; this gives access to the current
  // record and provides cleanup for every scan.
  override def init(numFields: Int): Unit = {
    if (buffer == null || numFields != buffer.length) {
      buffer = new Array[Any](numFields)
    } else {
      for (i <- 0 until buffer.length) {
        buffer(i) = null
      }
    }
  }

  override def close(): Unit = { }

  override def toString(): String = {
    val str = if (buffer == null) "null" else buffer.mkString("[", ", ", "]")
    s"${getClass.getSimpleName}(buffer=$str)"
  }
}
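
// One primitive converter per leaf column; parquet-mr invokes the add* method
// that matches the column's physical type for every decoded value.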
class SamplePrimitiveConverter(
    val ordinal: Int,
    val field: PrimitiveType,
    val updater: RecordContainer)
  extends PrimitiveConverter {

  override def addBinary(value: Binary): Unit = updater.setParquetBinary(ordinal, field, value)
  override def addInt(value: Int): Unit = updater.setParquetInteger(ordinal, field, value)
  override def addLong(value: Long): Unit = updater.setLong(ordinal, value)
  override def addDouble(value: Double): Unit = updater.setDouble(ordinal, value)
  override def addBoolean(value: Boolean): Unit = updater.setBoolean(ordinal, value)
}
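
// The root converter for a flat schema: it builds one primitive converter per
// field and signals record boundaries to the container via start()/end().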
class SampleGroupConverter(
    private val updater: RecordContainer,
    private val schema: GroupType)
  extends GroupConverter {

  private val converters = prepareConverters(schema)

  private def prepareConverters(schema: GroupType): Array[Converter] = {
    val arr = new Array[Converter](schema.getFieldCount)
    for (i <- 0 until arr.length) {
      val tpe = schema.getType(i)
      assert(tpe.isPrimitive, s"Only primitive types are supported, found schema $schema")
      arr(i) = new SamplePrimitiveConverter(i, tpe.asPrimitiveType, updater)
    }
    arr
  }

  override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
  // Initialize the container with the known field count in advance, so its
  // buffer can be sized (or reused) for each record
  override def start(): Unit = updater.init(schema.getFieldCount)
  override def end(): Unit = updater.close()
}
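
// Materializes every assembled record as the container's string representation.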
class SampleRecordMaterializer(
    private val schema: MessageType)
  extends RecordMaterializer[String] {

  private val updater = new BufferRecordContainer()

  override def getCurrentRecord(): String = updater.toString()
  override def getRootConverter(): GroupConverter = {
    new SampleGroupConverter(updater, schema)
  }
}
class SampleReadSupport extends ReadSupport[String] {
  override def init(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType): ReadContext = {
    new ReadContext(fileSchema)
  }

  override def prepareForRead(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType,
      context: ReadContext): RecordMaterializer[String] = {
    new SampleRecordMaterializer(context.getRequestedSchema)
  }
}
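
ReadSupport.init is also the hook for column projection: returning a subset of fileSchema in the ReadContext makes the record reader decode only the requested columns. A minimal sketch, assuming the file contains an INT96 column named ts (the column name and type are assumptions about the data, not something this gist guarantees):

import org.apache.parquet.schema.MessageTypeParser

class ProjectingReadSupport extends SampleReadSupport {
  override def init(
      conf: Configuration,
      keyValueMetaData: JMap[String, String],
      fileSchema: MessageType): ReadContext = {
    // Request only the hypothetical "ts" column; all other columns are skipped
    // by the reader. The requested types must match those in the file schema.
    val projection = MessageTypeParser.parseMessageType(
      "message projection { optional int96 ts; }")
    new ReadContext(projection)
  }
}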