Skip to content

Instantly share code, notes, and snippets.

@virtualsafety
Created May 4, 2017 01:43
Show Gist options
  • Save virtualsafety/dfab16bc98001783463ef296c93fd040 to your computer and use it in GitHub Desktop.
Save virtualsafety/dfab16bc98001783463ef296c93fd040 to your computer and use it in GitHub Desktop.
read json from kafka , parse json to case class , and then convert case class to tuple.
package com.test.flink
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.json4s._
import org.json4s.jackson.JsonMethods._
object AlertActiveSessionTuple {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
properties.setProperty("client.id", "flink_client")
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "10")
val kafkaConsumer = new FlinkKafkaConsumer09[String]("test", new SimpleStringSchema(), properties)
val stream = env.addSource(kafkaConsumer).map { x =>
implicit val formats = DefaultFormats;
ActiveSession.unapply(parse(x).extract[ActiveSession]).get // parse json to case class ,and then convert case class to tuple
} .keyBy(0,1)
.timeWindow(Time.seconds(10))
.sum(2).print()
env.execute("Flink Kafka Example")
}
case class ActiveSession(Oracle_sid: String, Hostname: String,Session_cnt: Int, Cpu_cores: Int,Check_tm: String)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment