Parquet columns reader1
package com.company.grid.lookup;

import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;

// Holds one column's descriptor plus an in-memory array sized to the file's row count.
final class DimColumn {
  private volatile Object arrayList; // array created via java.lang.reflect.Array, one slot per row
  private String name;
  private Class<?> typeOfArray;
  private int totalSize;
  private ColumnDescriptor columnDescriptor;

  DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
    this.columnDescriptor = columnDescriptor;
    this.name = columnDescriptor.getPath()[0];
    this.typeOfArray = columnDescriptor.getType().javaType;
    this.totalSize = totalSize;
    this.arrayList = Array.newInstance(typeOfArray, totalSize);
  }

  public ColumnDescriptor getColumnDescriptor() {
    return columnDescriptor;
  }

  public Object getArrayList() {
    return arrayList;
  }

  public String getName() {
    return name;
  }

  public int getTotalSize() {
    return totalSize;
  }
}

// Reads every column of a Parquet file into in-memory arrays, one array per column.
public class RfiParquetFileReader {
  ParquetMetadata metaData;
  MessageType schema;
  private static final Charset UTF8 = Charset.forName("UTF-8");
  private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();

  public RfiParquetFileReader(String fileName) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", LocalFileSystem.class.getName());
    Path filePath = new Path(fileName);

    // Read the footer once to get the schema and the row-group (block) metadata.
    metaData = ParquetFileReader.readFooter(conf, filePath);
    schema = metaData.getFileMetaData().getSchema();
    List<BlockMetaData> blocks = metaData.getBlocks();
    int totalSize = (int) blocks.get(0).getRowCount(); // row count of the first row group; the arrays below are sized from this
    List<ColumnDescriptor> columnDescriptors = schema.getColumns();
    List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);

    // One reader for the whole file; each readNextRowGroup() call returns the pages
    // of the next row group, from which every column is read in turn.
    ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
    PageReadStore pageReadStore = fileReader.readNextRowGroup();
    int index = 0;
    while (pageReadStore != null) {
      ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
      index = load(dimColumns, columnReadStoreImpl, index);
      pageReadStore = fileReader.readNextRowGroup();
    }

    // Spot-check: print five random entries from each column.
    Random rand = new Random();
    for (DimColumn dimColumn : dimColumns) {
      System.out.println(dimColumn.getName());
      for (int i = 0; i < 5; i++) {
        index = rand.nextInt(totalSize);
        System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.getArrayList(), index));
      }
      System.out.println("--------");
    }
  }
  public String getSchema() {
    return schema.toString();
  }

  public static void main(String[] args) {
    String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
    // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
    String[] files = {"13.assigned_conversion.parquet"};
    try {
      long startTime = System.currentTimeMillis();
      for (String file : files) {
        new RfiParquetFileReader(dirName + file);
        System.out.println("========================================================================");
      }
      long endTime = System.currentTimeMillis();
      System.out.println("Time taken: " + (endTime - startTime) + "ms");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public ArrayList<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
    ArrayList<DimColumn> dimColumns = new ArrayList<DimColumn>();
    for (ColumnDescriptor columnDescriptor : columnDescriptors) {
      dimColumns.add(new DimColumn(columnDescriptor, totalSize));
    }
    return dimColumns;
  }

  public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
    int index = startIndex;
    for (DimColumn dc : dimColumns) {
      // Every column restarts at the row group's first row.
      index = startIndex;
      int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
      ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
      // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
      for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
        // The definition level must be re-read for every value, i.e. after each consume().
        int definitionLevel = columnReader.getCurrentDefinitionLevel();
        if (definitionLevel == maxDefinitionLevel) {
          switch (dc.getColumnDescriptor().getType()) {
            case BINARY:
              String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
              System.out.println(index + " : " + dc.getName() + " : " + str);
              Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
            case BOOLEAN: Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
            case DOUBLE: Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
            case FLOAT: Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
            case INT32: Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
            case INT64: Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
            case INT96: Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
            // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
          }
        }
        columnReader.consume();
        index += 1;
      }
    }
    // index now points one past the last row filled in this row group.
    return index;
  }

  // public static String binaryToString(Binary value) {
  //   byte[] data = value.getBytes();
  //   if (data == null) return null;
  //
  //   try {
  //     CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
  //     return buffer.toString();
  //   } catch (Throwable th) {
  //   }
  //
  //   return "";
  // }

  public static BigInteger binaryToBigInteger(Binary value) {
    byte[] data = value.getBytes();
    if (data == null) return null;
    return new BigInteger(data);
  }

  private static final class DumpGroupConverter extends GroupConverter {
    @Override public void start() { }
    @Override public void end() { }
    @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
  }

  private static final class DumpConverter extends PrimitiveConverter {
    @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
  }
}
A similar issue was reported here: https://issues.apache.org/jira/browse/DRILL-827
How were the Parquet files created? Any chance you can share them?
Found the problem: I was getting the current definitionLevel outside of the for loop (for (long i = 0, totalValueCount...). I need to get the current definition level every time I consume():
definitionLevel = columnReader.getCurrentDefinitionLevel();
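A minimal sketch of the corrected per-column loop, reusing the gist's own names (dc, columnReadStore, index) and only the ColumnReader calls that already appear above: the level is read inside the loop, before consume() advances the reader.

  // The definition level belongs to the value the reader is currently positioned on,
  // so it must be read inside the loop, before consume() moves to the next value.
  ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
  int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
  for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
    int definitionLevel = columnReader.getCurrentDefinitionLevel();
    if (definitionLevel == maxDefinitionLevel) {
      // Value is present (not null) at this position; read it, e.g. for an INT32 column:
      Array.set(dc.getArrayList(), index, columnReader.getInteger());
    }
    columnReader.consume(); // advance to the next value
    index += 1;
  }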
Updated the gist; now it should work. By the way, the Parquet files were created through a Sqoop import from MySQL.
This reader does not re-open the Parquet file for every column read.
Hi Pratik,
I am using CDH5. Could you please tell me the step-by-step procedure for loading a CSV file into a Hive table in Parquet file format?
Thank you.
This version of the reader throws an exception when reading some of the data in some columns.