@ottomata
Last active June 7, 2022 13:42
Event Platform Flink Integration
import java.util.Properties

import io.circe.generic.extras.Configuration
import io.circe.generic.extras.auto._
import io.circe.parser.decode

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
// Write case classes (or Java POJOs) for every event schema out there.
// There are at least 117 of them!
case class Meta(
  uri: String,
  request_id: String,
  id: String,
  dt: String,
  domain: String,
  stream: String
)

case class Performer(
  user_text: Option[String] = None,
  user_groups: Option[List[String]] = None,
  user_is_bot: Option[Boolean] = None,
  user_id: Option[Long] = None,
  user_registration_dt: Option[String] = None,
  user_edit_count: Option[Long] = None
)

case class PageCreate(
  meta: Meta,
  database: String,
  page_id: Long,
  page_title: String,
  page_namespace: Int,
  rev_id: Long,
  rev_timestamp: String,
  rev_sha1: String,
  rev_minor_edit: Boolean,
  rev_len: Int,
  rev_content_model: String,
  rev_content_format: String,
  performer: Option[Performer] = None,
  page_is_redirect: Boolean,
  comment: Option[String],
  parsedcomment: Option[String]
)
// This is just an implicit to allow for automatic JSON deserialization into the PageCreate class.
implicit val config: Configuration = Configuration.default.withDefaults
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092") // list of Kafka brokers (host:port)
kafkaProperties.setProperty("group.id", "my_consumer_group_name") // name of your consumer group
val PageCreateSource: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String](
  java.util.Arrays.asList("eqiad.mediawiki.page-create", "codfw.mediawiki.page-create"),
  new SimpleStringSchema(),
  kafkaProperties
)
// Finally! A Flink DataStream[PageCreate].
val pageCreateStream: DataStream[PageCreate] = env.addSource(PageCreateSource)
  .flatMap({ message: String =>
    decode[PageCreate](message) match {
      case Left(error) =>
        // Drop messages that fail to decode, logging the error.
        LoggerFactory.getLogger("PageCreateStream").error(error.toString)
        None
      case Right(pageCreate: PageCreate) => Option(pageCreate)
    }
  })
  .map((pageCreate: PageCreate) => pageCreate) // placeholder: replace with real per-event processing
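
// Not part of the original gist: a minimal sketch of attaching a sink and running the job.
// print() writes to stdout (useful only for local testing), and the job name is arbitrary.
pageCreateStream.print()
env.execute("page-create-example")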
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
// Uses EventTableDescriptorBuilder plus some Kafka configs.
// https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/table/EventTableDescriptorBuilder.java
// https://gitlab.wikimedia.org/repos/data-engineering/mediawiki-stream-enrichment/-/blob/T307959-event-table-source/src/main/scala/org/wikimedia/dataplatform/KafkaEventStreamFactory.scala
import org.wikimedia.dataplatform.KafkaEventStreamFactory
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
val streamConfigUri = "https://meta.wikimedia.org/w/api.php"
val schemaBaseUris = Seq(
  "https://schema.discovery.wmnet/repositories/primary/jsonschema",
  "https://schema.discovery.wmnet/repositories/secondary/jsonschema"
)

val httpClientRoutes = Map(
  "https://meta.wikimedia.org" -> "https://api-ro.discovery.wmnet"
)
// Combines Kafka config, EventStreamConfig and Schema Repos
// to automate instantiation of Event Platform streams from Kafka.
val kafkaEventStreamFactory: KafkaEventStreamFactory = KafkaEventStreamFactory(
  tableEnv,         // StreamTableEnvironment
  schemaBaseUris,
  streamConfigUri,
  httpClientRoutes
)
// This is a Dynamic Table of the mediawiki.page-create stream. It has all the fields
// of the latest event JSONSchema.
val mediawikiPageCreateDynamicTable: Table =
  kafkaEventStreamFactory.table(
    "mediawiki.page-create",       // name of event stream
    "kafka-jumbo1001.eqiad.wmnet", // list of Kafka brokers
    "my_consumer_group_name"       // name of your consumer group
  )
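
// Not part of the original gist: a sketch of querying the dynamic table with Flink SQL,
// assuming the field names shown in the PageCreate case class above.
tableEnv.createTemporaryView("mediawiki_page_create", mediawikiPageCreateDynamicTable)
val mainNamespacePageCreates: Table = tableEnv.sqlQuery(
  "SELECT page_id, page_title FROM mediawiki_page_create WHERE page_namespace = 0"
)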
// Or, if you prefer the DataStream API, get the DataStream version of the mediawiki.page-create
// stream. Each record in the stream is of type Row, and fields can be accessed by name, e.g.
// val rev_id = row.getFieldAs[Long]("rev_id")
val mediawikiPageCreateStream: DataStream[Row] =
  kafkaEventStreamFactory.dataStream(
    "mediawiki.page-create",       // name of event stream
    "kafka-jumbo1001.eqiad.wmnet", // list of Kafka brokers
    "my_consumer_group_name"       // name of your consumer group
  )
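
// Not part of the original gist: a sketch of pulling a single field out of each Row,
// assuming the stream's schema includes rev_id as a Long (as in the case class above).
val revisionIds: DataStream[Long] = mediawikiPageCreateStream
  .map((row: Row) => row.getFieldAs[Long]("rev_id"))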