Kafka stream consumers
# https://pypi.python.org/pypi/kafka-python
from kafka import KafkaConsumer, TopicPartition

topic_name = 'test-01'
group_id = 'test-consumer-01'
KAFKA_BROKER_LIST = "10.2.65.197:9092,10.2.67.55:9092,10.2.91.23:9092"

# option 1: subscribe to the topic and let the group coordinator assign partitions
consumer = KafkaConsumer(topic_name,
                         group_id=group_id,
                         bootstrap_servers=KAFKA_BROKER_LIST,
                         auto_offset_reset='earliest')

# option 2: assign all partitions of the topic manually
consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKER_LIST)
partitions = [TopicPartition(topic_name, p) for p in consumer.partitions_for_topic(topic_name)]
consumer.assign(partitions)

# check that every partition was assigned
assigned_partitions = consumer.assignment()
assert len(assigned_partitions) == len(partitions)

# seek every partition to a fixed offset
offset = 40
for p in consumer.partitions_for_topic(topic_name):
    consumer.seek(TopicPartition(topic_name, p), offset)

# read messages
for msg in consumer:
    print(msg)
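
For quick end-to-end testing of the consumers above, a minimal producer sketch using the same kafka-python package (it reuses the broker list and topic from the snippets above; the message payloads are just placeholders):

# minimal test producer (kafka-python); assumes the same brokers/topic as the consumers above
from kafka import KafkaProducer

KAFKA_BROKER_LIST = "10.2.65.197:9092,10.2.67.55:9092,10.2.91.23:9092"
topic_name = 'test-01'

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_LIST)

# send a few placeholder messages; values must be bytes unless a value_serializer is configured
for i in range(10):
    producer.send(topic_name, value='test message {}'.format(i).encode('utf-8'))

# block until all buffered messages have been delivered
producer.flush()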
// --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// create spark streaming context with 1-second batches: https://spark.apache.org/docs/latest/streaming-programming-guide.html
val ssc = new StreamingContext(sc, Seconds(1))

// kafka brokers && topic
val kafkaBrokers = "10.2.65.197:9092,10.2.67.55:9092,10.2.91.23:9092"
val topic = "test-01"
val topicOffset = "earliest" // or "latest"

// kafka consumer params; a random group.id starts a fresh consumer group on every run
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> java.util.UUID.randomUUID().toString,
  "auto.offset.reset" -> topicOffset
)

// stream definition
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq(topic), kafkaParams)
)

// print each micro-batch (collect() pulls the whole RDD to the driver; only suitable for small test topics)
stream.foreachRDD(rdd => {
  val data = rdd.collect().mkString("\n")
  println(data)
})

// start the stream and block until it is stopped
ssc.start()
ssc.awaitTermination()
# launch an interactive spark-shell (with the kafka-0-10 streaming package) inside the Spark docker image against the Mesos cluster
SPARK_IMAGE="docker-dev.hli.io/ccm/hli-rspark-plink:2.0.1"
SPARK_MASTER="mesos://10.2.95.5:5050"
ROOT_DIR="$(cd "$(dirname "$0")"; pwd)"
CORES=32
RAM="50g"

docker run -it --rm \
    -e SPARK_MASTER=${SPARK_MASTER} \
    -e SPARK_IMAGE=${SPARK_IMAGE} \
    --net=host \
    ${SPARK_IMAGE} sh -c "/usr/local/spark/bin/spark-shell \
        --driver-memory=1g \
        --conf spark.executor.memory=${RAM} \
        --conf spark.driver.maxResultSize=512g \
        --conf spark.mesos.coarse=true \
        --conf spark.task.maxFailures=10 \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter \
        --conf spark.shuffle.spill=true \
        --conf spark.mesos.executor.docker.image=${SPARK_IMAGE} \
        --conf spark.cores.max=${CORES} \
        --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0"