Skip to content

Instantly share code, notes, and snippets.

@hzbd
Last active March 28, 2021 13:17
Show Gist options
  • Save hzbd/a1b80a918fea56128d97831bbcf1e858 to your computer and use it in GitHub Desktop.
Save hzbd/a1b80a918fea56128d97831bbcf1e858 to your computer and use it in GitHub Desktop.
kafka python3 handler kit.(writer&reader&admin)
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 六二三
# Licensed under the MIT License.
# All rights reserved.
#
# Deps:
# pip3 install kafka-python==2.0.2
#
#
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, TopicAlreadyExistsError, UnknownTopicOrPartitionError
from kafka import KafkaAdminClient
from kafka import OffsetAndMetadata
from kafka.structs import TopicPartition
from kafka.admin import NewTopic
import json
import logging
logger = logging.getLogger(__name__)
class KafkaHandler(object):
    """Thin admin wrapper around KafkaAdminClient for topic management."""

    def __init__(self, bootstrap_servers='localhost:9092'):
        self.kafka_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    def create_topic(self, topic_name, partitions=1, replication_factor=1):
        """Create *topic_name*; return True on success, False on any failure.

        Failures are logged (with traceback for unexpected errors), never raised.
        """
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=partitions,
                replication_factor=replication_factor)
            self.kafka_admin.create_topics([topic])
            return True
        except TopicAlreadyExistsError:
            # Expected case, no traceback needed — just report it.
            logger.error("create kafka topic(%s) failed: already exists", topic_name)
            return False
        except Exception:
            # logger.exception preserves the traceback that the original
            # `except (TopicAlreadyExistsError, Exception)` silently dropped.
            logger.exception("create kafka topic(%s) failed", topic_name)
            return False

    def delete_topic(self, topic_name):
        """Delete *topic_name*; return True on success, False on any failure."""
        try:
            self.kafka_admin.delete_topics([topic_name])
            return True
        except UnknownTopicOrPartitionError:
            logger.error("delete kafka topic(%s) failed: unknown topic", topic_name)
            return False
        except Exception:
            logger.exception("delete kafka topic(%s) failed", topic_name)
            return False

    def close(self):
        """Release the admin client's network resources."""
        self.kafka_admin.close()
class KafkaWriter(object):
    """JSON-serializing Kafka producer (gzip-compressed, 1 retry)."""

    def __init__(self, bootstrap_servers=None):
        # Avoid a mutable default argument; None means the local broker.
        if bootstrap_servers is None:
            bootstrap_servers = ['localhost:9092']
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            retries=1,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip')

    def send(self, topic, message=None):
        """Send a JSON-serializable *message* to *topic*.

        Errors are logged, never raised. *message* defaults to an empty dict
        (was a mutable default `{}` — shared across calls).
        """
        if message is None:
            message = {}
        try:
            self.producer.send(topic, message)
        except KafkaTimeoutError as timeout_error:
            # BUG FIX: Python 3 exceptions have no `.message` attribute —
            # the original `timeout_error.message` raised AttributeError.
            logger.error("kafka send to topic(%s) timed out: %s", topic, timeout_error)
        except KafkaError as kafka_error:
            logger.error("kafka send to topic(%s) failed: %s", topic, kafka_error)
        except Exception:
            logger.exception("kafka send to topic(%s) failed", topic)

    def close(self):
        """Flush pending messages and release producer resources."""
        self.producer.close()
class KafkaReader(object):
    """JSON-deserializing Kafka consumer that drains up to *limit* records."""

    def __init__(
            self,
            bootstrap_servers=None,
            timeout=1000,
            group_id="kafka_reader"):
        # Avoid a mutable default argument; None means the local broker.
        if bootstrap_servers is None:
            bootstrap_servers = ['localhost:9092']
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf8')),
            consumer_timeout_ms=timeout,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            max_poll_records=10)

    def topic_list(self):
        """Return the set of topic names visible to this consumer."""
        return self.consumer.topics()

    def start(self, topic="test-topic", limit=100):
        """Poll up to *limit* records per partition from *topic*.

        Returns a list of {"topic", "value", "offset"} dicts (possibly empty).
        Closes the consumer when done, so each reader can be started once.
        """
        message_list = []
        self.consumer.subscribe([topic])
        partitions = self.consumer.partitions_for_topic(topic)
        if not partitions:
            # BUG FIX: the original returned None here (callers iterating the
            # result crashed) and leaked the consumer.
            self.consumer.close()
            return message_list
        for partition in partitions:
            topic_partition = TopicPartition(topic=topic, partition=partition)
            records = self.consumer.poll(timeout_ms=1000, max_records=limit)
            last_offset = None
            for batch in records.values():
                for message in batch:
                    logger.debug("%s | %s | %s", message.topic, message.value, message.offset)
                    message_list.append(
                        {
                            "topic": message.topic,
                            "value": message.value,
                            "offset": message.offset
                        }
                    )
                    last_offset = message.offset
            # BUG FIX: the original committed unconditionally — a NameError if
            # the poll returned nothing — and committed message.offset, which
            # re-delivers the last record on restart; Kafka expects the NEXT
            # offset to be consumed (last + 1).
            if last_offset is not None:
                self.consumer.commit(
                    offsets={topic_partition: OffsetAndMetadata(last_offset + 1, None)})
        self.consumer.close()
        return message_list
if __name__ == '__main__':
    topic = "demo-130"
    # Example usage of the other helpers:
    #   KafkaHandler().create_topic("test-4", partitions=4)
    #   kw = KafkaWriter(); kw.send(topic, {'foo': 'bar'}); kw.close()
    kr = KafkaReader()
    ret = kr.start(topic=topic, limit=10)
    # BUG FIX: start() may return None when the topic has no partitions;
    # guard so the loop never iterates None.
    for v in ret or []:
        print(v)
@hzbd
Copy link
Author

hzbd commented Mar 28, 2021

How to quickly start a local Kafka cluster

$ sudo apt-get update -y
$ sudo apt install default-jdk
$ java --version
$ whereis java

$ wget -c https://downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz 
$ tar xf kafka_2.12-2.7.0.tgz -C /usr/local/
$ mv /usr/local/kafka_2.12-2.7.0 /usr/local/kafka
$ mkdir -pv /var/log/kafka
$ cd /usr/local/kafka/config
$ cp server.properties{,.bak}

$ tee server.properties << EOF
broker.id=0
listeners=PLAINTEXT://172.31.9.144:9092
num.network.threads=6
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=1000
log.flush.interval.ms=1000
log.retention.hours=48
log.retention.bytes=107374184
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.31.9.144:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
EOF

$ sudo tee /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
EOF

$ sudo tee /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
EOF

$ sudo systemctl daemon-reload
$ sudo systemctl restart zookeeper
$ sudo systemctl restart kafka

Starting CMAK service

$ docker run -d \
    --net=host \
    --name kafka-manager \
    -e ZK_HOSTS="localhost:2181" \
    hlebalbau/kafka-manager:stable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment