Gist: vinodkc/b727abac5116fea62d5e76fad3761d23 (last active June 9, 2021 10:56)
Setup:
-------------
cd /usr/hdp/current/kafka-broker/bin/
[kafka@c220-node2 bin]$ ./kafka-topics.sh --create --zookeeper c220-node2.squadron-labs.com:2181 --replication-factor 2 --partitions 3 --topic source1
Created topic "source1".
[kafka@c220-node2 bin]$ ./kafka-topics.sh --create --zookeeper c220-node2.squadron-labs.com:2181 --replication-factor 2 --partitions 3 --topic dest1
Created topic "dest1".
[kafka@c220-node2 bin]$ ./kafka-console-producer.sh --broker-list c220-node2.squadron-labs.com:6667 --topic source1
[kafka@c220-node4 bin]$ ./kafka-console-consumer.sh --bootstrap-server c220-node4.squadron-labs.com:6667 --topic dest1
Type test messages into the producer (topic source1); the consumer (topic dest1) shows the output of Test 2.
I used org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 to match my HDP 2.6.5 cluster (Spark 2.3.0, Scala 2.11). Use the version that matches your cluster's Spark and Scala versions.
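To make the version matching concrete, here is a small sketch that builds the --packages coordinate from the cluster's Scala and Spark versions. The helper name kafkaPackage is hypothetical (it is not part of Spark); it only assumes the coordinate pattern shown above, where the artifact suffix is the Scala binary version (for example 2.11) and the artifact version equals the Spark version.

```scala
// Hypothetical helper (not a Spark API): builds the --packages coordinate
// for the Kafka source/sink from the cluster's Scala and Spark versions.
def kafkaPackage(scalaVersion: String, sparkVersion: String): String = {
  // The artifact suffix uses the Scala *binary* version, e.g. "2.11" for "2.11.8".
  val scalaBinary = scalaVersion.split('.').take(2).mkString(".")
  s"org.apache.spark:spark-sql-kafka-0-10_${scalaBinary}:${sparkVersion}"
}

println(kafkaPackage("2.11.8", "2.3.0"))
```

Inside a spark-shell started without --packages, the two inputs are available as scala.util.Properties.versionNumberString and spark.version (both versions are also printed in the spark-shell startup banner).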
Test 1:
-------------
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
val query = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("subscribe", "source1").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)].groupBy("value").count().writeStream.outputMode("complete").format("console").start()
This counts how many times each distinct message value has arrived on source1 and prints the running totals to the console.
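The following plain-Scala sketch (no Spark involved) illustrates what groupBy("value").count() reports in "complete" output mode: after every micro-batch the console sink prints the whole, updated count table, not only the rows that changed. The function name completeModeCounts and the sample batches are hypothetical, chosen only for illustration; real micro-batch boundaries are decided by Spark.

```scala
// Plain-Scala illustration (no Spark): running counts per distinct value,
// emitting the FULL table after each micro-batch, as "complete" mode does.
def completeModeCounts(batches: Seq[Seq[String]]): Seq[Map[String, Long]] =
  batches.scanLeft(Map.empty[String, Long]) { (counts, batch) =>
    // Add this batch's values to the state carried over from earlier batches.
    batch.foldLeft(counts) { (acc, value) =>
      acc.updated(value, acc.getOrElse(value, 0L) + 1L)
    }
  }.tail // drop the empty initial state; one result table per batch

// Two hypothetical micro-batches of message values read from source1.
val tables = completeModeCounts(Seq(Seq("a", "b", "a"), Seq("b", "c")))
tables.foreach(println)
```

The second table still contains "a" even though no "a" arrived in that batch, which is the difference between "complete" and "update" mode for an aggregation.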
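Note:
In a secure cluster, pass the security protocol as a Kafka option, e.g. .option("kafka.security.protocol", "SASL_PLAINTEXT"); options prefixed with "kafka." are passed through to the Kafka client.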
Test 2:
-----------------
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server c220-node4.squadron-labs.com:6667 --topic dest1
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions._
val query = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("subscribe", "source1").load().selectExpr("upper(CAST(value AS STRING))").as[String].flatMap(_.split(" ")).writeStream.outputMode("update").format("kafka").option("kafka.bootstrap.servers", "c220-node2.squadron-labs.com:6667").option("topic", "dest1").option("checkpointLocation", "/tmp/checkpoint3").start()
Note: the results are written to topic dest1 and appear in the console consumer started above.
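As a plain-Scala sketch (no Spark or Kafka involved), this is the per-record transformation Test 2 applies: each source1 message is upper-cased and split on single spaces, and every resulting word becomes its own record in dest1. The function name toDestRecords and the sample messages are hypothetical.

```scala
// Plain-Scala illustration (no Spark): mirrors
// upper(CAST(value AS STRING)) followed by flatMap(_.split(" ")).
def toDestRecords(sourceValues: Seq[String]): Seq[String] =
  sourceValues.map(_.toUpperCase).flatMap(_.split(" "))

// Two hypothetical messages typed into the source1 console producer.
toDestRecords(Seq("hello spark", "kafka sink")).foreach(println)
```

In the real query this works with the Kafka sink without renaming anything because a Dataset[String] exposes its single column as "value", which is the column the sink writes. Since the query has no aggregation, outputMode("update") behaves like "append" here.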