Last active
September 19, 2020 15:45
-
-
Save vinodkc/f60578358ca55455c10fb72a19ec4b89 to your computer and use it in GitHub Desktop.
HDP3 - Spark structured streaming Kafka integration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
A) Spark structured streaming Kafka integration - SASL_PLAINTEXT | |
1) Prerequisites | |
[consumer-user@c220-node1 sslss]$ ll | |
-rw------- 1 consumer-user root 144 Apr 21 08:56 consumer-user.keytab | |
-rw-rw-r-- 1 consumer-user consumer-user 229 Apr 21 09:40 kafka_client_jaas.conf | |
[consumer-user@c220-node1 sslss]$ cat kafka_client_jaas.conf | |
KafkaClient { | |
com.sun.security.auth.module.Krb5LoginModule required | |
doNotPrompt=true | |
useTicketCache=false | |
principal="[email protected]" | |
useKeyTab=true | |
serviceName="kafka" | |
keyTab="consumer-user.keytab" | |
client=true; | |
}; | |
2) Start spark-shell [Kafka SASL_PLAINTEXT port - 6667] | |
cd ~/sslss | |
/usr/hdp/current/spark2-client/bin/spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --conf spark.security.credentials.hiveserver2.enabled=false --conf spark.driver.extraJavaOptions=" -Djava.security.auth.login.config=kafka_client_jaas.conf" --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf " --files ./kafka_client_jaas.conf,./consumer-user.keytab | |
3) Paste the code | |
spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6667").option("subscribe", "test1").option("kafka.security.protocol", "SASL_PLAINTEXT").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)].writeStream.outputMode("update").format("console").start() | |
4) From another host/terminal, run kafka-console-producer.sh and send test messages: | |
kinit -kt /etc/security/keytabs/producer-user.keytab producer-user | |
[root@c220-node2 bin]# ./kafka-console-producer.sh --broker-list c220-node3.squadron-labs.com:6668 --producer.config ~/sslnew/client-ssl.properties --topic test1 | |
5) Sample output from Spark shell | |
------------------------------------------- | |
Batch: 3 | |
------------------------------------------- | |
+----+-----+ | |
| key|value| | |
+----+-----+ | |
|null| fs| | |
+----+-----+ | |
------------------------------------------- | |
Batch: 4 | |
------------------------------------------- | |
+----+-----+ | |
| key|value| | |
+----+-----+ | |
|null| ffs| | |
+----+-----+ | |
B) Spark structured streaming Kafka integration - SASL_SSL | |
1) Prerequisites | |
[consumer-user@c220-node1 sslss]$ ll | |
-rw-r--r-- 1 root root 997 Apr 21 08:54 client.truststore.jks | |
-rw------- 1 consumer-user root 144 Apr 21 08:56 consumer-user.keytab | |
-rw-rw-r-- 1 consumer-user consumer-user 229 Apr 21 09:40 kafka_client_jaas.conf | |
[consumer-user@c220-node1 sslss]$ cat kafka_client_jaas.conf | |
KafkaClient { | |
com.sun.security.auth.module.Krb5LoginModule required | |
doNotPrompt=true | |
useTicketCache=false | |
principal="[email protected]" | |
useKeyTab=true | |
serviceName="kafka" | |
keyTab="consumer-user.keytab" | |
client=true; | |
}; | |
2) Start spark-shell [Kafka SASL_SSL port - 6668] | |
cd ~/sslss | |
/usr/hdp/current/spark2-client/bin/spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 --conf spark.security.credentials.hiveserver2.enabled=false --conf spark.driver.extraJavaOptions=" -Djava.security.auth.login.config=kafka_client_jaas.conf" --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf " --files ./kafka_client_jaas.conf,./consumer-user.keytab,./client.truststore.jks | |
3) Paste the code | |
spark.readStream.format("kafka").option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6668").option("subscribe", "test1").option("kafka.security.protocol", "SASL_SSL").option("kafka.ssl.truststore.location","client.truststore.jks").option("kafka.ssl.truststore.password","yourpassword").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)].writeStream.outputMode("update").format("console").start() | |
4) From another host/terminal, run kafka-console-producer.sh and send test messages: | |
kinit -kt /etc/security/keytabs/producer-user.keytab producer-user | |
[root@c220-node2 bin]# ./kafka-console-producer.sh --broker-list c220-node3.squadron-labs.com:6668 --producer.config ~/sslnew/client-ssl.properties --topic test1 | |
>SSL TEST | |
5) Sample output from Spark shell | |
------------------------------------------- | |
Batch: 2 | |
------------------------------------------- | |
+----+--------+ | |
| key| value| | |
+----+--------+ | |
|null|SSL TEST| | |
+----+--------+ | |
Happy Coding :) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment