@j-thepac
Last active November 18, 2024 00:03

Kafka

kafkaSample

Docker YML

## docker-compose.yml (start Kafka broker and Zookeeper)

    # docker-compose up -d    (-d = detached mode)
    version: '3'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"

      kafka:
        image: wurstmeister/kafka
        container_name: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: localhost
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        depends_on: [zookeeper]
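
A quick connectivity check once the stack above is running (a minimal sketch assuming kafka-python is installed; the broker address matches the port mapping above):

    # pip3 install kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    print(consumer.topics())   # set of existing topic names; empty on a fresh broker
    consumer.close()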

Different Parts

  • Producer: sends data (e.g. Spark)
  • Consumer: receives data (e.g. Spark)
    • Offset: pointer to how far a consumer has read, e.g. .option("startingOffsets", "earliest")
  • Brokers (servers): receive, store and send data
    • Topic: data is stored as logs called topics (/kafka/kafka-logs-xxx)
    • Partition (for concurrency): each partition is stored on a different broker
    • Replica (fault tolerance): backup of a partition kept on a different broker
  • Zookeeper: controls and monitors brokers (/kafka/config/zookeeper.properties)
  • ConsumerGroup
    • Within a consumer group, each partition is assigned to only one consumer
    • To let more than one consumer read the same partition independently, put the consumers in different consumer groups (see the sketch below)
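
A minimal kafka-python consumer sketch to make the offset/consumer-group ideas concrete (topic name TopicName and group id group1 are placeholders):

    # pip3 install kafka-python
    from kafka import KafkaConsumer

    # Consumers sharing the same group_id split the topic's partitions between them;
    # auto_offset_reset='earliest' plays the role of startingOffsets=earliest in Spark.
    consumer = KafkaConsumer(
        'TopicName',
        bootstrap_servers='localhost:9092',
        group_id='group1',
        auto_offset_reset='earliest')

    for msg in consumer:
        # each record carries its partition and offset alongside the payload
        print(msg.partition, msg.offset, msg.value)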

Operation

  1. Write

    • Batch - refer to the Spark examples below
    • Stream (kafka-python)
    #pip3 install kafka-python
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('TopicName', b'test')  # value must be bytes unless a value_serializer is set
    producer.flush()
    print('Published message')
    # To publish JSON instead of raw bytes:
    # import json
    # producer = KafkaProducer(bootstrap_servers='localhost:9092',
    #                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    # producer.send('TopicName', {'foo': 'bar'})
    
  2. Read

    1. Batch (spark.read.format("kafka")...)
      • Can read stream and batch data
      • spark.read
      • Runs only once
      • Program stops after all lines are executed
    2. Stream (spark.readStream.format("kafka")...)
      • Can read stream and batch data
      • spark.readStream
      • Trigger can be Once, Continuous, or ProcessingTime (see the sketch after this list)
      • Uses start().awaitTermination()
      • Program keeps running
  3. Default Columns - key, value, topic, partition, offset, timestamp are added automatically by the Kafka source

  4. NoSuchMethodError - usually means the Kafka connector package version is incompatible with the installed Spark version
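
A minimal streaming-read sketch showing a trigger choice and the default Kafka columns (topic name, trigger interval and checkpoint path are placeholders; assumes the Kafka connector package from the Spark section below is loaded):

    df = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "TopicName")
          .option("startingOffsets", "earliest")
          .load())

    # key, value, topic, partition, offset, timestamp are provided by the Kafka source itself
    query = (df.selectExpr("CAST(value AS STRING)", "topic", "partition", "offset")
               .writeStream
               .format("console")
               .trigger(processingTime="10 seconds")  # or trigger(once=True) for a single batch-like run
               .option("checkpointLocation", "checkpoint")
               .start())
    query.awaitTermination()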

Manual Run

# Docker Exec Broker Console
$docker-compose up -d
$docker ps -a
$docker exec -it <kafka_container_id[broker]> bash
$cd /opt/kafka_<version>/bin 
$kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic t
$kafka-topics.sh --describe --topic t --zookeeper zookeeper:2181
$kafka-console-producer.sh --topic t --bootstrap-server localhost:9092
$kafka-console-consumer.sh --topic t --from-beginning --bootstrap-server localhost:9092
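
The same topic management can also be done from Python instead of the console scripts — a sketch using kafka-python's admin client (topic name t as above):

    # pip3 install kafka-python
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    admin.create_topics([NewTopic(name="t", num_partitions=1, replication_factor=1)])
    print(admin.list_topics())  # should now include "t"
    admin.close()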

Spark Operation (Syntax Similar to Spark Streaming)

Write (Table > JSON):

$pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2
# or set spark.jars.packages on the SparkSession builder (see the sketch below)
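
Equivalent SparkSession-builder form for a standalone script (a sketch; the package version should match the installed Spark/Scala build):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kafka-demo")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2")
         .getOrCreate())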

from pyspark.sql.functions import * 
from pyspark.sql.types import *

(spark.createDataFrame([("Ram", 20)]).toDF("name", "age")
	.withColumn("value", to_json(struct('name', 'age'))) 
	.withColumn("key", lit("key1")) 
	.select("key", "value")
	.write.format("kafka")
	.option("kafka.bootstrap.servers", "localhost:9092")
	.option("topic", "quickstart")
	.save())

Read (JSON > Table):

df = (spark.read.format("kafka")
	.option("kafka.bootstrap.servers", "localhost:9092")
	.option("subscribe", "quickstart")
	.load())
st = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
df = df.withColumn("value", df["value"].cast("string")).withColumn("c2", from_json("value", schema=st))
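
To pull the parsed fields back out of the struct column c2 defined above:

df.select("c2.name", "c2.age").show()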

Continuous Data = readStream + writeStream

df = (spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "container_id:9092")  # replace container_id with the Kafka container/host name
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.writeStream
.outputMode("update")
.option("checkpointLocation", "checkpoint")
.foreachBatch(lambda df, epoch_id: df.selectExpr("concat(value,'_')").write.format("delta").save("path"))
.start()
.awaitTermination()
)

Cloud Kafka

Pre-Req:

  • Create an account at https://www.cloudkarafka.com/
  • Go to Overview
  • Copy the hostname (if the hostname is "rocket.srvs.cloudkafka.com", copy only "rocket")
  • Copy the default username and password
  • Go to Kafka > Topic and copy the topic name (example: gdqt5d3d-default)
  • Install Python
  • Configure PySpark

Steps:

  1. Open a terminal
  2. Run the commands below (tested with Python 3.10, Spark 3.2.1)

Write

$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0
host, usn, password, topic = "hostname", "usn", "pass", "topic"
server = f"{host}-01.srvs.cloudkafka.com:9094,{host}-02.srvs.cloudkafka.com:9094,{host}-03.srvs.cloudkafka.com:9094"

from pyspark.sql.functions import *
df = spark.createDataFrame([{"name":"Deepak"}])
(df
.withColumn("value", to_json(struct(df.name)))
.withColumn("key", lit("a"))
.select(["key", "value"])
.write
.format("kafka")
.option("kafka.bootstrap.servers", server)
.option("topic", f"{topic}")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.save())

Read

df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", server)
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.load())

df.selectExpr("CAST(value AS STRING)").show()

#------------- read as Stream
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", server)
.option("subscribe", f"{topic}")
.option("startingOffsets", "earliest")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username='{usn}' password='{password}';")
.load())

(df
.selectExpr("CAST (value as String)")
.writeStream
.outputMode("update")
.option("checkpointLocation", "checkpoint")
.foreachBatch(lambda df, epoch_id: df.selectExpr("concat(value,'_')").write.format("delta").save("path"))
.start()
.awaitTermination())

Links

  • https://www.youtube.com/watch?v=f8BDoivhtG4&t=0s
  • https://www.youtube.com/watch?v=heR3I3Wxgro&t=179s
  • https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
