EventDataStreamFactory.kafkaDataStream.java
/**
* Gets a {@link DataStreamSource} of {@link Row} for streamName that reads JSON events from Kafka.
* If you need more control of your KafkaSource, use
* {@link EventDataStreamFactory#kafkaSourceBuilder(String, String, String)} and then
* call {@link StreamExecutionEnvironment#fromSource(Source, WatermarkStrategy, String)} yourself.
*
* Example:
* <pre>{@code
* EventDataStreamFactory eventDataStreamFactory = EventDataStreamFactory.from(...);
* DataStreamSource<Row> eventStreamSource = eventDataStreamFactory.kafkaDataStream(
*     "test.event.example",              // EventStream name
*     env,                               // Flink StreamExecutionEnvironment
*     WatermarkStrategy.noWatermarks(),
*     "localhost:9092",                  // Kafka bootstrap servers
*     "my_consumer_group"                // Kafka consumer group id
* );
* }</pre>
*
* @param streamName
* Name of the EventStream, must be declared in EventStreamConfig.
*
* @param env
* StreamExecutionEnvironment in which to call fromSource.
*
* @param watermarkStrategy
* WatermarkStrategy to apply to the source.
* For simple testing, you'll likely want to use WatermarkStrategy.noWatermarks().
*
* @param bootstrapServers
* Kafka bootstrap.servers property.
*
* @param consumerGroup
* Kafka consumer group.id property.
*/
public DataStreamSource<Row> kafkaDataStream(
    String streamName,
    StreamExecutionEnvironment env,
    WatermarkStrategy<Row> watermarkStrategy,
    String bootstrapServers,
    String consumerGroup
) {
    EventStream eventStream = eventStreamFactory.createEventStream(streamName);

    // Make a nice DataStreamSource description for the stream from a Kafka topic
    String dataSourceDescription = eventStream +
        " from topics [" + String.join(",", eventStream.topics()) + "]";

    return env.fromSource(
        kafkaSourceBuilder(streamName, bootstrapServers, consumerGroup).build(),
        watermarkStrategy,
        dataSourceDescription
    );
}