Skip to content

Instantly share code, notes, and snippets.

@marcieltorres
Created June 24, 2023 15:24
Show Gist options
  • Save marcieltorres/50c43d57ea30ac0ac7994beb72b94d50 to your computer and use it in GitHub Desktop.
Save marcieltorres/50c43d57ea30ac0ac7994beb72b94d50 to your computer and use it in GitHub Desktop.
from typing import List, Optional

from confluent_kafka import Consumer, Message

from app.common.converters import try_convert_bytes_to_a_string
class KafkaConsumerClient:
    """Small wrapper around a ``confluent_kafka.Consumer``.

    The wrapper builds the consumer configuration from its constructor
    arguments, subscribes to the configured topics on request, polls
    messages in batches and filters out the ones that should not be
    processed (no message, no key, error event, or ``None`` value).
    Offsets are never committed automatically; call :meth:`commit`.
    """

    __slots__ = ['servers',
                 'client_id',
                 'group_id',
                 'topic_offset',
                 'topics',
                 'session_timeout',
                 'timeout',
                 'number_of_messages',
                 'consumer']

    servers: str
    client_id: str
    group_id: str
    topic_offset: str
    topics: List[str]
    session_timeout: int
    timeout: int
    number_of_messages: int
    consumer: Consumer

    def __init__(
        self,
        servers: str,
        client_id: str,
        group_id: str,
        topics: List[str],
        topic_offset: str = "latest",
        session_timeout: int = 30000,
        timeout: int = 5,
        number_of_messages: int = 200,
    ):
        """Store the settings and create the underlying consumer.

        Args:
            servers: Value for the ``bootstrap.servers`` setting.
            client_id: Value for the ``client.id`` setting.
            group_id: Value for the ``group.id`` setting.
            topics: Topics passed to ``subscribe`` by
                :meth:`start_to_consume`.
            topic_offset: Value for the ``auto.offset.reset`` setting.
            session_timeout: Value for the ``session.timeout.ms`` setting.
            timeout: ``timeout`` argument forwarded to each
                ``Consumer.consume`` call.
            number_of_messages: ``num_messages`` argument forwarded to each
                ``Consumer.consume`` call.
        """
        self.servers = servers
        self.client_id = client_id
        self.group_id = group_id
        self.topics = topics
        self.topic_offset = topic_offset
        self.session_timeout = session_timeout
        self.timeout = timeout
        self.number_of_messages = number_of_messages
        self.consumer = self._create_consumer()

    def _create_consumer(self) -> Consumer:
        """Build a ``Consumer`` from the stored settings."""
        consumer_config = {
            "bootstrap.servers": self.servers,
            "client.id": self.client_id,
            "group.id": self.group_id,
            # Offsets are committed explicitly through ``commit``.
            "enable.auto.commit": False,
            # Set at the top level: the ``default.topic.config``
            # sub-dictionary is deprecated by confluent-kafka-python.
            "auto.offset.reset": self.topic_offset,
            "session.timeout.ms": self.session_timeout,
        }
        return Consumer(consumer_config)

    def is_empty_queue(self, message: Message) -> bool:
        """Return ``True`` when ``message`` is ``None`` or has no key.

        NOTE(review): a record without a key is treated the same as "no
        message"; confirm that keyless records are never expected here.
        """
        return message is None or message.key() is None

    def is_empty_message(self, message: Message) -> bool:
        """Return ``True`` when the message value is ``None``."""
        return message.value() is None

    def is_message_error(self, message: Message) -> bool:
        """Return ``True`` when the message carries an error."""
        # ``error()`` yields ``None`` for a regular message; convert to a
        # real bool so the result matches the declared return type.
        return message.error() is not None

    def get_header_value(self, headers, key):
        """Return the value of the first header whose name matches ``key``.

        Header names are compared case-insensitively. The matched value
        (or ``None`` when the header entry has no second element) is passed
        through ``try_convert_bytes_to_a_string``.

        Args:
            headers: Iterable of header entries indexed as
                ``(name, value)``; may be ``None`` or empty.
            key: Header name to look for.

        Returns:
            The converted header value, or ``None`` when ``headers`` or
            ``key`` is falsy or no header matches.
        """
        if not headers or not key:
            return None

        wanted = key.lower()
        for header in headers:
            if header[0].lower() == wanted:
                raw_value = header[1] if len(header) > 1 else None
                return try_convert_bytes_to_a_string(raw_value)
        return None

    def start_to_consume(self):
        """Subscribe the consumer to the configured topics."""
        self.consumer.subscribe(self.topics)

    def consume(self) -> List[Message]:
        """Poll one batch and return only the usable messages.

        Returns:
            The polled messages that pass :meth:`_sanitize_message`, in
            polling order; an empty list when nothing was polled.
        """
        polled = self.consumer.consume(
            num_messages=self.number_of_messages, timeout=self.timeout
        )
        if not polled:
            return []

        # Compare against ``None`` explicitly so the filter does not depend
        # on the truthiness of the message object itself.
        return [
            message
            for message in polled
            if self._sanitize_message(message) is not None
        ]

    def _sanitize_message(self, message: Message) -> Optional[Message]:
        """Return ``message`` if it is usable, otherwise ``None``.

        A message is discarded when it is missing or keyless, when it
        carries an error, or when its value is ``None`` (checked in that
        order).
        """
        if (
            self.is_empty_queue(message)
            or self.is_message_error(message)
            or self.is_empty_message(message)
        ):
            return None
        return message

    def commit(self):
        """Synchronously commit the consumer's current offsets."""
        self.consumer.commit(asynchronous=False)
from unittest import TestCase
from unittest.mock import MagicMock
from app.clients.kafka import KafkaConsumerClient
class KafkaClientTest(TestCase):
    """Unit tests for ``KafkaConsumerClient`` with the consumer mocked out."""

    def setUp(self):
        """Create a client and replace its consumer with a ``MagicMock``."""
        self.topics = ["topic1-to-consume"]
        self.kafka_consumer_client = KafkaConsumerClient(
            servers="kafka.servers.endpoint",
            client_id="client.id",
            group_id="group.id",
            topic_offset="latest",
            topics=self.topics,
        )
        self.kafka_consumer_client.consumer = MagicMock()

    @staticmethod
    def _build_message(key="ITSNOTAKEY", value="ITSNOTAVALUE", error=None):
        """Return a mock message whose accessors yield the given values."""
        message = MagicMock()
        message.key.return_value = key
        message.value.return_value = value
        message.error.return_value = error
        return message

    def _consume_returning(self, polled):
        """Make the mocked consumer yield ``polled`` and run ``consume``."""
        self.kafka_consumer_client.consumer.consume.return_value = polled
        return self.kafka_consumer_client.consume()

    def test_when_I_check_empty_queue_with_None_should_be_success(self):
        empty_queue = self.kafka_consumer_client.is_empty_queue(None)
        self.assertTrue(empty_queue)

    def test_when_I_check_empty_queue_with_message_without_key_should_be_success(self):
        message_to_test = MagicMock()
        message_to_test.key.return_value = None
        empty_queue = self.kafka_consumer_client.is_empty_queue(message_to_test)
        self.assertTrue(empty_queue)

    def test_when_I_check_empty_message_with_message_without_key_should_be_success(
        self
    ):
        # Despite the name, this covers a message whose *value* is None.
        message_to_test = MagicMock()
        message_to_test.value.return_value = None
        empty_message = self.kafka_consumer_client.is_empty_message(message_to_test)
        self.assertTrue(empty_message)

    def test_when_I_check_if_is_error_message_should_be_success(self):
        message_to_test = MagicMock()
        message_to_test.error.return_value = "test"
        message_error = self.kafka_consumer_client.is_message_error(message_to_test)
        # assertTrue instead of assertIsNotNone: a ``False`` result must fail.
        self.assertTrue(message_error)

    def test_when_I_check_if_is_error_message_without_error_should_be_false(self):
        message_to_test = MagicMock()
        message_to_test.error.return_value = None
        message_error = self.kafka_consumer_client.is_message_error(message_to_test)
        self.assertFalse(message_error)

    def test_when_I_call_start_should_be_success(self):
        self.kafka_consumer_client.start_to_consume()
        self.kafka_consumer_client.consumer.subscribe.assert_called_once_with(
            self.topics
        )

    def test_when_I_call_consume_and_doesnt_have_messages_should_be_success(self):
        messages = self._consume_returning([])
        self.assertListEqual(messages, [])

    def test_when_I_call_consume_and_have_messages_should_be_success(self):
        message_to_test = self._build_message()
        messages = self._consume_returning([message_to_test])
        self.assertEqual(len(messages), 1)
        # The very same message object must be handed back to the caller.
        self.assertIs(messages[0], message_to_test)

    def test_when_I_call_consume_and_queue_is_empty_should_be_success(self):
        message_to_test = MagicMock()
        message_to_test.key.return_value = None
        messages = self._consume_returning([message_to_test])
        self.assertListEqual(messages, [])

    def test_when_I_call_consume_and_message_have_error_should_be_success(self):
        message_to_test = self._build_message(error=MagicMock())
        messages = self._consume_returning([message_to_test])
        self.assertListEqual(messages, [])

    def test_when_I_call_consume_and_message_is_empty_should_be_success(self):
        message_to_test = self._build_message(value=None)
        messages = self._consume_returning([message_to_test])
        self.assertListEqual(messages, [])

    def test_when_I_call_commit_should_be_success(self):
        self.kafka_consumer_client.commit()
        self.kafka_consumer_client.consumer.commit.assert_called_once_with(
            asynchronous=False
        )

    def test_get_header_when_headers_is_none_should_be_none(self):
        header_value = self.kafka_consumer_client.get_header_value(None, 'header-name')
        self.assertIsNone(header_value)

    def test_get_header_when_headers_is_an_empty_list_should_be_none(self):
        header_value = self.kafka_consumer_client.get_header_value([], 'header-name')
        self.assertIsNone(header_value)

    def test_get_header_when_key_is_none_should_be_none(self):
        headers = [['header-name', 'header-value']]
        header_value = self.kafka_consumer_client.get_header_value(headers, None)
        self.assertIsNone(header_value)

    def test_get_header_when_headers_has_value_should_be_success(self):
        headers = [['header-name', 'header-value']]
        header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name')
        self.assertEqual(header_value, 'header-value')

    def test_get_header_when_headers_has_byte_value_should_be_success(self):
        headers = [['header-name', str.encode('header-value', 'utf-8')]]
        header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name')
        self.assertEqual(header_value, 'header-value')

    def test_get_header_when_headers_has_none_value_should_be_none(self):
        # A header entry with a name but no value element.
        headers = [['header-name']]
        header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name')
        self.assertIsNone(header_value)

    def test_get_header_when_headers_has_upper_and_lower_value_should_be_success(self):
        headers = [['hEaDeR-NamE', 'header-value']]
        header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name')
        self.assertEqual(header_value, 'header-value')

    def test_get_header_when_headers_has_value_and_header_doesnt_exists_should_be_none(self):
        headers = [['header-name', 'header-value']]
        header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name-that-doesnt-exists')
        self.assertIsNone(header_value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment