Event Platform Flink Integration
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import io.circe.generic.extras.Configuration
import io.circe.generic.extras.auto._
import io.circe.parser.decode

import org.slf4j.LoggerFactory

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Write case classes (or Java POJOs) for every event schema out there.
// There are at least 117 of them!
case class Meta(
  uri: String,
  request_id: String,
  id: String,
  dt: String,
  domain: String,
  stream: String
)

case class Performer(
  user_text: Option[String] = None,
  user_groups: Option[List[String]] = None,
  user_is_bot: Option[Boolean] = None,
  user_id: Option[Long] = None,
  user_registration_dt: Option[String] = None,
  user_edit_count: Option[Long] = None
)

case class PageCreate(
  meta: Meta,
  database: String,
  page_id: Long,
  page_title: String,
  page_namespace: Int,
  rev_id: Long,
  rev_timestamp: String,
  rev_sha1: String,
  rev_minor_edit: Boolean,
  rev_len: Int,
  rev_content_model: String,
  rev_content_format: String,
  performer: Option[Performer] = None,
  page_is_redirect: Boolean,
  comment: Option[String],
  parsedcomment: Option[String]
)
// This is just an implicit to allow automatic JSON deserialization into the PageCreate case class.
implicit val config: Configuration = Configuration.default.withDefaults
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") // list of Kafka brokers
kafkaProperties.setProperty("group.id", "my_consumer_group_name") // name of your consumer group

val pageCreateSource: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](
  java.util.Arrays.asList("eqiad.mediawiki.page-create", "codfw.mediawiki.page-create"),
  new SimpleStringSchema(),
  kafkaProperties
)
// Finally! A Flink DataStream[PageCreate].
val log = LoggerFactory.getLogger("PageCreateJob")

val pageCreateStream: DataStream[PageCreate] = env.addSource(pageCreateSource)
  .flatMap({ message: String =>
    decode[PageCreate](message) match {
      case Left(error) =>
        log.error(error.toString)
        None
      case Right(pageCreate: PageCreate) => Some(pageCreate)
    }
  })
  // No-op map, just to show where further transformations would go.
  .map((pageCreate: PageCreate) => pageCreate)
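
// A minimal sketch of running this pipeline: print each decoded PageCreate and
// submit the job. The job name is just an example.
pageCreateStream.print()
env.execute("page-create-json-example")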

// ============================================================================
// Alternative: build the Table / DataStream automatically from Event Platform
// stream config and JSONSchemas using wikimedia-event-utilities.
// ============================================================================

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Uses EventTableDescriptorBuilder plus some Kafka configs.
// https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/table/EventTableDescriptorBuilder.java
// https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/blob/T307959-event-table-source/src/main/scala/org/wikimedia/dataplatform/KafkaEventStreamFactory.scala
import org.wikimedia.dataplatform.KafkaEventStreamFactory

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val streamConfigUri = "https://meta.wikimedia.org/w/api.php"

val schemaBaseUris = Seq(
  "https://schema.discovery.wmnet/repositories/primary/jsonschema",
  "https://schema.discovery.wmnet/repositories/secondary/jsonschema"
)

val httpClientRoutes = Map(
  "https://meta.wikimedia.org" -> "https://api-ro.discovery.wmnet"
)
// Combines Kafka config, EventStreamConfig and Schema Repos
// to automate instantiation of Event Platform streams from Kafka.
val kafkaEventStreamFactory: KafkaEventStreamFactory = KafkaEventStreamFactory(
  tableEnv,        // StreamTableEnvironment
  schemaBaseUris,
  streamConfigUri,
  httpClientRoutes
)
// This is a dynamic Table of the mediawiki.page-create stream. It has all the fields
// of the latest event JSONSchema.
val mediawikiPageCreateTable: Table =
  kafkaEventStreamFactory.table(
    "mediawiki.page-create",       // name of event stream
    "kafka-jumbo1001.eqiad.wmnet", // list of Kafka brokers
    "my_consumer_group_name"       // name of your consumer group
  )
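
// A minimal sketch of querying the dynamic table, assuming the field names from the
// page-create schema (e.g. "database") shown in the case classes above: register the
// table as a temporary view and count page creations per wiki with Flink SQL.
tableEnv.createTemporaryView("page_create", mediawikiPageCreateTable)
val pageCreatesPerWiki: Table = tableEnv.sqlQuery(
  "SELECT `database`, COUNT(*) AS page_creates FROM page_create GROUP BY `database`"
)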
// Or, if you prefer the DataStream API, get the DataStream version of the mediawiki.page-create stream.
// Each record in the stream is of type Row, and fields can be accessed by name, e.g.
// val rev_id = row.getFieldAs[Long]("rev_id")
val mediawikiPageCreateStream: DataStream[Row] =
  kafkaEventStreamFactory.dataStream(
    "mediawiki.page-create",       // name of event stream
    "kafka-jumbo1001.eqiad.wmnet", // list of Kafka brokers
    "my_consumer_group_name"       // name of your consumer group
  )
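
// A minimal sketch of consuming the Row stream: pull a field out of each Row by name
// (field names assumed to match the page-create schema above), print it, and run the
// job. The job name is just an example.
mediawikiPageCreateStream
  .map((row: Row) => row.getFieldAs[String]("page_title"))
  .print()

env.execute("page-create-row-example")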