Created
June 24, 2023 15:24
-
-
Save marcieltorres/50c43d57ea30ac0ac7994beb72b94d50 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List | |
from confluent_kafka import Consumer, Message | |
from app.common.converters import try_convert_bytes_to_a_string | |
class KafkaConsumerClient: | |
__slots__ = ['servers', | |
'client_id', | |
'group_id', | |
'topic_offset', | |
'topics', | |
'session_timeout', | |
'timeout', | |
'number_of_messages', | |
'consumer'] | |
servers: str | |
client_id: str | |
group_id: str | |
topic_offset: str | |
topics: List[str] | |
session_timeout: int | |
timeout: int | |
number_of_messages: int | |
def __init__(self, servers, client_id, group_id, topics, topic_offset="latest", session_timeout=30000, timeout=5, number_of_messages=200): | |
self.servers = servers | |
self.client_id = client_id | |
self.group_id = group_id | |
self.topics = topics | |
self.topic_offset = topic_offset | |
self.session_timeout = session_timeout | |
self.timeout = timeout | |
self.number_of_messages = number_of_messages | |
self.consumer = self._create_consumer() | |
def _create_consumer(self): | |
kafa_consumer_conifg = { | |
"bootstrap.servers": self.servers, | |
"client.id": self.client_id, | |
"group.id": self.group_id, | |
"enable.auto.commit": False, | |
"default.topic.config": {"auto.offset.reset": self.topic_offset}, | |
"session.timeout.ms": self.session_timeout, | |
} | |
return Consumer(kafa_consumer_conifg) | |
def is_empty_queue(self, message: Message) -> bool: | |
return message is None or message.key() is None | |
def is_empty_message(self, message: Message) -> bool: | |
return message.value() is None | |
def is_message_error(self, message: Message) -> bool: | |
return message.error() | |
def get_header_value(self, headers, key): | |
msg_headers = [header[1] if len(header) > 1 else None for header in headers if header[0].lower() == key.lower()] if headers and key else None | |
return try_convert_bytes_to_a_string(msg_headers[0]) if msg_headers else None | |
def start_to_consume(self): | |
self.consumer.subscribe(self.topics) | |
def consume(self) -> List[str]: | |
messages = self.consumer.consume( | |
num_messages=self.number_of_messages, timeout=self.timeout | |
) | |
if not messages: | |
return [] | |
sanitized_messages = [] | |
for message in messages: | |
sanitized_message = self._sanitize_message(message) | |
if not sanitized_message: | |
continue | |
sanitized_messages.append(sanitized_message) | |
return sanitized_messages | |
def _sanitize_message(self, message: Message) -> str: | |
if self.is_empty_queue(message): | |
return None | |
if self.is_message_error(message): | |
return None | |
if self.is_empty_message(message): | |
return None | |
return message | |
def commit(self): | |
self.consumer.commit(asynchronous=False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from unittest import TestCase | |
from unittest.mock import MagicMock | |
from app.clients.kafka import KafkaConsumerClient | |
class KafkaClientTest(TestCase): | |
def setUp(self): | |
self.kafka_consumer_client = KafkaConsumerClient( | |
servers="kafka.servers.endpoint", | |
client_id="client.id", | |
group_id="group.id", | |
topic_offset="latest", | |
topics=["topic1-to-consume"] | |
) | |
self.kafka_consumer_client.consumer = MagicMock() | |
def test_when_I_check_empty_queue_with_None_should_be_success(self): | |
empty_queue = self.kafka_consumer_client.is_empty_queue(None) | |
self.assertTrue(empty_queue) | |
def test_when_I_check_empty_queue_with_message_without_key_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.key.return_value = None | |
empty_queue = self.kafka_consumer_client.is_empty_queue(message_to_test) | |
self.assertTrue(empty_queue) | |
def test_when_I_check_empty_message_with_message_without_key_should_be_success( | |
self | |
): | |
message_to_test = MagicMock() | |
message_to_test.value.return_value = None | |
empty_message = self.kafka_consumer_client.is_empty_message(message_to_test) | |
self.assertTrue(empty_message) | |
def test_when_I_check_if_is_error_message_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.error.return_value = "test" | |
message_error = self.kafka_consumer_client.is_message_error(message_to_test) | |
self.assertIsNotNone(message_error) | |
def test_when_I_call_start_should_be_success(self): | |
self.kafka_consumer_client.start_to_consume() | |
self.assertTrue(self.kafka_consumer_client.consumer.subscribe.called) | |
def test_when_I_call_consume_and_doesnt_have_messages_should_be_success(self): | |
self.kafka_consumer_client.consumer.consume.return_value = [] | |
messages = self.kafka_consumer_client.consume() | |
self.assertListEqual(messages, []) | |
def test_when_I_call_consume_and_have_messages_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.key.return_value = "ITSNOTAKEY" | |
message_to_test.value.return_value = "ITSNOTAVALUE" | |
message_to_test.error.return_value = None | |
self.kafka_consumer_client.consumer.consume.return_value = [message_to_test] | |
messages = self.kafka_consumer_client.consume() | |
self.assertEqual(len(messages), 1) | |
def test_when_I_call_consume_and_queue_is_empty_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.key.return_value = None | |
self.kafka_consumer_client.consumer.consume.return_value = [message_to_test] | |
messages = self.kafka_consumer_client.consume() | |
self.assertListEqual(messages, []) | |
def test_when_I_call_consume_and_message_have_error_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.key.return_value = "ITSNOTAKEY" | |
message_to_test.value.return_value = "ITSNOTAVALUE" | |
message_to_test.error.return_value = MagicMock() | |
self.kafka_consumer_client.consumer.consume.return_value = [message_to_test] | |
messages = self.kafka_consumer_client.consume() | |
self.assertListEqual(messages, []) | |
def test_when_I_call_consume_and_message_is_empty_should_be_success(self): | |
message_to_test = MagicMock() | |
message_to_test.key.return_value = "ITSNOTAKEY" | |
message_to_test.value.return_value = None | |
message_to_test.error.return_value = None | |
self.kafka_consumer_client.consumer.consume.return_value = [message_to_test] | |
messages = self.kafka_consumer_client.consume() | |
self.assertListEqual(messages, []) | |
def test_when_I_call_commit_should_be_success(self): | |
self.kafka_consumer_client.commit() | |
self.assertTrue(self.kafka_consumer_client.consumer.commit.called) | |
def test_get_header_when_headers_is_none_should_be_none(self): | |
header_value = self.kafka_consumer_client.get_header_value(None, 'header-name') | |
self.assertIsNone(header_value) | |
def test_get_header_when_headers_is_an_empty_list_should_be_none(self): | |
header_value = self.kafka_consumer_client.get_header_value([], 'header-name') | |
self.assertIsNone(header_value) | |
def test_get_header_when_headers_has_value_should_be_success(self): | |
headers = [] | |
headers.append(['header-name', 'header-value']) | |
header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name') | |
self.assertEqual(header_value, 'header-value') | |
def test_get_header_when_headers_has_byte_value_should_be_success(self): | |
headers = [] | |
headers.append(['header-name', str.encode('header-value', 'utf-8')]) | |
header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name') | |
self.assertEqual(header_value, 'header-value') | |
def test_get_header_when_headers_has_none_value_should_be_none(self): | |
headers = [] | |
headers.append(['header-name']) | |
header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name') | |
self.assertIsNone(header_value) | |
def test_get_header_when_headers_has_upper_and_lower_value_should_be_success(self): | |
headers = [] | |
headers.append(['hEaDeR-NamE', 'header-value']) | |
header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name') | |
self.assertEqual(header_value, 'header-value') | |
def test_get_header_when_headers_has_value_and_header_doesnt_exists_should_be_none(self): | |
headers = [] | |
headers.append(['header-name', 'header-value']) | |
header_value = self.kafka_consumer_client.get_header_value(headers, 'header-name-that-doesnt-exists') | |
self.assertIsNone(header_value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment