Skip to content

Instantly share code, notes, and snippets.

@kkirsanov
Last active January 23, 2017 19:27
Show Gist options
  • Save kkirsanov/688ec6d592ab67b725e0297319c18045 to your computer and use it in GitHub Desktop.
Save kkirsanov/688ec6d592ab67b725e0297319c18045 to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError
import signal
# корректное завршение по сигналам
canGo = True
def doStop(*args, **kwargs):
global canGo
canGo = False
signal.signal(signal.SIGINT, doStop)
signal.signal(signal.SIGTERM, doStop)
# описание - http://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'topic-reader',
'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['topic'])
while canGo:
msg = c.poll(60) # если за 60 сек не прочитали - ошибка
if not msg.error():
data = msg.value().decode('utf8')
c.commit(msg)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
canGo = False
elif msg.error().code() == KafkaError._PARTITION_EOF:
running = False
canGo("EMPTY")
c.close()
from confluent_kafka import Producer
import signal
# корректное завршение по сигналам
canGo = True
def doStop(*args, **kwargs):
global canGo
canGo = False
signal.signal(signal.SIGINT, doStop)
signal.signal(signal.SIGTERM, doStop)
# описание - http://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html
p = Producer({'bootstrap.servers': 'localhost:9092', 'api.version.request': True})
cnt = 0
while canGo:
cnt += 1
p.produce('topic', 'asd'.encode('utf8'))
p.flush()
if cnt >= 100:
canGo = False
p.flush()
confluent_consume.py
# coding=utf8
# pip install kafka
from kafka import KafkaConsumer, TopicPartition
import signal
# значения доп-опций - KafkaConsumer.DEFAULT_CONFIG
# прочитайте их что бы знать как можно делать
consumer = KafkaConsumer('topic',
group_id='topic-readers',
bootstrap_servers=['localhost:9092'],
enable_auto_commit=False, # Хотим ручной коммит
)
# корректное завршение по сигналам
canGo = True
def doStop(*args, **kwargs):
global canGo
canGo = False
signal.signal(signal.SIGINT, doStop)
signal.signal(signal.SIGTERM, doStop)
# если подключатся к не актвному топику будет небольшая задержка
for msg in consumer:
# прихождит сообщение:
# msg = ConsumerRecord(topic='topic', partition=0, offset=106, timestamp=1485198079690,
# timestamp_type=0, key=None, value=b'\xd0...',
# checksum=-1296717591, serialized_key_size=-1, serialized_value_size=32)
print(msg)
# синхронно подтверждаем чтение, можно и асинхронно
consumer.commit()
if not canGo:
break
# coding=utf8
# pip install kafka
from kafka import KafkaProducer
import signal
# значения доп-опций - KafkaProducer.DEFAULT_CONFIG
# прочитайте их что бы знать как можно делать
producer = KafkaProducer(bootstrap_servers='localhost:9092')
cnt = 0
canGo = True
# корректное завршение по сигналам
def doStop(*args, **kwargs):
global canGo
canGo = False
signal.signal(signal.SIGINT, doStop)
signal.signal(signal.SIGTERM, doStop)
while canGo:
cnt += 1
# отправляем асинхронно
producer.send('topic', 'Байты-байты-байты'.encode('utf8'))
# точка синхронизации, если упадет до неё, то сообщение может быть не доставлено
# если не вызывать, то проихсодит асинхронно автоматически
producer.flush()
if cnt >= 100:
canGo = False
producer.close() # убеждаемся в успешной отправке и закрываемся
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment