Skip to content

Instantly share code, notes, and snippets.

@whitekid
Last active October 21, 2015 12:02
Show Gist options
  • Save whitekid/36f522caa6ff95ae0054 to your computer and use it in GitHub Desktop.
Save whitekid/36f522caa6ff95ae0054 to your computer and use it in GitHub Desktop.
A simple example of forwarding one Kafka topic to another with Flink
// Tested with Flink 0.9.1
/** Flink streaming job that copies every record from one Kafka topic to another.
  *
  * All `<...>` string values below are placeholders; replace them with real
  * endpoints, a consumer group id and topic names before running.
  */
object Job {

  /** Builds the source-to-sink topology and submits it for execution.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The broker list is shared by the consumer properties and the sink.
    val bootstrapServers = "<kafka-server>:9092"

    val properties = new Properties
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("zookeeper.connect", "<zookeeper-server>:2181")
    properties.setProperty("group.id", "<your group id>")

    val topicSource = "<sourct-topic>"
    val topicSink = "<target-topic>"

    // Read the source topic, using SimpleStringSchema for the records.
    val stream = env.addSource(
      new FlinkKafkaConsumer082(topicSource, new SimpleStringSchema, properties))

    // Just forward source to sink.
    // NOTE: a trailing `stream.forward` statement used to follow this call. Its
    // result was discarded and it ran after the sink was attached, so it had no
    // effect on the job graph; it has been removed.
    stream.addSink(new KafkaSink(bootstrapServers, topicSink, new SimpleStringSchema))

    // execute program
    env.execute("Simple Topic Foward")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment