Skip to content

Instantly share code, notes, and snippets.

@speeddragon
Created January 10, 2020 02:51
Show Gist options
  • Select an option

  • Save speeddragon/18fbd570557da59d7f6a2c5822cc7ad4 to your computer and use it in GitHub Desktop.

Select an option

Save speeddragon/18fbd570557da59d7f6a2c5822cc7ad4 to your computer and use it in GitHub Desktop.
StreamingFileSink
// Flink pipeline: consume messages from Kafka and write them to S3 as Parquet
// through a StreamingFileSink built by Writer.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaSource =
  new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)

val parquetSink =
  Writer(settings.s3Path(), GenericRecordSchema.schema.toString()).build()

// NOTE(review): checkpointing is what lets the bulk-format sink commit finished
// part files — keep it enabled whenever the sink is in the job graph.
env
  .enableCheckpointing(checkpointInterval)
  .addSource(kafkaSource)
  .addSink(parquetSink)

env.execute()
// Writer.scala

/**
 * Factory for a [[StreamingFileSink]] that serialises Avro [[GenericRecord]]s
 * to Parquet files under the given base path.
 *
 * @param pathString base output path (e.g. an s3:// URI) — assumed writable; confirm against deployment
 * @param schema     Avro schema as a JSON string
 */
case class Writer(pathString: String, schema: String) {

  /** Assembles the bulk-format sink, bucketed per message by [[MessageBucketAssigner]]. */
  def build(): StreamingFileSink[GenericRecord] = {
    val outputPath = new Path(pathString)

    // SAM-converted builder: each part file gets its own AvroParquetWriter.
    val parquetBuilder: ParquetBuilder[GenericRecord] =
      (out: OutputFile) => createAvroParquetWriter(schema, GenericData.get, out)

    val writerFactory = new ParquetWriterFactory(parquetBuilder)

    StreamingFileSink
      .forBulkFormat(outputPath, writerFactory)
      .withBucketAssigner(new MessageBucketAssigner)
      .build
  }

  /**
   * Creates a Parquet writer for records conforming to `schemaString`.
   * OVERWRITE mode replaces any pre-existing file at the target location.
   */
  @throws[IOException]
  private def createAvroParquetWriter[T](schemaString: String, dataModel: GenericData, out: OutputFile) = {
    val parsedSchema = new Schema.Parser().parse(schemaString)

    AvroParquetWriter
      .builder[T](out)
      .withSchema(parsedSchema)
      .withDataModel(dataModel)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build
  }
}
@jainanuj07
Copy link
Copy Markdown

Hi, could you please share the code for the `AugmentedMessageDeserializer` and `GenericRecordSchema` classes?

@speeddragon
Copy link
Copy Markdown
Author

speeddragon commented Jan 25, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment