Last active
February 2, 2022 14:21
-
-
Save MasterAler/a3a51d83dc3303ccbac06b718dd22c19 to your computer and use it in GitHub Desktop.
Simple aiokafka listener with consumer & producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import ecs_logging | |
import sys | |
import json | |
from dataclasses import dataclass | |
from typing import Callable | |
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer | |
from aiokafka.helpers import create_ssl_context | |
from .config import BOOTSTRAP_SERVERS # <--- that is supposed to be some valid connection string | |
@dataclass | |
class Payload: | |
processor: Callable | |
consumer_group_id: str | |
label: str | |
output_topic_name: str | |
input_topic_name: str | |
custom_init: Callable = None | |
class Listener: | |
""" Here's a boilerplate class for integration with Kafka """ | |
def __init__(self, payload: Payload): | |
self.__payload = payload | |
self.__logger = logging.getLogger() | |
self.__logger.setLevel(logging.INFO) | |
self.__handler = logging.StreamHandler(sys.stdout) | |
self.__handler.setLevel(logging.INFO) | |
self.__handler.setFormatter(ecs_logging.StdlibFormatter()) | |
self.__logger.addHandler(self.__handler) | |
if payload.custom_init is not None: | |
payload.custom_init() | |
def listen_blocking(self): | |
asyncio.run(self.listen()) | |
async def listen(self): | |
consumer = AIOKafkaConsumer( | |
bootstrap_servers=BOOTSTRAP_SERVERS, | |
group_id=self.__payload.consumer_group_id, | |
metadata_max_age_ms=1000, # This controls the polling interval | |
security_protocol="SSL", | |
ssl_context=create_ssl_context() | |
) | |
producer = AIOKafkaProducer(client_id="Listener", | |
bootstrap_servers=BOOTSTRAP_SERVERS, | |
security_protocol="SSL", | |
ssl_context=create_ssl_context(), | |
compression_type="gzip") | |
await consumer.start() | |
await producer.start() | |
consumer.subscribe(pattern=INPUT_TOPIC_NAME) | |
try: | |
async for msg in consumer: # Will detect metadata changes | |
self.__logger.info("Consumed msg by[{}] -- {}:{}".format(self.__payload.consumer_group_id, msg.topic, msg.partition)) | |
try: | |
msg_data = json.loads(msg.value) | |
data_id = msg_data["data_id"] | |
self.__logger.info("Group %s processing data %s", self.__payload.consumer_group_id, data_id) | |
try: | |
results = None | |
try: | |
results = self.__payload.processor(msg_data["SOME_VALID_DATA_KEY"]) | |
except Exception as err: | |
self.__logger.error(f"ALGO {self.__payload.label} FAILED ON {data_id}, FIX IT: {err}") | |
continue | |
for result in results: | |
result["data_id"] = data_id | |
await producer.send(self.__payload.output_topic_name, | |
json.dumps(result).encode("ascii"), | |
key=data_id.encode("ascii")) | |
self.__logger.info("Sent {} for {}".format(self.__payload.label, data_id)) | |
except (RuntimeError, ValueError) as err: | |
self.__logger.error("Result send failed: {}".format(err)) | |
except ValueError as err: | |
self.__logger.error("Error with input: {}\n{}".format(msg.value, err)) | |
except (RuntimeError, ValueError) as err: | |
self.__logger.error("Consumer failed: {}".format(err)) | |
finally: | |
await consumer.stop() | |
await producer.stop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment