Skip to content

Instantly share code, notes, and snippets.

@khajavi
Created May 15, 2017 08:19
Show Gist options
  • Save khajavi/ec9ad6f9dd9d2ceda7e01cbc3d9aeac8 to your computer and use it in GitHub Desktop.
Save khajavi/ec9ad6f9dd9d2ceda7e01cbc3d9aeac8 to your computer and use it in GitHub Desktop.
/**
 * Flink streaming job: reads text lines from a socket on localhost:4444, tags every line
 * with the constant key "1", concatenates the lines over a sliding count window
 * (`countWindow(5, 1)`) and writes each result both to the Kafka topic "test"
 * (broker localhost:9092) and to stdout.
 *
 * The job is started from an explicit `main` method rather than `scala.App`, so nothing
 * depends on `DelayedInit` initialization order.
 *
 * @author Milad Khajavi <[email protected]>.
 */
object FlinkWindowStreaming {

  import java.nio.charset.StandardCharsets
  import java.util.Properties

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.scala._
  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
  // NOTE(review): package assumed from the two-type-parameter SerializationSchema used
  // below (older Flink releases) -- confirm against the Flink version on the classpath.
  import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}

  /** Separator placed between the key and the value in the Kafka wire format. */
  private final val Separator = ";"

  /**
   * Builds the pipeline and blocks in `execute()` until the job terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("localhost", 4444)

    val counts: DataStream[(String, String)] =
      lines
        .map { line =>
          // Echo every raw input line, then attach the constant key.
          println(line)
          ("1", line)
        }
        .keyBy(0)
        // Window over the last 5 elements per key, evaluated on every new element.
        .countWindow(5, 1)
        .reduce { (acc, next) => (acc._1, acc._2 + next._2) }
        .setParallelism(4)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")

    val sink = new FlinkKafkaProducer[(String, String)]("test", new TupleSchema, properties)
    counts.addSink(sink)
    counts.print()

    env.execute()
  }

  /**
   * Kafka (de)serialization schema for `(key, value)` string pairs.
   *
   * Wire format: UTF-8 bytes of `key;value`. Only the first `;` separates key from value,
   * so values may contain `;` themselves; keys must not.
   */
  class TupleSchema
    extends DeserializationSchema[(String, String)]
    with SerializationSchema[(String, String), Array[Byte]] {

    /** The stream never ends based on element content. */
    override def isEndOfStream(nextElement: (String, String)): Boolean = false

    /**
     * Decodes a `key;value` message.
     *
     * @param message raw bytes as produced by [[serialize]]
     * @return the decoded pair; the value is empty when nothing follows the separator
     * @throws IllegalArgumentException if the message contains no separator
     */
    override def deserialize(message: Array[Byte]): (String, String) = {
      val text = new String(message, StandardCharsets.UTF_8)
      // Limit 2: split on the first separator only and keep a trailing empty value.
      text.split(Separator, 2) match {
        case Array(key, value) =>
          val pair = (key, value)
          println(pair)
          pair
        case _ =>
          throw new IllegalArgumentException(
            s"Malformed message, expected 'key${Separator}value' but got: $text")
      }
    }

    /**
     * Type information for the produced Scala tuple, derived by the Flink Scala API macro.
     * `TypeInfoParser` is intended for Java-style type strings and Flink's Java tuples.
     */
    override def getProducedType: TypeInformation[(String, String)] =
      createTypeInformation[(String, String)]

    /**
     * Encodes a pair as the UTF-8 bytes of `key;value`.
     *
     * @param element the pair to encode
     * @return the encoded bytes
     */
    override def serialize(element: (String, String)): Array[Byte] =
      s"${element._1}$Separator${element._2}".getBytes(StandardCharsets.UTF_8)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment