Skip to content

Instantly share code, notes, and snippets.

@masayuki038
Created June 27, 2017 17:04
Show Gist options
  • Save masayuki038/4be6c8538dfd4563a8d5ff743cf375ae to your computer and use it in GitHub Desktop.
Save masayuki038/4be6c8538dfd4563a8d5ff743cf375ae to your computer and use it in GitHub Desktop.
package net.wrap_trap.parquet_arrow_samples;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableBigIntVector;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.NullableVarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
/**
 * Parquet {@link GroupConverter} that buffers primitive values per column and,
 * at the end of each record, assembles them into an Arrow
 * {@link VectorSchemaRoot} retrievable via {@link #getCurrentRecord()}.
 *
 * <p>Only INT32, INT64 and BINARY primitive fields are supported; nested
 * groups get a child {@code ArrowConverter} whose output is currently not
 * merged into the parent's result (only direct primitive children are
 * collected in {@link #end()}).
 */
public class ArrowConverter extends GroupConverter {

    /** One child converter per field of {@link #schema}, in field order. */
    private final Converter[] converters;
    /** Field name of this group within its parent; null for the message root. */
    private final String name;
    /** Enclosing converter; null for the message root. */
    private final ArrowConverter parent;
    private final GroupType schema;
    /**
     * Shared allocator for all vectors this converter builds. The previous
     * implementation created a new RootAllocator on every end() call and never
     * closed it, leaking allocator bookkeeping for each record read.
     */
    private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    /** Vectors assembled by the most recently completed record; null before the first end(). */
    private VectorSchemaRoot vectorSchemaRoot;

    /** Creates the root converter for a message schema. */
    public ArrowConverter(GroupType schema) {
        this(schema, null, null);
    }

    /**
     * Creates a converter for a (possibly nested) group.
     *
     * @param schema the group's Parquet schema
     * @param name   the group's field name in its parent, or null at the root
     * @param parent the enclosing converter, or null at the root
     */
    public ArrowConverter(GroupType schema, String name, ArrowConverter parent) {
        this.converters = new Converter[schema.getFieldCount()];
        this.name = name;
        this.parent = parent;
        this.schema = schema;
        int i = 0;
        for (Type field : schema.getFields()) {
            this.converters[i++] = createConverter(field);
        }
    }

    /**
     * Builds the converter for one schema field.
     *
     * @throws IllegalStateException for primitive types other than
     *         INT32, INT64 and BINARY
     */
    protected Converter createConverter(Type field) {
        if (field.isPrimitive()) {
            PrimitiveType.PrimitiveTypeName typeName =
                    field.asPrimitiveType().getPrimitiveTypeName();
            switch (typeName) {
                case INT32:
                    return new IntConverter(field.getName());
                case INT64:
                    return new LongConverter(field.getName());
                case BINARY:
                    return new StringConverter(field.getName());
            }
            throw new IllegalStateException("Unexpected PrimitiveTypeName: " + typeName);
        }
        // Nested group: recurse. NOTE(review): the child's vectors are not
        // currently surfaced through the parent's getCurrentRecord().
        return new ArrowConverter(field.asGroupType());
    }

    @Override
    public Converter getConverter(int fieldIndex) {
        return converters[fieldIndex];
    }

    /** Returns the record assembled by the last {@link #end()} call. */
    VectorSchemaRoot getCurrentRecord() {
        return vectorSchemaRoot;
    }

    @Override
    public void start() {
    }

    /**
     * Flushes every buffered primitive column into a FieldVector and publishes
     * them as a new VectorSchemaRoot, then resets the column buffers.
     */
    @Override
    public void end() {
        List<Field> fieldList = new ArrayList<>();
        List<FieldVector> vectorList = new ArrayList<>();
        int valueCountMin = Integer.MAX_VALUE;
        for (Converter converter : converters) {
            if (converter instanceof PrimitiveBufferConverter) {
                PrimitiveBufferConverter<?> fieldConverter = (PrimitiveBufferConverter<?>) converter;
                FieldVector fieldVector = fieldConverter.createFieldVector(allocator);
                fieldList.add(fieldVector.getField());
                vectorList.add(fieldVector);
                // Row count is bounded by the shortest column.
                valueCountMin = Math.min(fieldVector.getAccessor().getValueCount(), valueCountMin);
                fieldConverter.reset();
            }
        }
        if (fieldList.isEmpty()) {
            // No primitive children: without this guard valueCountMin would
            // remain Integer.MAX_VALUE and be reported as the row count.
            valueCountMin = 0;
        }
        this.vectorSchemaRoot = new VectorSchemaRoot(fieldList, vectorList, valueCountMin);
    }

    /** A primitive converter that buffers values until {@link #createFieldVector} drains them. */
    abstract private class PrimitiveBufferConverter<T> extends PrimitiveConverter {
        /** Clears the buffered values so the next record starts empty. */
        abstract public void reset();

        /** Builds an Arrow vector holding all values buffered so far. */
        abstract public FieldVector createFieldVector(BufferAllocator allocator);
    }

    /** Buffers BINARY values as UTF-8 strings into a NullableVarCharVector. */
    private class StringConverter extends PrimitiveBufferConverter<String> {
        private final String name;
        private List<String> container;

        public StringConverter(String name) {
            this.name = name;
            this.container = new ArrayList<>();
        }

        @Override
        public void addBinary(Binary value) {
            this.container.add(value.toStringUsingUTF8());
        }

        @Override
        public void reset() {
            this.container = new ArrayList<>();
        }

        @Override
        public FieldVector createFieldVector(BufferAllocator allocator) {
            NullableVarCharVector vector = new NullableVarCharVector(name, allocator);
            vector.allocateNew();
            NullableVarCharVector.Mutator mutator = vector.getMutator();
            for (int i = 0; i < this.container.size(); i++) {
                // Arrow VarChar data is UTF-8; the previous bare getBytes()
                // used the platform-default charset and could corrupt text.
                mutator.set(i, this.container.get(i).getBytes(StandardCharsets.UTF_8));
            }
            mutator.setValueCount(this.container.size());
            return vector;
        }
    }

    /** Buffers INT32 values into a NullableIntVector. */
    private class IntConverter extends PrimitiveBufferConverter<Integer> {
        private final String name;
        private List<Integer> container;

        public IntConverter(String name) {
            this.name = name;
            this.container = new ArrayList<>();
        }

        @Override
        public void addInt(int value) {
            this.container.add(value);
        }

        @Override
        public void reset() {
            this.container = new ArrayList<>();
        }

        @Override
        public FieldVector createFieldVector(BufferAllocator allocator) {
            NullableIntVector vector = new NullableIntVector(name, allocator);
            vector.allocateNew();
            NullableIntVector.Mutator mutator = vector.getMutator();
            for (int i = 0; i < this.container.size(); i++) {
                mutator.set(i, this.container.get(i));
            }
            mutator.setValueCount(this.container.size());
            return vector;
        }
    }

    /** Buffers INT64 values into a NullableBigIntVector. */
    private class LongConverter extends PrimitiveBufferConverter<Long> {
        private final String name;
        private List<Long> container;

        public LongConverter(String name) {
            this.name = name;
            this.container = new ArrayList<>();
        }

        @Override
        public void addLong(long value) {
            container.add(value);
        }

        @Override
        public void reset() {
            this.container = new ArrayList<>();
        }

        @Override
        public FieldVector createFieldVector(BufferAllocator allocator) {
            NullableBigIntVector vector = new NullableBigIntVector(name, allocator);
            vector.allocateNew();
            NullableBigIntVector.Mutator mutator = vector.getMutator();
            for (int i = 0; i < this.container.size(); i++) {
                mutator.set(i, this.container.get(i));
            }
            mutator.setValueCount(this.container.size());
            return vector;
        }
    }
}
package net.wrap_trap.parquet_arrow_samples;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
/**
 * {@link RecordMaterializer} that turns each Parquet record into an Arrow
 * {@link VectorSchemaRoot} by delegating to an {@link ArrowConverter}.
 */
public class ArrowMaterializer extends RecordMaterializer<VectorSchemaRoot> {

    /** Root converter that buffers the current record's column values. */
    private final ArrowConverter rootConverter;

    public ArrowMaterializer(MessageType schema) {
        this.rootConverter = new ArrowConverter(schema);
    }

    /** Returns the vectors assembled for the most recently finished record. */
    @Override
    public VectorSchemaRoot getCurrentRecord() {
        return this.rootConverter.getCurrentRecord();
    }

    @Override
    public GroupConverter getRootConverter() {
        return this.rootConverter;
    }
}
package net.wrap_trap.parquet_arrow_samples;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import java.util.Map;
/**
 * {@link ReadSupport} that plugs {@link ArrowMaterializer} into the Parquet
 * read path, producing one {@link VectorSchemaRoot} per record.
 */
public class ArrowSupport extends ReadSupport<VectorSchemaRoot> {

    @Override
    public ReadSupport.ReadContext init(InitContext context) {
        // Read the full file schema; no column projection is applied.
        return new ReadContext(context.getFileSchema());
    }

    @Override
    public RecordMaterializer<VectorSchemaRoot> prepareForRead(
            Configuration configuration,
            Map<String, String> map,
            MessageType schema,
            ReadContext readContext) {
        return new ArrowMaterializer(schema);
    }
}
package net.wrap_trap.parquet_arrow_samples;
import org.apache.hadoop.fs.Path;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector.Accessor;
/**
 * Sample driver: reads a Parquet file through {@link ArrowSupport} and prints
 * every column value of every record to stdout.
 */
public class ParquetToArrowSample {

    /** Fallback sample file, kept for backward compatibility when no argument is given. */
    private static final String DEFAULT_INPUT =
            "D:\\development\\repository\\git\\drill\\sample-data\\nationsSF\\nationsSF.parquet";

    /**
     * Reads {@code input} and prints each field as "name: value", with a blank
     * line between records. The reader is closed via try-with-resources.
     *
     * @param input path to a Parquet file
     * @throws Exception on any read failure
     */
    private static void read(String input) throws Exception {
        try (ParquetReader<VectorSchemaRoot> reader =
                ParquetReader.builder(new ArrowSupport(), new Path(input)).build()) {
            for (VectorSchemaRoot root = reader.read(); root != null; root = reader.read()) {
                for (FieldVector vector : root.getFieldVectors()) {
                    Accessor accessor = vector.getAccessor();
                    for (int i = 0; i < accessor.getValueCount(); i++) {
                        System.out.println(vector.getField().getName() + ": " + accessor.getObject(i));
                    }
                }
                System.out.println();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Generalized: take the input path from the command line; fall back to
        // the original hard-coded sample path when no argument is supplied.
        read(args.length > 0 ? args[0] : DEFAULT_INPUT);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment