Custom File Sink for Parquet (List[GenericRecord])
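The gist elides its imports. A plausible set, inferred from the calls below, is sketched here; the package paths assume Flink ~1.9 with the Scala API, flink-connector-kafka, flink-statebackend-rocksdb, parquet-avro and scala-logging. AugmentedMessageDeserializer, GenericRecordSchema, KEY, TIMESTAMP and settings are project-specific and not reproduced.

// Assumed imports -- not part of the original gist.
import java.lang
import java.text.SimpleDateFormat
import java.time.Instant

import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

import scala.collection.JavaConverters._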
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
// Kafka source; event time is taken from the record's "timestamp" field,
// which is assumed to be monotonically ascending per partition.
val source = new FlinkKafkaConsumer(settings.kafkaTopic(), new AugmentedMessageDeserializer, kafkaProperties)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[GenericRecord] {
    def extractAscendingTimestamp(element: GenericRecord): Long =
      Instant
        .parse(element.get("timestamp").asInstanceOf[String])
        .toEpochMilli()
  })

val writer = WindowParquetGenericRecordListFileSink(settings.s3Path(), GenericRecordSchema.schema.toString())
...
val backend = new RocksDBStateBackend("file:///tmp/rocksdb-checkpoint", true)

env
  .setStateBackend(backend)
  .enableCheckpointing(settings.checkpointInterval())
  .addSource(source)
  .keyBy((record: GenericRecord) => record.get("key").asInstanceOf[String])
  .timeWindow(Time.seconds(settings.windowTime()))
  .allowedLateness(Time.seconds(settings.allowedLateness()))
  .trigger(new DelayEventTimeTrigger())
  .apply(new GenericRecordAggregatorWindowFunction())
  .addSink(writer)

env.execute()
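In short: records are keyed by their "key" field, collected into event-time windows of windowTime seconds, fired by the custom trigger below, aggregated into one Iterable per window, and written as one Parquet file per window firing.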
// DelayEventTimeTrigger

/**
 * Adapted from Flink's built-in EventTimeTrigger: fires either when the
 * watermark passes the window's max timestamp, or one window length of
 * processing time after the first element arrives, whichever comes first.
 */
class DelayEventTimeTrigger extends Trigger[Object, TimeWindow] {

  val valueStateDescriptor = new ValueStateDescriptor[Boolean]("flag", classOf[Boolean])

  override def onElement(
      element: Object,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext
  ): TriggerResult = {
    val flag = ctx.getPartitionedState(valueStateDescriptor).value()
    // The flag ensures the timers are registered only once per window;
    // it is cleared in clear() when the window is purged.
    if (!flag) {
      val delay = window.getEnd - window.getStart
      ctx.getPartitionedState(valueStateDescriptor).update(true)
      ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    // Also drop the per-window flag so its state does not leak.
    ctx.getPartitionedState(valueStateDescriptor).clear()
  }
}
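Design note: the stock EventTimeTrigger fires only when the watermark passes window.maxTimestamp(), so a stalled watermark (e.g. an idle Kafka partition) can hold windows open indefinitely. The extra processing-time timer acts as a fallback that fires the window at most one window length of wall-clock time after its first element arrives.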
// GenericRecordAggregatorWindowFunction

// Collects all records of a fired window into a single Iterable so the sink
// can write them to one Parquet file.
class GenericRecordAggregatorWindowFunction
    extends WindowFunction[GenericRecord, Iterable[GenericRecord], String, TimeWindow] {

  override def apply(key: String, window: TimeWindow, input: lang.Iterable[GenericRecord],
      out: Collector[Iterable[GenericRecord]]): Unit =
    out.collect(input.asScala)
}
// WindowParquetGenericRecordListFileSink

case class WindowParquetGenericRecordListFileSink(filePath: String, schema: String)
    extends ParquetGenericRecordListFileSink[GenericRecord] {

  // Hive-style partitioning: account_id=<key>/partition_date=<yyyyMM>
  def getBucketId(element: GenericRecord): String =
    "account_id=" + element.get(KEY.name).asInstanceOf[String] +
      "/partition_date=" + formatDateString(element.get(TIMESTAMP.name).asInstanceOf[String])

  private def formatDateString(date: String) =
    new SimpleDateFormat("yyyyMM").format(
      new SimpleDateFormat("yyyy-MM-dd").parse(date))

  // Derive a filesystem-safe file name from the record's logger_timestamp.
  def getFileName(genericRecord: GenericRecord): String =
    genericRecord.get("logger_timestamp").asInstanceOf[String]
      .replace(" ", "_")
      .replace("/", "-")
      .replace(":", "-")
      .concat(".parquet")
}
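For illustration only (the field values are assumed, not from the gist): a record with key "42", timestamp "2020-01-16" and logger_timestamp "2020-01-16 13:30:00" would land at <s3Path>/account_id=42/partition_date=202001/2020-01-16_13-30-00.parquet.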
// ParquetGenericRecordListFileSink

trait ParquetGenericRecordListFileSink[IN] extends SinkFunction[Iterable[IN]] with LazyLogging {

  def filePath: String
  def schema: String

  override def invoke(elements: Iterable[IN], context: Context[_]): Unit = {
    // A window never fires with zero elements, so head is safe here.
    val fileName = getFileName(elements.head)
    val finalFilePath = s"${filePath}/${getBucketId(elements.head)}/${fileName}"
    logger.info(s"Writing to ${finalFilePath}")
    val writer = AvroParquetWriter
      .builder[IN](new Path(finalFilePath))
      .withSchema(new Schema.Parser().parse(schema))
      .withDataModel(GenericData.get)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
    elements.foreach(writer.write)
    writer.close()
  }

  def getBucketId(element: IN): String
  def getFileName(genericRecord: IN): String
}
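Design note: the sink opens a fresh AvroParquetWriter per window firing and is not coordinated with Flink's checkpoints, so a replay after failure rewrites files rather than appending; ParquetFileWriter.Mode.OVERWRITE makes such re-fires (including late firings allowed by allowedLateness) idempotent for a given bucket/file name instead of an error.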