Last active
January 23, 2017 19:27
-
-
Save kkirsanov/688ec6d592ab67b725e0297319c18045 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Consumer, KafkaError | |
import signal | |
# корректное завршение по сигналам | |
canGo = True | |
def doStop(*args, **kwargs): | |
global canGo | |
canGo = False | |
signal.signal(signal.SIGINT, doStop) | |
signal.signal(signal.SIGTERM, doStop) | |
# описание - http://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html | |
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'topic-reader', | |
'default.topic.config': {'auto.offset.reset': 'smallest'}}) | |
c.subscribe(['topic']) | |
while canGo: | |
msg = c.poll(60) # если за 60 сек не прочитали - ошибка | |
if not msg.error(): | |
data = msg.value().decode('utf8') | |
c.commit(msg) | |
elif msg.error().code() != KafkaError._PARTITION_EOF: | |
print(msg.error()) | |
canGo = False | |
elif msg.error().code() == KafkaError._PARTITION_EOF: | |
running = False | |
canGo("EMPTY") | |
c.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Producer | |
import signal | |
# корректное завршение по сигналам | |
canGo = True | |
def doStop(*args, **kwargs): | |
global canGo | |
canGo = False | |
signal.signal(signal.SIGINT, doStop) | |
signal.signal(signal.SIGTERM, doStop) | |
# описание - http://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html | |
p = Producer({'bootstrap.servers': 'localhost:9092', 'api.version.request': True}) | |
cnt = 0 | |
while canGo: | |
cnt += 1 | |
p.produce('topic', 'asd'.encode('utf8')) | |
p.flush() | |
if cnt >= 100: | |
canGo = False | |
p.flush() | |
confluent_consume.py |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
# pip install kafka | |
from kafka import KafkaConsumer, TopicPartition | |
import signal | |
# значения доп-опций - KafkaConsumer.DEFAULT_CONFIG | |
# прочитайте их что бы знать как можно делать | |
consumer = KafkaConsumer('topic', | |
group_id='topic-readers', | |
bootstrap_servers=['localhost:9092'], | |
enable_auto_commit=False, # Хотим ручной коммит | |
) | |
# корректное завршение по сигналам | |
canGo = True | |
def doStop(*args, **kwargs): | |
global canGo | |
canGo = False | |
signal.signal(signal.SIGINT, doStop) | |
signal.signal(signal.SIGTERM, doStop) | |
# если подключатся к не актвному топику будет небольшая задержка | |
for msg in consumer: | |
# прихождит сообщение: | |
# msg = ConsumerRecord(topic='topic', partition=0, offset=106, timestamp=1485198079690, | |
# timestamp_type=0, key=None, value=b'\xd0...', | |
# checksum=-1296717591, serialized_key_size=-1, serialized_value_size=32) | |
print(msg) | |
# синхронно подтверждаем чтение, можно и асинхронно | |
consumer.commit() | |
if not canGo: | |
break |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
# pip install kafka | |
from kafka import KafkaProducer | |
import signal | |
# значения доп-опций - KafkaProducer.DEFAULT_CONFIG | |
# прочитайте их что бы знать как можно делать | |
producer = KafkaProducer(bootstrap_servers='localhost:9092') | |
cnt = 0 | |
canGo = True | |
# корректное завршение по сигналам | |
def doStop(*args, **kwargs): | |
global canGo | |
canGo = False | |
signal.signal(signal.SIGINT, doStop) | |
signal.signal(signal.SIGTERM, doStop) | |
while canGo: | |
cnt += 1 | |
# отправляем асинхронно | |
producer.send('topic', 'Байты-байты-байты'.encode('utf8')) | |
# точка синхронизации, если упадет до неё, то сообщение может быть не доставлено | |
# если не вызывать, то проихсодит асинхронно автоматически | |
producer.flush() | |
if cnt >= 100: | |
canGo = False | |
producer.close() # убеждаемся в успешной отправке и закрываемся |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment