Parquet columns reader2
package com.company.grid.lookup_new;

import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
// Holds all values of one Parquet column in a flat array sized to the file's row count.
final class DimColumn {
  public volatile Object arrayList;
  public String name;
  public Class<?> typeOfArray;
  public int size;

  DimColumn(String name, Class<?> typeOfArray, int size) {
    this.name = name;
    this.typeOfArray = typeOfArray;
    this.size = size;
    this.arrayList = Array.newInstance(typeOfArray, size);
  }
}
public class RfiParquetFileReader {
  ParquetMetadata metaData;
  MessageType schema;
  // Used only by the commented-out binaryToString helper below.
  private static final Charset UTF8 = Charset.forName("UTF-8");
  private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();
  public RfiParquetFileReader(String fileName) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", LocalFileSystem.class.getName());
    Path filePath = new Path(fileName);
    metaData = ParquetFileReader.readFooter(conf, filePath);
    schema = metaData.getFileMetaData().getSchema();
    List<BlockMetaData> blocks = metaData.getBlocks();
    // Sum row counts over all row groups; counting only the first block would
    // under-size the arrays for multi-row-group files.
    long totalSize = 0;
    for (BlockMetaData block : blocks) {
      totalSize += block.getRowCount();
    }
    List<ColumnDescriptor> columns = schema.getColumns();
    List<DimColumn> dimColumns = new ArrayList<DimColumn>();
    for (ColumnDescriptor columnDescriptor : columns) {
      System.out.println(columnDescriptor.toString());
      // load() converts INT96 values to BigInteger, so the array must hold
      // BigInteger rather than the type's default javaType (Binary).
      Class<?> elementType = columnDescriptor.getType() == PrimitiveTypeName.INT96
          ? BigInteger.class : columnDescriptor.getType().javaType;
      DimColumn dimColumn = new DimColumn(columnDescriptor.getPath()[0], elementType, (int) totalSize);
      int index = 0;
      // NOTE: the file is re-opened for every column; see the optimized
      // single-pass version linked at the bottom.
      ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columns);
      PageReadStore pageReadStore = fileReader.readNextRowGroup();
      while (pageReadStore != null) {
        ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
        index = load(columnReadStoreImpl, columnDescriptor, dimColumn, index);
        pageReadStore = fileReader.readNextRowGroup();
      }
      fileReader.close();
      dimColumns.add(dimColumn);
    }
    // Spot-check: print five randomly chosen values from each column.
    Random rand = new Random();
    int index;
    for (DimColumn dimColumn : dimColumns) {
      System.out.println(dimColumn.name);
      for (int i = 0; i < 5; i++) {
        index = rand.nextInt((int) totalSize);
        System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.arrayList, index));
      }
      System.out.println("--------");
    }
  }
  public String getSchema() {
    return schema.toString();
  }

  public static void main(String[] args) {
    String fileName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/13.assigned_conversion.parquet";
    try {
      long startTime = System.currentTimeMillis();
      new RfiParquetFileReader(fileName);
      long endTime = System.currentTimeMillis();
      System.out.println("Time taken: " + (endTime - startTime) + "ms");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  // Reads every value of one column from the given row group into dimColumn,
  // starting at index; returns the next free index.
  public static int load(ColumnReadStoreImpl columnReadStore, ColumnDescriptor column, DimColumn dimColumn, int index) throws IOException {
    int maxDefinitionLevel = column.getMaxDefinitionLevel();
    ColumnReader columnReader = columnReadStore.getColumnReader(column);
    for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
      int definitionLevel = columnReader.getCurrentDefinitionLevel();
      // A definition level below the maximum means the value is null; the
      // slot is left empty but index still advances to keep rows aligned.
      if (definitionLevel == maxDefinitionLevel) {
        switch (column.getType()) {
          case BINARY:  Array.set(dimColumn.arrayList, index, columnReader.getBinary()); break;
          case BOOLEAN: Array.set(dimColumn.arrayList, index, columnReader.getBoolean()); break;
          case DOUBLE:  Array.set(dimColumn.arrayList, index, columnReader.getDouble()); break;
          case FLOAT:   Array.set(dimColumn.arrayList, index, columnReader.getFloat()); break;
          case INT32:   Array.set(dimColumn.arrayList, index, columnReader.getInteger()); break;
          case INT64:   Array.set(dimColumn.arrayList, index, columnReader.getLong()); break;
          case INT96:   Array.set(dimColumn.arrayList, index, binaryToBigInteger(columnReader.getBinary())); break;
          // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
        }
      }
      columnReader.consume();
      index += 1;
    }
    return index;
  }
  // public static String binaryToString(Binary value) {
  //   byte[] data = value.getBytes();
  //   if (data == null) return null;
  //
  //   try {
  //     CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
  //     return buffer.toString();
  //   } catch (Throwable th) {
  //   }
  //
  //   return "";
  // }
  // Interprets the raw bytes as a big-endian two's-complement integer.
  public static BigInteger binaryToBigInteger(Binary value) {
    byte[] data = value.getBytes();
    if (data == null) return null;
    return new BigInteger(data);
  }
  // No-op converters: raw column values are read directly, so no record
  // assembly is needed, but ColumnReadStoreImpl still requires a converter.
  private static final class DumpGroupConverter extends GroupConverter {
    @Override public void start() { }
    @Override public void end() { }
    @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
  }

  private static final class DumpConverter extends PrimitiveConverter {
    @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
  }
}
This reader re-opens the Parquet file once per column, so a file with N columns is read from disk N times. An optimized version that opens the file only once is at https://gist.github.com/tispratik/f0044dd84dc8d8c6cbcf
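The linked gist is not reproduced here, but the fix amounts to inverting the two loops: open the file once, and for each row group read every column out of the same PageReadStore before advancing. A minimal sketch of that single-pass loop, assuming the DimColumn arrays have already been allocated for every column and reusing the conf, filePath, blocks, columns, schema, dimColumns, and load(...) pieces defined above:

// Single-pass sketch: every column is read from each row group before
// moving to the next one, so the file is opened exactly once.
ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columns);
int[] indexes = new int[columns.size()]; // per-column write positions
PageReadStore pageReadStore = fileReader.readNextRowGroup();
while (pageReadStore != null) {
  ColumnReadStoreImpl store = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
  for (int c = 0; c < columns.size(); c++) {
    indexes[c] = load(store, columns.get(c), dimColumns.get(c), indexes[c]);
  }
  pageReadStore = fileReader.readNextRowGroup();
}
fileReader.close();

With N columns this issues one file open instead of N, and each row group is visited once instead of once per column.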