wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0
Kafka uses ZooKeeper, so if you don't already have a ZooKeeper server you need to start one first. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance, then start the Kafka broker itself:
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
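Once both servers are up, create the request and response topics. A sketch, assuming a local broker; the request topic name is an assumption (the script below reads it from config), and the partition count caps how many consumer workers can usefully run in parallel:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1 --topic image.cleanscore.request
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1 --topic image.cleanscore.response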
The batch job below uses Confluent's Python client, confluent-kafka (pip install confluent-kafka): https://docs.confluent.io/current/clients/python.html#python-client
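Before wiring up the full job, it's worth a quick sanity check that the client can reach the broker. A minimal sketch, assuming a broker on localhost:9092 and a throwaway topic named test:

#!/usr/bin/env python
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def on_delivery(err, msg):
    # err is None on success, otherwise a KafkaError describing the failure.
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [{}]'.format(msg.topic(), msg.partition()))

p.produce('test', b'hello', callback=on_delivery)
p.flush(10)  # Serve the delivery callback; waits up to 10 seconds.

If the message is delivered, the broker is reachable and you can move on to the full batch job.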
#!/usr/bin/env python
import logging
import os
import threading
import time
from multiprocessing import Process
from queue import Queue, Empty
import json
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Consumer, Producer, KafkaException
import requests
import config
import utility
app_api = config.BatchJob.app_api
headers = {'Content-Type': 'application/json'}
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        logging.error('Message delivery failed: {}'.format(err))
    else:
        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def _process_msg(q, c, p):
    p.poll(0)  # Serve outstanding producer delivery callbacks without blocking.
    try:
        # A finite timeout keeps q.get() interruptible (POSIX < 3.0 and Windows).
        msg = q.get(timeout=60)
    except Empty:
        return  # Nothing was queued in time; nothing to commit.
    try:
        json_msg = json.loads(msg.value())
        json_request = json_msg['request']
        prd_no = json_msg['prdNo']
        file_path = json_request['file_path']
    except Exception as err:
        # Malformed JSON or missing keys: log, commit and skip the message.
        logging.error('Consume error : {}, message : {}'.format(err, msg.value()))
        c.commit(msg)
        q.task_done()
        return
    if not file_path:
        logging.error('The image file does not exist! prd_no : {}, file_path : {}'.format(prd_no, file_path))
        c.commit(msg)
        q.task_done()
        return
    try:
        r = requests.post('http://' + app_api + '/v2/getScore/filePath', headers=headers,
                          data=json.dumps(json_request))
        r_json = r.json()
        json_response = json_msg.copy()
        del json_response['request']
        json_response['response'] = r_json
        data = json.dumps(json_response)
    except Exception as err:
        logging.error('Scoring error : {}, prd_no : {}, json_request : {}'.format(err, prd_no, json_request))
        # Scoring error! Don't commit, so the message is redelivered.
        q.task_done()
        return
    try:
        p.produce('image.cleanscore.response', data, callback=delivery_report)
    except BufferError as err:
        # The producer's local queue is full. Don't commit; alert, then serve
        # delivery callbacks for up to 5 seconds so the queue can drain.
        logging.error('Produce error : {}, prd_no : {}, json_request : {}'.format(err, prd_no, json_request))
        q.task_done()
        slack_title = 'Local queue full!!!!'
        slack_message = '<@UFN7T1K6D>? Check the batch process!!!'
        utility.slack(slack_title, slack_message)
        p.poll(5)
        return
    except KafkaException as err:
        # Any other produce error! Don't commit.
        logging.error('Produce error : {}, prd_no : {}, json_request : {}'.format(err, prd_no, json_request))
        q.task_done()
        return
    try:
        if r_json['status'] != 'success':
            logging.error('Scoring failed! prd_no : {}, file_path : {}'.format(prd_no, file_path))
        else:
            logging.info(
                '#%sT%s - Scored: %s',
                os.getpid(), threading.get_ident(), json_response
            )
    except Exception as err:
        logging.error('Json error : {}, prd_no : {}, json_response : {}'.format(err, prd_no, json_response))
    finally:
        q.task_done()
        c.commit(msg)
def _consume(kafka_config):
    '''
    There is no connection check at startup, so verify that messages
    actually flow after the workers start!
    '''
    logging.info(
        '#%s - Starting consumer group=%s, consume_topic=%s',
        os.getpid(), kafka_config['kafka_kwargs']['group.id'], kafka_config['consume_topic'],
    )
    c = Consumer(**kafka_config['kafka_kwargs'])
    c.subscribe([kafka_config['consume_topic']])
    q = Queue(maxsize=kafka_config['num_threads'])
    p = Producer({'bootstrap.servers': config.Kafka.produce_bootstrap_servers,
                  'linger.ms': 5,
                  })
    try:
        with ThreadPoolExecutor(max_workers=config.BatchJob.max_threads_per_process) as executor:
            while True:
                logging.info('#%s - Waiting for message...', os.getpid())
                msg = c.poll(config.Kafka.poll_time)
                if msg is None:
                    continue
                if msg.error():
                    logging.error(
                        '#%s - Consumer error: %s', os.getpid(), msg.error()
                    )
                    continue
                # q.put() blocks once num_threads messages are in flight,
                # throttling consumption to what the workers can keep up with.
                q.put(msg)
                # The executor's default non-daemon threads stop gracefully,
                # so resources are released properly on shutdown.
                executor.submit(_process_msg, q, c, p)
    except Exception:
        logging.exception('#%s - Worker terminated.', os.getpid())
    finally:
        c.close()
        p.flush()
def main(kafka_config):
    """
    Consume request messages from the Kafka request topic, score them, and
    produce the results to the response topic. Dead workers are respawned.
    """
    workers = []
    while True:
        num_alive = len([w for w in workers if w.is_alive()])
        if kafka_config['num_workers'] == num_alive:
            time.sleep(1)
            continue
        for _ in range(kafka_config['num_workers'] - num_alive):
            p = Process(target=_consume, daemon=True, args=(kafka_config,))
            p.start()
            workers.append(p)
            logging.info('Starting worker #%s', p.pid)
if __name__ == '__main__':
    if config.BatchJob.log_to_file:
        logging.basicConfig(
            level=config.BatchJob.log_level,
            filename=config.BatchJob.log_file, filemode='a',
            format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
        )
    else:
        logging.basicConfig(
            level=config.BatchJob.log_level,
            format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
        )
    main(kafka_config={
        # At most, this should be the total number of partitions on the
        # consume topic; extra consumers would sit idle.
        'num_workers': config.Kafka.num_workers,
        'num_threads': config.Kafka.num_threads,
        'consume_topic': config.Kafka.consume_topic,
        'produce_topic': config.Kafka.produce_topic,
        'kafka_kwargs': {
            'bootstrap.servers': config.Kafka.consume_bootstrap_servers,
            'group.id': config.Kafka.group_id,
            'auto.offset.reset': 'earliest',
            # Commit manually so an abrupt shutdown can't lose messages
            # (at-least-once delivery).
            'enable.auto.commit': False,
        },
    })
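The script leans on two project-local modules, config and utility, that aren't shown here. A minimal sketch of the attributes it reads; every name and value below is an assumption to adapt to your environment:

# config.py -- hypothetical stand-in; only the attributes the job reads.
import logging

class Kafka:
    consume_bootstrap_servers = 'localhost:9092'
    produce_bootstrap_servers = 'localhost:9092'
    group_id = 'image.cleanscore'                 # consumer group id
    consume_topic = 'image.cleanscore.request'    # assumed topic name
    produce_topic = 'image.cleanscore.response'
    num_workers = 4      # should not exceed the consume topic's partitions
    num_threads = 8      # max in-flight messages per worker process
    poll_time = 60       # seconds Consumer.poll() blocks per iteration

class BatchJob:
    app_api = 'localhost:8080'   # host:port of the scoring HTTP API
    max_threads_per_process = 8  # ThreadPoolExecutor size
    log_to_file = False
    log_level = logging.INFO
    log_file = 'batch.log'

utility.slack(title, message) is assumed to be a thin wrapper that posts an alert to a Slack webhook. With both modules in place, each worker process started by main() joins the consumer group, and Kafka rebalances the topic's partitions across them.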