Kafka notes - concepts and points to remember
-------------kafka notes-----------
why kafka?
better throughput
replication
built-in partitioning
fault tolerance
topic names are unique!!
location of a message -> topic - partition - offset
two consumers in the same group cannot read from the same partition - a restriction to avoid double reading.
max number of consumers in a consumer group is the number of available partitions of the topic.
Replication is set at the topic level.
The leader broker talks to the producer and the consumer - for every partition there is a leader.
To have more brokers:
create 3 copies of config/server.properties
in each, set:
a unique broker.id
listeners=PLAINTEXT://:9093 -- change the port number if multiple brokers are created on the same machine.
log.dirs - give a unique directory if on the same machine.
important properties:
broker.id
port
log.dirs
zookeeper.connect
delete.topic.enable
auto.create.topics.enable
default.replication.factor
num.partitions
log.retention.ms
log.retention.bytes [size is per partition, not per topic]
Partitioning - default partitioning by kafka: based on the key value if one is supplied, or spread across partitions if the key is null;
but the partition can be hardcoded using the ProducerRecord(topic, partition, timestamp, key, value) constructor.
if the timestamp is not set, the broker will set it.
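A minimal sketch of the three options, assuming a configured KafkaProducer<String, String> named producer; the topic name and values are illustrative:

import org.apache.kafka.clients.producer.ProducerRecord;

// key supplied -> default partitioner hashes the key to pick a partition
ProducerRecord<String, String> byKey = new ProducerRecord<>("my-topic", "order-42", "payload");
// no key -> default partitioner spreads records across partitions
ProducerRecord<String, String> noKey = new ProducerRecord<>("my-topic", "payload");
// partition 2 and timestamp hardcoded; a null timestamp would be filled in for us
ProducerRecord<String, String> pinned =
    new ProducerRecord<>("my-topic", 2, System.currentTimeMillis(), "order-42", "payload");
producer.send(pinned);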
acks => acknowledgement values: 0, 1, all
0 --> no guarantee that the message reached even the leader; highest throughput.
1 --> reached the leader, but not the followers yet; slight risk if the leader crashes.
all --> reached the leader and replicated to the followers - no risk, lower throughput.
max.in.flight.requests.per.connection -- in the asynchronous-acknowledgement scenario, how many requests can be sent before an acknowledgement is received;
more in-flight requests need more memory but give higher throughput.
side-effect of asynchronous send plus retry --> if a batch fails, the next batch is sent successfully, and the failed batch is then retried, the order of data events is lost.
if order is important - use synchronous send & max.in.flight.requests.per.connection=1 (see the producer configuration sketch after the property list below).
other important producer properties:
buffer.memory
compression.type
batch.size - 0 means no batching
linger.ms - if the batch size is not reached, wait this long to collect records before making a request; if the batch size is reached earlier, the request is made immediately.
client.id - beyond host IP and port - a logical app name, for example.
max.request.size
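A producer configuration sketch pulling these properties together; the broker address, values and client id are illustrative, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                               // wait for leader and followers
props.put("retries", 3);                                // retried batches can reorder events...
props.put("max.in.flight.requests.per.connection", 1);  // ...unless only one request is in flight
props.put("buffer.memory", 33554432);                   // 32 MB producer-side buffer
props.put("compression.type", "snappy");
props.put("batch.size", 16384);                         // bytes per partition batch; 0 disables batching
props.put("linger.ms", 5);                              // wait up to 5 ms for a batch to fill
props.put("client.id", "demo-producer");                // logical app name beyond host/port
props.put("max.request.size", 1048576);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);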
Consumer considerations:
reading in parallel -
only one consumer owns a partition - consumers in a consumer group do not share a partition - hence the number of partitions in a topic is the upper limit on the number of consumers in a group.
consumer reassignment: happens while scaling up from one consumer to many, or when one of the consumers in the group crashes.
Rebalance activity:
two players:
1. server-side Group Coordinator - GC
2. consumer-side Leader - L
GC --> (on the server) manages the list of group members
GC --> initiates the rebalance activity (blocks reads for all members)
L --> executes the rebalance activity and sends the new partition assignment to the coordinator
GC --> communicates the new assignment to the consumers
-------
offset:
current offset - sent records - the position up to which kafka thinks it has given records out to the consumer
committed offset - processed records - the position already processed by the consumer (the consumer communicates back once it has successfully processed them)
the committed offset is critical in the event of rebalancing - it tells the new consumer assigned to the partition where to start.
committing offsets:
auto - properties: enable.auto.commit, auto.commit.interval.ms (default 5 sec), but may cause records to be processed a second time
manual:
commit sync
commit async
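A sketch of a consumer using manual commits; the group id, topic and the process() step are illustrative, and auto-commit is switched off so the committed offset only moves after processing:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
props.put("enable.auto.commit", "false");   // we commit manually below
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record);                // hypothetical processing step
        }
        consumer.commitAsync();             // non-blocking commit on the happy path
    }
} finally {
    consumer.commitSync();                  // blocking commit before shutting down
    consumer.close();
}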
manual commit scenario:
the processing (within the poll interval) takes longer than expected.
1. GC considers this consumer dead and triggers a rebalance activity.
2. Rebalancing happens for some other reason.
either way, the current partitions will be taken away and given to others - so commit the offset up to which processing has completed.
Commit before ownership is taken away - using a rebalance listener - committing the intermediate processed offsets instead of committing the current offset.
The ConsumerRebalanceListener interface has 2 methods:
1. onPartitionsRevoked (called just before partitions are revoked - this is where we commit)
using ConsumerRebalanceListener:
maintain a list of offsets that are processed but not committed yet;
commit them when partitions are revoked.
2. onPartitionsAssigned
Rebalance occurs when a new consumer is added, when poll is delayed because processing takes longer, or on other system failures.
Create a rebalance listener by implementing the ConsumerRebalanceListener interface -
maintain the processed offsets and commit them in the onPartitionsRevoked method (see the sketch below).
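A sketch of such a listener; the class name and markProcessed() helper are illustrative, the rest is the Kafka consumer API. The consumer registers it via consumer.subscribe(topics, listener) and calls markProcessed() after handling each record:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitOnRevoke implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    // offsets processed so far but not yet committed
    private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

    CommitOnRevoke(KafkaConsumer<String, String> consumer) { this.consumer = consumer; }

    void markProcessed(TopicPartition tp, long offset) {
        processed.put(tp, new OffsetAndMetadata(offset + 1));   // commit the next offset to be read
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(processed);                         // commit before ownership is taken away
        processed.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do in this sketch; a real consumer might seek to externally stored offsets here
    }
}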
Advantages of a Consumer Group:
parallel processing of a topic
automatic management of partition assignment
entry/exit/failure of a consumer is detected and the partition rebalancing activity is taken care of
one downside is -- partition management is done by Kafka, so we have no control over it (if we have a custom partitioner, for example),
i.e. if you want to process one portion of the data differently from the rest.
one issue with the rebalance listener:
suppose the processing inside the poll loop loads data into a db, and the data has been loaded and committed to the db,
but the consumer crashes before the offsets are committed in the onPartitionsRevoked method.
These two activities are not 'atomic', so the db write can't be rolled back.
To make it atomic:
assign the relevant partition to a specific consumer,
store the messages in a DB table, store their corresponding offsets in another table, and make both writes a single 'transaction'.
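A hedged sketch of that pattern, assuming a KafkaConsumer<String, String> named consumer and two hypothetical helpers, readOffsetFromDb() and saveRecordAndOffsetInOneTransaction(), which stand for your own DAO code rather than any library call:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));     // manual assignment - no group rebalancing involved
consumer.seek(tp, readOffsetFromDb(tp));            // resume from the offset stored alongside the data

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // hypothetical helper: writes the message row and the offset row
        // in a single database transaction, so either both are stored or neither is
        saveRecordAndOffsetInOneTransaction(record.value(), tp, record.offset() + 1);
    }
}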
Schema Evolution:
if the data structure might change over time, we should be able to support both the old and the new schema.
working with Avro serialization:
create an Avro schema for the record
generate Avro source code for that schema
create the producer and consumer using KafkaAvroSerializer / KafkaAvroDeserializer
Confluent maintains a Schema Registry, which stores an id and the schema; the producer passes the id along with the data, and the consumer uses the id to get the schema info from the registry.
Avro:
allows you to define a schema for your data
generates code for the schema (optional)
provides APIs to embed/extract the schema and to serialize and deserialize the data.
Avro schemas are defined in JSON.
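A sketch of a producer wired to the Schema Registry; the registry URL and topic are illustrative, KafkaAvroSerializer comes from Confluent's serializer library rather than Apache Kafka itself, and Customer stands for a class generated from the Avro schema:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081"); // serializer registers the schema and sends only its id with each record

KafkaProducer<String, Customer> producer = new KafkaProducer<>(props);
Customer customer = new Customer(42, "Jane");               // generated Avro class; constructor is illustrative
producer.send(new ProducerRecord<>("customers", String.valueOf(customer.getId()), customer));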
--------------------deleting kafka topics----------
use kafka-run-class.sh with the kafka.admin.TopicCommand --delete option.
prerequisite: delete.topic.enable = true
if the server is down while the topic is deleted, the topic will still show in the list, but when the server is started the log folders will be deleted and the topic will be permanently gone.
if the server is running during deletion, the list will not show the deleted topic.
kafka-run-class.sh kafka.admin.TopicCommand --delete --topic ftoKafka_topic --zookeeper localhost:2181
Streaming app steps:
start kafka if not already running.
create a kafka topic.
generate logs.
start the flume agent whose source is the log data and whose sink is kafka.
run the scala jar that does the analysis.
----
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /home/cloudera/Downloads/kafka_2.10-0.10.2.1/config/server.properties
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic ftokafkaTopicNew
/opt/gen_logs/start_logs.sh
flume-ng agent -n ftokafka -c /home/cloudera/flume_script -f /home/cloudera/flume_script/ftokafka.conf
spark-submit --class FlumeToKafkaToSpark --master local[2] --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar" /home/cloudera/ftokafkatospark_2.10-1.0.jar
to debug:
flume-ng agent -Dflume.root.logger=INFO,console -n ftokafka -c /home/cloudera/flume_script -f ftokafka.conf
spark-shell --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar"
retail_2.10-1.0.jar /user/dgadiraju/streaming/streamingdepartmentanalysis
sudo ssh 127200813data00.eap.g4ihos.itcs.hpecorp.net
cd /usr/hdp/2.5.0.0-1245/kafka/bin/
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ftokafkaTopicNew
./kafka-topics.sh --list --zookeeper 127200813master.eap.g4ihos.itcs.hpecorp.net:2181,127200813data02.eap.g4ihos.itcs.hpecorp.net:2181,127200813data01.eap.g4ihos.itcs.hpecorp.net:2181,127200813data00.eap.g4ihos.itcs.hpecorp.net:2181
./kafka-console-producer.sh --broker-list 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic test
/home/cloudera/Downloads/kafka_2.10-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --zookeeper 127200813master.eap.g4ihos.itcs.hpecorp.net:2181,127200813data02.eap.g4ihos.itcs.hpecorp.net:2181,127200813data01.eap.g4ihos.itcs.hpecorp.net:2181,127200813data00.eap.g4ihos.itcs.hpecorp.net:2181 --topic ftokafkaTopicNew --from-beginning
countByDept.foreachRDD { rdd =>
  rdd.foreach { record =>
    println(record._1)
    println(record._2)
  }
}
--------------
./kafka-topics.sh --create --zookeeper 127200813data02.eap.g4ihos.itcs.hpecorp.net:2181 --replication-factor 1 --partitions 1 --topic test1
./kafka-console-producer.sh --broker-list 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --topic test1
./kafka-console-consumer.sh --bootstrap-server 127200813data00.eap.g4ihos.itcs.hpecorp.net:9092 --zookeeper 127200813data02.eap.g4ihos.itcs.hpecorp.net:2181 --topic test1 --from-beginning
kafka-topics.sh --describe --zookeeper 127200813data02.eap.g4ihos.itcs.hpecorp.net:2181 --topic test1
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper 127200813master.eap.g4ihos.itcs.hpecorp.net:2181,127200813data02.eap.g4ihos.itcs.hpecorp.net:2181,127200813data01.eap.g4ihos.itcs.hpecorp.net:2181,127200813data00.eap.g4ihos.itcs.hpecorp.net:2181
spark-shell --jars kafka-spark-consumer-1.0.10.jar
------------------concepts--------------------
important criteria - fault tolerance, data consistency, a simpler API, lower end-to-end latency
Durable - exactly once - can replay any message or set of messages given the necessary selection criteria.
Reliable - at least once - able to replay an already received message - JMS, RabbitMQ, Amazon Kinesis
Spark Streaming - high performance due to RDD micro-batching!!
Apache Storm is supported by Hortonworks and MapR only.
Spark Streaming has a higher-level API, similar to Spark batch, with SQL support.
partitions - the number of partitions per topic is important -- you can't usefully have more consumers than partitions, or else the same partition's data would have to be consumed by more than one consumer.
Rebalancing: happens when consumers join or leave a group, or when failures happen.
-----------parallelize input DStreams-----------------
multiple InputDStreams:
val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
--
multiple consumer threads in one InputDStream:
val consumerThreadsPerInputDstream = 3
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
---------------kafka fraud: receive data and respond synchronously??--------------
------------------reading http request parameters-------------------
// inside an HttpServlet subclass
protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    // query-string parameters, e.g. /path?param1=a&param2=b
    String param1 = request.getParameter("param1");
    String param2 = request.getParameter("param2");
}
------------------rebalance-----------------------------
happens when consumer threads change dynamically (user-initiated or due to system failures)
------------flume to hdfs ---------------------
--ftohdfs.conf--
ftohdfs.sources = logsource
ftohdfs.sinks = hdfssink
ftohdfs.channels = mchannel
# Describe/configure the source
ftohdfs.sources.logsource.type = exec
ftohdfs.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Describe the sink
ftohdfs.sinks.hdfssink.type = hdfs
ftohdfs.sinks.hdfssink.hdfs.path = hdfs://quickstart.cloudera:8020/user/cloudera/flume_data
ftohdfs.sinks.hdfssink.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
ftohdfs.channels.mchannel.type = memory
ftohdfs.channels.mchannel.capacity = 1000
ftohdfs.channels.mchannel.transactionCapacity = 100
# Bind the source and sink to the channel
ftohdfs.sources.logsource.channels = mchannel
ftohdfs.sinks.hdfssink.channel = mchannel
----
start the log gen - /opt/gen_logs/start_logs.sh
flume-ng agent -n ftohdfs -c . -f ftohdfs.conf
---------------flume to kafka--------------
install kafka: https://archive.cloudera.com/kafka/kafka/2/kafka/quickstart.html
start zookeeper if not already started (one comes with kafka):
bin/zookeeper-server-start.sh config/zookeeper.properties
start the kafka server:
bin/kafka-server-start.sh config/server.properties
create the topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ftoKafka_topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
--ftokafka.conf--
ftokafka.sources = logsource
ftokafka.sinks = kafkasink
ftokafka.channels = mchannel
# Describe/configure the source
ftokafka.sources.logsource.type = exec
ftokafka.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log
# Describe the sink
ftokafka.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
ftokafka.sinks.kafkasink.brokerList = localhost:9092
ftokafka.sinks.kafkasink.topic = ftokafka_topic
# Use a channel which buffers events in memory
ftokafka.channels.mchannel.type = memory
ftokafka.channels.mchannel.capacity = 1000
ftokafka.channels.mchannel.transactionCapacity = 100
# Bind the source and sink to the channel
ftokafka.sources.logsource.channels = mchannel
ftokafka.sinks.kafkasink.channel = mchannel
start the flume agent that moves data from the log files to the kafka sink:
flume-ng agent -n ftokafka -c . -f ftokafka.conf
consume on the console:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic ftokafkaTopicNew --from-beginning
--------------------kafka in cloudera--------------
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ftokafka_topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic ftokafka_topic --from-beginning
topic creation -- zookeeper
producer -- broker list
consumer -- bootstrap-server, zookeeper