Last active
November 18, 2022 21:42
-
-
Save cerisier/9748d493bbcc301384aec52216601cb8 to your computer and use it in GitHub Desktop.
Hackish example of writing protobuf DynamicMessage objects to parquet via Avro using AvroParquetWriter.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example | |
import com.google.protobuf.Descriptors | |
import com.google.protobuf.DynamicMessage | |
import org.apache.avro.LogicalTypes | |
import org.apache.avro.Schema | |
import org.apache.avro.protobuf.ProtobufData | |
import org.apache.hadoop.fs.Path | |
import org.apache.parquet.avro.AvroParquetWriter | |
import org.apache.parquet.hadoop.ParquetWriter | |
import org.example.proto.MyProtobufMessageProto.MyProtobufMessage | |
// Custom FieldOptions specifying which parquet logical types to apply upon schema transformation | |
import org.example.proto.Transform | |
import org.example.proto.Transform.ParquetAnnotation | |
/**
 * Avro [ProtobufData] model for protobuf records whose type is only known through their runtime
 * descriptor (for example [DynamicMessage]) rather than through a generated class.
 *
 * Field access resolves the record schema from the record's own descriptor, and fields carrying
 * the custom `parquet` field option with type `TIMESTAMP` are tagged with Avro's
 * `timestamp-millis` logical type.
 */
class DynamicProtobufData : ProtobufData() {

    /**
     * Record schemas already derived by [recordSchemaOf], keyed by message descriptor, so the
     * schema is not derived again on every single field access.
     */
    private val recordSchemas =
        java.util.concurrent.ConcurrentHashMap<Descriptors.Descriptor, Schema>()

    /**
     * Reads the field at position [pos] from the record [r].
     *
     * @param r the protobuf message (or builder) to read from
     * @param name the field name, forwarded unchanged to the state-based overload
     * @param pos the position of the field in the record schema
     * @return whatever the state-based `getField` overload returns for that field
     */
    override fun getField(r: Any?, name: String?, pos: Int): Any? =
        getField(r, name, pos, getRecordState(r, recordSchemaOf(r)))

    /**
     * Writes [value] into the field at position [pos] of the record [r].
     *
     * @param r the protobuf message or builder to write to
     * @param n the field name, forwarded unchanged to the state-based overload
     * @param pos the position of the field in the record schema
     * @param value the value to store
     */
    override fun setField(r: Any?, n: String?, pos: Int, value: Any?) {
        setField(r, n, pos, value, getRecordState(r, recordSchemaOf(r)))
    }

    /**
     * Builds the Avro schema for the field [f] and, when the field is annotated with the
     * `parquet` option of type `TIMESTAMP`, attaches the `timestamp-millis` logical type to it.
     *
     * @param f the protobuf field descriptor to convert
     * @return the schema produced by the parent class, possibly carrying the logical type
     */
    override fun getSchema(f: Descriptors.FieldDescriptor): Schema {
        val fieldSchema = super.getSchema(f)
        val fieldOptions = f.options
        val isTimestamp = fieldOptions.hasExtension(Transform.parquet) &&
            fieldOptions.getExtension(Transform.parquet).type == ParquetAnnotation.Annotation.TIMESTAMP
        if (isTimestamp) {
            LogicalTypes.timestampMillis().addToSchema(fieldSchema)
        }
        return fieldSchema
    }

    /**
     * Resolves the record schema for [r] from its descriptor, reusing a previously derived one.
     *
     * The cast targets `MessageOrBuilder` rather than [DynamicMessage]: both messages and their
     * builders expose `descriptorForType`, whereas a `DynamicMessage.Builder` is not a
     * [DynamicMessage] and would fail the narrower cast.
     */
    private fun recordSchemaOf(r: Any?): Schema {
        val descriptor = (r as com.google.protobuf.MessageOrBuilder).descriptorForType
        return recordSchemas.getOrPut(descriptor) { getSchema(descriptor) }
    }
}
/**
 * Demo entry point: serializes a sample `MyProtobufMessage`, re-parses it as a [DynamicMessage]
 * using only its descriptor, prints the derived Avro schema and writes the message to
 * `avro.parquet` through [AvroParquetWriter].
 */
fun main() {
    // Descriptor could be parsed from a DescriptorProto as well
    val descriptor = MyProtobufMessage.getDescriptor()
    // Proto message could be bytes array directly
    val protoBytes = MyProtobufMessage.newBuilder().setCreatedAt(0xdeadbeef).build().toByteArray()
    val message = DynamicMessage.parseFrom(descriptor, protoBytes)

    val model = DynamicProtobufData()
    val schema = model.getSchema(message.descriptorForType)
    println(schema)

    val writer: ParquetWriter<DynamicMessage> =
        AvroParquetWriter.builder<DynamicMessage>(Path("avro.parquet"))
            .withDataModel(model) // use the protobuf data model
            .withSchema(schema) // Avro schema for the protobuf data
            .build()
    // `use` closes the writer even when write() throws, so the file handle is never leaked.
    writer.use { it.write(message) }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package org.example.proto; | |
option java_package = "org.example.proto"; | |
option java_outer_classname = "MyProtobufMessageProto"; | |
import "transform.proto"; | |
// Sample message used by the Kotlin example.
message MyProtobufMessage {
  // Annotated with the custom `parquet` field option declared in transform.proto; the option
  // name must match the extension name (`parquet`), which is also what the Kotlin code reads
  // through `Transform.parquet`.
  int64 created_at = 1 [(parquet) = { type: TIMESTAMP }];
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package org.example.proto; | |
import "google/protobuf/descriptor.proto"; | |
// Value of the custom `parquet` field option: names the annotation attached to a field.
message ParquetAnnotation {
  // Annotations a field can carry.
  enum Annotation {
    // Zero value of the enum.
    EMPTY = 0;
    // Marks the field as a timestamp.
    TIMESTAMP = 1;
  }

  // The annotation selected for the field.
  Annotation type = 1;
}
// Declares `parquet` as a custom option on fields, usable as
// `[(parquet) = { type: ... }]` in messages that import this file.
extend google.protobuf.FieldOptions {
  ParquetAnnotation parquet = 91100;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment