Created
July 14, 2022 15:06
-
-
Save ottomata/1813c50ee69bd20997d335a9f61421a7 to your computer and use it in GitHub Desktop.
EventDataStreamFactory.kafkaDataStream.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from Kafka. | |
* If you need more control of your KafkaSource, use | |
* {@link EventDataStreamFactory#kafkaSourceBuilder(String, String, String)} and then | |
* call {@link StreamExecutionEnvironment#fromSource(Source, WatermarkStrategy, String)} yourself. | |
* | |
* Example: | |
* <pre>{@code | |
* EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...) | |
* DataStreamSource<Row> eventStreamSource = eventDataStreamFactory.kafkaDataStream( | |
* "test.event.example", // EventStream name | |
* env, // Flink StreamExecutionEnvironment, | |
* WatermarkStrategy.noWatermarks(), | |
* "localhost:9092", | |
* "my_consumer_group" | |
* | |
* }</pre> | |
* | |
* @param streamName | |
* Name of the EventStream, must be declared in EventStreamConfig. | |
* | |
* @param env | |
* StreamExecutionEnvironment in which to call fromSource. | |
* | |
* @param watermarkStrategy | |
* For simple testing input from files, you'll likely want to use WatermarkStrategy.noWatermarks(); | |
* | |
* @param bootstrapServers | |
* Kafka bootstrap.servers property. | |
* | |
* @param consumerGroup | |
* Kafka consumer.group.id property. | |
*/ | |
public DataStreamSource<Row> kafkaDataStream( | |
String streamName, | |
StreamExecutionEnvironment env, | |
WatermarkStrategy<Row> watermarkStrategy, | |
String bootstrapServers, | |
String consumerGroup | |
) { | |
EventStream eventStream = eventStreamFactory.createEventStream(streamName); | |
// Make a nice DataStreamSource description for the stream from a Kafka topic | |
String dataSourceDescription = eventStream + | |
" from topics [" + String.join(",", eventStream.topics()) + "]"; | |
return env.fromSource( | |
kafkaSourceBuilder(streamName, bootstrapServers, consumerGroup).build(), | |
watermarkStrategy, | |
dataSourceDescription | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment