@vinodkc
Last active June 9, 2021 10:56
cd /usr/hdp/current/kafka-broker/bin/
[kafka@c220-node2 bin]$ ./kafka-topics.sh --create --zookeeper c220-node2.squadron-labs.com:2181 --replication-factor 2 --partitions 3 --topic source1
Created topic "source1".
[kafka@c220-node2 bin]$ ./kafka-topics.sh --create --zookeeper c220-node2.squadron-labs.com:2181 --replication-factor 2 --partitions 3 --topic dest1
Created topic "dest1".
[kafka@c220-node2 bin]$ ./kafka-console-producer.sh --broker-list c220-node2.squadron-labs.com:6667 --topic source1
[kafka@c220-node4 bin]$ ./kafka-console-consumer.sh --bootstrap-server c220-node4.squadron-labs.com:6667 --topic dest1
I have used org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 to match my HDP 2.6.5 cluster. Use the version that matches your cluster.
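As a rough aid for picking the matching version, the sketch below builds the --packages coordinate from a Spark version and a Scala version. The helper name kafkaConnectorCoordinate is hypothetical. It assumes the connector follows the pattern seen above (Scala binary version after the underscore, Spark version last) and that a vendor build may append extra components to the Spark version, so only the first three are kept. Check the result against what your distribution actually ships.

```scala
// Hypothetical helper, not part of Spark: derives the connector coordinate
// from version strings, assuming the naming pattern used in this gist.
def kafkaConnectorCoordinate(sparkVersion: String, scalaVersion: String): String = {
  // Keep major.minor.patch only; vendor builds may add a suffix (assumption).
  val sparkBase = sparkVersion.split('.').take(3).mkString(".")
  // Scala binary version is major.minor, e.g. 2.11 for 2.11.8.
  val scalaBinary = scalaVersion.split('.').take(2).mkString(".")
  s"org.apache.spark:spark-sql-kafka-0-10_$scalaBinary:$sparkBase"
}

// Example with the versions used in this gist.
val coordinate = kafkaConnectorCoordinate("2.3.0", "2.11.8")
println(coordinate)
```

Inside spark-shell the two inputs can be read from spark.version and scala.util.Properties.versionNumberString.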
Test 1:
-------------
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("subscribe", "source1").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)].groupBy("value").count().writeStream.outputMode("complete").format("console").start()
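To make the expected console output easier to read, here is a plain Scala sketch (no Spark involved) of what the query above computes. It groups on the whole message value, not on individual words, and "complete" output mode prints the full table of running counts after every batch. The function name updateCounts and the sample messages are illustrative assumptions.

```scala
// Plain Scala illustration of a running count per distinct message value.
// This mimics the result table only; it is not how Spark implements it.
def updateCounts(counts: Map[String, Long], batch: Seq[String]): Map[String, Long] =
  batch.foldLeft(counts) { (acc, value) =>
    acc.updated(value, acc.getOrElse(value, 0L) + 1L)
  }

// Two hypothetical micro-batches typed into the console producer.
val afterBatch1 = updateCounts(Map.empty[String, Long], Seq("hello", "world", "hello"))
val afterBatch2 = updateCounts(afterBatch1, Seq("world"))

// In complete mode the whole table is shown each time, including unchanged rows.
println(afterBatch1)
println(afterBatch2)
```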
Note:
On a secure cluster, pass the security protocol in option(...)
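A minimal sketch of what that could look like, assuming a SASL_PLAINTEXT listener on the same broker address. The protocol value and port are assumptions and depend on how your cluster is secured. Options prefixed with "kafka." are handed to the underlying Kafka client, so "kafka.security.protocol" carries the setting. Kerberos setups usually also need a JAAS configuration, which is not shown here.

```scala
// Assumed values for illustration; replace with your cluster's settings.
val secureKafkaOptions: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> "c220-node2.squadron-labs.com:6667",
  "kafka.security.protocol" -> "SASL_PLAINTEXT", // assumption: depends on your cluster
  "subscribe" -> "source1"
)

// In spark-shell the map can be applied in one call:
//   spark.readStream.format("kafka").options(secureKafkaOptions).load()
secureKafkaOptions.foreach { case (key, value) => println(s"$key=$value") }
```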
Test 2:
-----------------
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server c220-node4.squadron-labs.com:6667 --topic dest1
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions._
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("subscribe", "source1").load().selectExpr("upper(CAST(value AS STRING))").as[String].flatMap(_.split(" ")).writeStream.outputMode("update").format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("topic", "dest1").option("checkpointLocation", "/tmp/checkpount3").start()
Note: the results are written to topic dest1 and show up in the console consumer started above.
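For reference, the per-message transformation in Test 2 can be sketched in plain Scala (no Spark): each value is upper-cased and split on single spaces, and every resulting word becomes its own record in dest1. The helper name toUpperWords and the sample input are illustrative assumptions, and toUpperCase stands in for SQL upper here.

```scala
// Plain Scala stand-in for upper(CAST(value AS STRING)) followed by flatMap(_.split(" ")).
def toUpperWords(value: String): Seq[String] =
  value.toUpperCase.split(" ").toSeq

// Two hypothetical messages sent to source1.
val messages = Seq("hello spark", "kafka")
val words = messages.flatMap(toUpperWords)

// Each element would arrive as a separate message on dest1.
words.foreach(println)
```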