Created March 4, 2019 at 15:46.
-
-
Save hotienvu/eaf2a360ee4a8d0558ffb2ca8729d94b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.generic.GenericRecord | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.Path | |
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport, AvroSchemaConverter} | |
import org.apache.parquet.filter2.compat.FilterCompat | |
import org.apache.parquet.filter2.predicate.{FilterApi, UserDefinedPredicate} | |
import org.apache.parquet.format.converter.ParquetMetadataConverter | |
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} | |
import org.apache.parquet.io.api.Binary | |
import org.apache.parquet.schema.Types | |
import org.apache.parquet.schema.Types.MessageTypeBuilder | |
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader | |
object Main {
  /**
   * Demonstrates three ways of reading a Parquet file ("people.parquet"):
   *  1. Spark's VectorizedParquetRecordReader with column pruning ("name" only),
   *  2. reading the file footer to build a projected Parquet/Avro schema,
   *  3. AvroParquetReader with a requested projection and a pushed-down filter.
   *
   * NOTE(review): assumes "people.parquet" exists in the working directory and
   * has at least the columns "name" (binary/UTF8) and "age" (int64) — confirm.
   */
  def main(args: Array[String]): Unit = {
    import scala.collection.JavaConverters._

    // --- 1. Vectorized read of just the "name" column ----------------------
    val parquetReader = new VectorizedParquetRecordReader(true)
    var numBatches = 0
    try {
      parquetReader.initialize("people.parquet", List("name").asJava)
      // resultBatch() returns a reusable ColumnarBatch; nextBatch() refills it.
      val batch = parquetReader.resultBatch()
      while (parquetReader.nextBatch()) {
        numBatches += 1
        println(s"no. rows: ${batch.numRows()}")
        batch.rowIterator().asScala.foreach(row => println(row.getString(0)))
      }
    } finally {
      // Fix: the original never closed the reader, leaking the file handle.
      parquetReader.close()
    }

    // --- 2. Build a projected schema from the file footer ------------------
    val path = new Path("people.parquet")
    val metadata = ParquetFileReader
      .readFooter(new Configuration(), path, ParquetMetadataConverter.NO_FILTER)
      .getFileMetaData
    val parquetSchema = metadata.getSchema
    val builder = Types.buildMessage()
    // Keep only the top-level component of each (possibly dotted) field name.
    List("age", "name").foreach { field =>
      field.split('.').headOption.foreach { top =>
        builder.addField(parquetSchema.getType(top))
      }
    }
    val projectedSchema = builder.named("Person")
    val avroSchema = new AvroSchemaConverter().convert(projectedSchema)
    println(s"num batches: $numBatches")
    println(avroSchema.toString(true))

    // --- 3. Avro read with projection + predicate push-down ----------------
    // age >= 10 AND name != "null" (literal string sentinel, not a SQL NULL).
    val gt = FilterApi.gtEq(FilterApi.longColumn("age"), java.lang.Long.valueOf(10L))
    val isNotNull = FilterApi.notEq(FilterApi.binaryColumn("name"), Binary.fromString("null"))
    val filter = FilterApi.and(gt, isNotNull)
    println(filter)

    val conf = new Configuration()
    // Ask the Avro read support to materialize only the projected columns.
    conf.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, avroSchema.toString)
    // Prints null: the filter is passed via withFilter below, not via conf.
    println(conf.get("parquet.private.read.filter.predicate.human.readable"))

    // Fix: explicit [GenericRecord] instead of relying on `Nothing` inference.
    val reader = AvroParquetReader.builder[GenericRecord](new Path("people.parquet"))
      .withConf(conf)
      .withFilter(FilterCompat.get(filter))
      .build()
    try {
      // Fix: the original do/while printed the terminating null record;
      // stop *before* printing it. Also close the reader when done.
      Iterator.continually(reader.read())
        .takeWhile(_ != null)
        .foreach(println)
    } finally {
      reader.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.