## docker-compose.yml (start kafka Broker and Zookeeper)
#docker-compose up -d #detach mode
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on: [zookeeper]
- Producer: Send Data (Spark)
- Consumer: Receive Data (Spark)
- Offset: pointer to the consumer's position in the ingested data, e.g.:
.option("startingOffsets", "earliest")
- Brokers (Servers): Receive, Store and Send Data
- Topic: data is stored as logs grouped by Topic (/kafka/kafka-logs-xxx)
- Partition (for concurrency): each partition is stored on a different Broker
- Replica (fault tolerance): a backup of a partition kept on a different Broker is called a Replica
- ZooKeeper: controls and monitors the Brokers (/kafka/config/zookeeper.properties)
- ConsumerGroup
- Within one consumer group, each partition is read by only one consumer
- Hence, to serve more than one consumer from the same partition, the consumers are placed in different consumer groups (see the kafka-python sketch below)
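A minimal kafka-python sketch of this consumer-group behaviour (the topic name 'TopicName', the localhost broker and the group ids are assumptions):
#pip3 install kafka-python
# Sketch: consumers sharing a group_id split the partitions between them;
# consumers with different group_ids each receive the full stream independently.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'TopicName',                         # hypothetical topic
    bootstrap_servers='localhost:9092',  # assumed local broker from the compose file above
    group_id='group-a',                  # start a second consumer with group_id='group-b' to read the same partition again
    auto_offset_reset='earliest')        # same idea as startingOffsets=earliest in Spark
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)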
- Write
- Batch - Refer Spark Example
- Stream (kafka-python)
#pip3 install kafka-python
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('TopicName', b'test')
producer.flush()
print('Published message')
# import json
# producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# producer.send('TopicName', {'foo': 'bar'})
- Read
- Batch (spark.read.format("kafka")..)
- Can read Stream and Batch Data
- spark.read
- Only Once
- program stops after all lines are executed
- Stream (spark.readStream.format("kafka")..)
- Can read Stream and Batch
- spark.readStream
- Trigger: Once, Continuous, or ProcessingTime (see the sketch after this list)
- Uses start().awaitTermination()
- Program Keeps Running
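A minimal PySpark sketch of the three trigger modes mentioned above (topic 'quickstart', the localhost broker, the console sink and the checkpoint path are assumptions; use one trigger per query):
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "quickstart")
      .load())
q = (df.selectExpr("CAST(value AS STRING)")
     .writeStream.format("console")
     .option("checkpointLocation", "checkpoint_trigger")   # hypothetical checkpoint path
     .trigger(once=True)                                    # Once: one micro-batch, then stop (behaves like spark.read)
     #.trigger(processingTime="10 seconds")                 # ProcessingTime: a micro-batch every 10 seconds
     #.trigger(continuous="1 second")                       # Continuous: experimental low-latency mode
     .start())
q.awaitTermination()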
- Default Columns - key, value, topic, partition, offset, timestamp are auto-added by the Kafka source (see the schema sketch below)
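A quick way to see these auto-added columns on any DataFrame df loaded from Kafka (key and value arrive as binary):
df.printSchema()   # key, value, topic, partition, offset, timestamp, timestampType
df.select("topic", "partition", "offset", "timestamp").show()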
- NoSuchMethodError - usually means the spark-sql-kafka connector version is incompatible with the Spark version (see the version check below)
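A minimal check for picking a compatible connector (the version numbers shown are examples only):
# The spark-sql-kafka artifact must match the cluster's Spark and Scala versions:
#   org.apache.spark:spark-sql-kafka-0-10_<scala_version>:<spark_version>
print(spark.version)   # e.g. 3.4.2 -> use spark-sql-kafka-0-10_2.12:3.4.2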
# Docker Exec Broker Console
$docker-compose up -d
$docker ps -a
$docker exec -it <kafka_container_id[broker]> bash
$cd /opt/kafka_<version>/bin
$kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic t
$kafka-topics.sh --describe --topic t --zookeeper zookeeper:2181
$kafka-console-producer.sh --topic t --bootstrap-server localhost:9092
$kafka-console-consumer.sh --topic t --from-beginning --bootstrap-server localhost:9092
Write (Table > JSON) :
$pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2
# or set at session creation: SparkSession.builder.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2").getOrCreate()
from pyspark.sql.functions import *
from pyspark.sql.types import *
(spark.createDataFrame([("Ram", 20)]).toDF("name", "age")
.withColumn("value", to_json(struct('name', 'age')))
.withColumn("key", lit("key1"))
.select("key", "value")
.write.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "quickstart")
.save())
df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "quickstart")
      .load())
st = StructType([StructField("name", StringType()),StructField("age", IntegerType())])
df = df.withColumn("value", df["value"].cast("string")).withColumn("c2", from_json("value", schema=st))
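The parsed struct column c2 from the line above can then be flattened into ordinary columns:
df.select("key", "c2.name", "c2.age").show()
df.select("c2.*").show()   # or expand every field of the struct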
2b) Continuous Data = readStream + writeStream
df=(spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","container_id:9092")
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest"
.load()
.selectExpr("CAST (value as String)")
.writeStream
.outputMode("update")
.option("checkpointLocation", "checkpoint")
.foreachBatch(lambda df, epoch_id: df.selectExpr("concat(value,'_')").write.format("delta").save("path"))
.start()
.awaitTermination()
)
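A quick read-back of what the foreachBatch sink wrote (assumes the delta-spark package is on the classpath and "path" matches the save location used above):
spark.read.format("delta").load("path").show()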
Pre-Req:
- Create account in https://www.cloudkarafka.com/
- Go to Overview
- Copy the hostname (if the hostname is "rocket.srvs.cloudkarafka.com", copy only "rocket")
- Copy the default username and password
- Go to Kafka > Topics and copy the topic name (example: gdqt5d3d-default)
- Install Python
- Configure Pyspark
Steps:
- Open terminal
- Run command (versions used here: Python 3.10, Spark 3.2.1)
$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0
host, usn, password, topic = "hostname", "usn", "pass", "topic"
server=f"{host}-01.srvs.cloudkafka.com:9094,{host}-02.srvs.cloudkafka.com:9094,{host}-03.srvs.cloudkafka.com:9094"
from pyspark.sql.functions import *
df=spark.createDataFrame([{"name":"Deepak"}])
(df
.withColumn("value",to_json(struct(df.name)))
.withColumn("key",lit("a"))
.select(["key","value"])
.write
.format("kafka")
.option("kafka.bootstrap.servers", server)
.option("topic", f"{topic}")
.option("kafka.security.protocol","SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.save())
df=(spark
.read
.format("kafka")
.option("kafka.bootstrap.servers",server)
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest")
.option("kafka.security.protocol","SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.load())
df.selectExpr("CAST(value AS STRING)").show()
#-------------read as Stream
df=(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",server)
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest")
.option("kafka.security.protocol","SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.load())
(df
.selectExpr("CAST (value as String)")
.writeStream
.outputMode("update")
.option("checkpointLocation", "checkpoint")
.foreachBatch(lambda df, epoch_id: df.selectExpr("concat(value,'_')").write.format("delta").save("path"))
.start()
.awaitTermination())
References:
- https://www.youtube.com/watch?v=f8BDoivhtG4&t=0s
- https://www.youtube.com/watch?v=heR3I3Wxgro&t=179s
- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch