Created June 27, 2017 17:04
Save masayuki038/4be6c8538dfd4563a8d5ff743cf375ae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.wrap_trap.parquet_arrow_samples; | |
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableBigIntVector;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.NullableVarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
public class ArrowConverter extends GroupConverter { | |
private Converter[] converters; | |
private String name; | |
private ArrowConverter parent; | |
private GroupType schema; | |
private VectorSchemaRoot vectorSchemaRoot; | |
public ArrowConverter(GroupType schema) { | |
this(schema, null, null); | |
} | |
public ArrowConverter(GroupType schema, String name, ArrowConverter parent) { | |
this.converters = new Converter[schema.getFieldCount()]; | |
this.name = name; | |
this.parent = parent; | |
this.schema = schema; | |
int i = 0; | |
for (Type field: schema.getFields()) { | |
this.converters[i++] = createConverter(field); | |
} | |
} | |
protected Converter createConverter(Type field) { | |
if (field.isPrimitive()) { | |
PrimitiveType.PrimitiveTypeName name = field.asPrimitiveType().getPrimitiveTypeName(); | |
switch (name) { | |
case INT32: | |
return new IntConverter(field.getName()); | |
case INT64: | |
return new LongConverter(field.getName()); | |
case BINARY: | |
return new StringConverter(field.getName()); | |
} | |
throw new IllegalStateException("Unexpected PrimitiveTypeName: " + name); | |
} | |
return new ArrowConverter(field.asGroupType()); | |
} | |
@Override | |
public Converter getConverter(int fieldIndex) { | |
return converters[fieldIndex]; | |
} | |
VectorSchemaRoot getCurrentRecord() { | |
return vectorSchemaRoot; | |
//return null; | |
} | |
@Override | |
public void start() { | |
} | |
@Override | |
public void end() { | |
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); | |
List<Field> fieldList = new ArrayList<>(); | |
List<FieldVector> vectorList = new ArrayList<>(); | |
int valueCountMin = Integer.MAX_VALUE; | |
for (Converter converter: converters) { | |
if (converter instanceof PrimitiveBufferConverter) { | |
PrimitiveBufferConverter fieldConverter = (PrimitiveBufferConverter) converter; | |
FieldVector fieldVector = fieldConverter.createFieldVector(allocator); | |
fieldList.add(fieldVector.getField()); | |
vectorList.add(fieldVector); | |
valueCountMin = Math.min(fieldVector.getAccessor().getValueCount(), valueCountMin); | |
fieldConverter.reset(); | |
} | |
} | |
this.vectorSchemaRoot = new VectorSchemaRoot(fieldList, vectorList, valueCountMin); | |
} | |
abstract private class PrimitiveBufferConverter<T> extends PrimitiveConverter { | |
abstract public void reset(); | |
abstract public FieldVector createFieldVector(BufferAllocator allocator); | |
} | |
private class StringConverter extends PrimitiveBufferConverter<String> { | |
private String name; | |
private List<String> container; | |
public StringConverter(String name) { | |
this.name = name; | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public void addBinary(Binary value) { | |
this.container.add(value.toStringUsingUTF8()); | |
} | |
@Override | |
public void reset() { | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public FieldVector createFieldVector(BufferAllocator allocator) { | |
NullableVarCharVector vector = new NullableVarCharVector(name, allocator); | |
vector.allocateNew(); | |
NullableVarCharVector.Mutator mutator = vector.getMutator(); | |
for (int i = 0; i < this.container.size(); i++) { | |
mutator.set(i, this.container.get(i).getBytes()); | |
} | |
mutator.setValueCount(this.container.size()); | |
//System.out.println("this.container.size(): " + this.container.size()); | |
return vector; | |
} | |
} | |
private class IntConverter extends PrimitiveBufferConverter<Integer> { | |
private String name; | |
private List<Integer> container; | |
public IntConverter(String name) { | |
this.name = name; | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public void addInt(int value) { | |
this.container.add(value); | |
} | |
@Override | |
public void reset() { | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public FieldVector createFieldVector(BufferAllocator allocator) { | |
NullableIntVector vector = new NullableIntVector(name, allocator); | |
vector.allocateNew(); | |
NullableIntVector.Mutator mutator = vector.getMutator(); | |
for (int i = 0; i < this.container.size(); i++) { | |
mutator.set(i, this.container.get(i)); | |
} | |
mutator.setValueCount(this.container.size()); | |
return vector; | |
} | |
} | |
private class LongConverter extends PrimitiveBufferConverter<Long> { | |
private String name; | |
private List<Long> container = new ArrayList<>(); | |
public LongConverter(String name) { | |
this.name = name; | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public void addLong(long value) { | |
container.add(value); | |
} | |
@Override | |
public void reset() { | |
this.container = new ArrayList<>(); | |
} | |
@Override | |
public FieldVector createFieldVector(BufferAllocator allocator) { | |
NullableBigIntVector vector = new NullableBigIntVector(name, allocator); | |
vector.allocateNew(); | |
NullableBigIntVector.Mutator mutator = vector.getMutator(); | |
for (int i = 0; i < this.container.size(); i++) { | |
mutator.set(i, this.container.get(i)); | |
} | |
mutator.setValueCount(this.container.size()); | |
return vector; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.wrap_trap.parquet_arrow_samples; | |
import org.apache.arrow.vector.VectorSchemaRoot; | |
import org.apache.parquet.io.api.GroupConverter; | |
import org.apache.parquet.io.api.RecordMaterializer; | |
import org.apache.parquet.schema.MessageType; | |
/** | |
* Created by masayuki on 2017/06/25. | |
*/ | |
public class ArrowMaterializer extends RecordMaterializer<VectorSchemaRoot> { | |
private ArrowConverter root; | |
public ArrowMaterializer(MessageType schema) { | |
this.root = new ArrowConverter(schema); | |
} | |
@Override | |
public VectorSchemaRoot getCurrentRecord() { | |
return root.getCurrentRecord(); | |
} | |
@Override | |
public GroupConverter getRootConverter() { | |
return this.root; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.wrap_trap.parquet_arrow_samples; | |
import org.apache.arrow.vector.VectorSchemaRoot; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.parquet.hadoop.api.InitContext; | |
import org.apache.parquet.hadoop.api.ReadSupport; | |
import org.apache.parquet.io.api.RecordMaterializer; | |
import org.apache.parquet.schema.MessageType; | |
import java.util.Map; | |
public class ArrowSupport extends ReadSupport<VectorSchemaRoot> { | |
@Override | |
public RecordMaterializer<VectorSchemaRoot> prepareForRead(Configuration configuration, Map<String, String> map, MessageType schema, ReadContext readContext) { | |
return new ArrowMaterializer(schema); | |
} | |
@Override | |
public ReadSupport.ReadContext init(InitContext context) { | |
return new ReadContext(context.getFileSchema()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.wrap_trap.parquet_arrow_samples; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.arrow.vector.VectorSchemaRoot; | |
import org.apache.parquet.hadoop.ParquetReader; | |
import org.apache.arrow.vector.FieldVector; | |
import org.apache.arrow.vector.ValueVector.Accessor; | |
public class ParquetToArrowSample { | |
private static void read(String input) throws Exception { | |
try (ParquetReader<VectorSchemaRoot> reader = ParquetReader.builder(new ArrowSupport(), new Path(input)).build()) { | |
for (VectorSchemaRoot vectorSchemaRoot = reader.read(); vectorSchemaRoot != null; vectorSchemaRoot = reader.read()) { | |
for (FieldVector vector: vectorSchemaRoot.getFieldVectors()) { | |
Accessor accessor = vector.getAccessor(); | |
for (int i = 0; i < accessor.getValueCount(); i++) { | |
System.out.println(vector.getField().getName() + ": " + accessor.getObject(i)); | |
} | |
} | |
System.out.println(); | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
read("D:\\development\\repository\\git\\drill\\sample-data\\nationsSF\\nationsSF.parquet"); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.