Created
August 14, 2025 01:21
-
-
Save khun84/01383091065d8fc1fd3a5b9b4c8ccf7b to your computer and use it in GitHub Desktop.
kafka-ops
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import os
import sys
import time

from confluent_kafka import (
    OFFSET_BEGINNING,
    OFFSET_END,
    Consumer,
    ConsumerGroupTopicPartitions,
    KafkaError,
    Producer,
    TopicPartition,
)
from confluent_kafka.admin import AdminClient, KafkaException, NewTopic
# --- Configuration ---
KAFKA_BROKERS = "comma separated list of brokers"  # IMPORTANT: Use your SSL port (e.g., 9093)
CONSUMER_GROUP_ID = "my-test-group"
TEST_TOPIC = "test-topic"

# Module-level logger. The handler is attached only once: without the guard,
# re-importing or reloading this module would add a second StreamHandler and
# every log line would be emitted twice.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Path to your certs directory (relative to this script)
CERTS_DIR = "/etc/ssl"
CA_CERT_PATH = os.path.join(CERTS_DIR, "cert.pem")

# --- Security Warning ---
# THIS CONFIGURATION IS VULNERABLE TO MAN-IN-THE-MIDDLE ATTACKS.
# USE ONLY FOR TESTING WHERE SECURITY IS NOT A CONCERN.
# 'ssl.endpoint.identification.algorithm=none' disables hostname verification.

# Kafka Client SSL/TLS Configuration (Encryption ONLY)
base_ssl_conf = {
    'security.protocol': 'SSL',
    'ssl.ca.location': CA_CERT_PATH,
    # THIS IS THE KEY FOR "NO IDENTITY CARE" (WARNING: INSECURE)
    'ssl.endpoint.identification.algorithm': 'none',
}

# Admin Client Configuration
admin_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    **base_ssl_conf  # Merge SSL settings
}

# Producer Configuration
producer_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    **base_ssl_conf  # Merge SSL settings
}

# Consumer Configuration (for testing consumption)
consumer_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'group.id': CONSUMER_GROUP_ID,
    # When offsets are explicitly set by AdminClient, auto.offset.reset is ignored.
    # We set it to 'earliest' here as a safe fallback in case alter_consumer_group_offsets fails.
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000,
    **base_ssl_conf  # Merge SSL settings
}
# --- Helper Functions --- | |
def create_admin_client():
    """Build an AdminClient from ``admin_conf``.

    Returns:
        AdminClient: a configured admin client.

    Exits the process if the client cannot be constructed, since nothing
    else in this tool can run without it.
    """
    try:
        return AdminClient(admin_conf)
    except KafkaException as e:
        # Fatal condition — log at ERROR, not INFO, before exiting.
        logger.error(f"Failed to create AdminClient: {e}")
        sys.exit(1)
def create_topic(admin_client, topic_name, num_partitions=3, replication_factor=1):
    """Create ``topic_name``, treating an already-existing topic as success.

    Args:
        admin_client: AdminClient used to issue the request.
        topic_name: topic to create.
        num_partitions: partition count for the new topic.
        replication_factor: replication factor for the new topic.

    Exits the process on any creation failure other than "already exists".
    """
    logger.info(f"Creating topic '{topic_name}'...")
    new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    futures = admin_client.create_topics([new_topic])
    for topic, future in futures.items():
        try:
            future.result()  # The result itself is None
            logger.info(f"Topic '{topic}' created successfully.")
        except KafkaException as e:
            # BUG FIX: the original matched the kafka-python exception name
            # "TopicAlreadyExistsError", which never appears in
            # confluent-kafka error strings, so an existing topic always hit
            # the fatal branch. Compare the broker error code instead.
            if e.args and e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
                logger.info(f"Topic '{topic}' already exists, continuing.")
            else:
                logger.error(f"Failed to create topic '{topic}': {e}")
                sys.exit(1)
    time.sleep(1)  # Give Kafka time to settle
def delete_topic(admin_client, topic_name):
    """Delete ``topic_name``, treating a missing topic as a no-op.

    Other deletion failures are logged at ERROR but are not fatal.
    """
    logger.info(f"Deleting topic '{topic_name}'...")
    futures = admin_client.delete_topics([topic_name])
    for topic, future in futures.items():
        try:
            future.result()
            logger.info(f"Topic '{topic}' deleted successfully.")
        except KafkaException as e:
            # BUG FIX: "UnknownTopicOrPartitionError" is the kafka-python
            # class name and never occurs in confluent-kafka error strings;
            # compare the broker error code instead.
            if e.args and e.args[0].code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                logger.info(f"Topic '{topic}' does not exist, no need to delete.")
            else:
                logger.error(f"Failed to delete topic '{topic}': {e}")
def delete_consumer_group(admin_client, group_id):
    """Delete ``group_id``; failures are logged but deliberately non-fatal.

    This is used for best-effort cleanup, so a failure (e.g. the group does
    not exist yet) must not abort the workflow.
    """
    logger.info(f"Deleting consumer group '{group_id}'...")
    futures = admin_client.delete_consumer_groups([group_id])
    for group, future in futures.items():
        try:
            future.result()
            logger.info(f"Consumer group '{group}' deleted successfully.")
        except KafkaException as e:
            # Was logged at INFO, which buried real failures in normal output.
            logger.error(f"Failed to delete consumer group '{group}': {e}")
    time.sleep(2)  # Give Kafka time to process deletion
def produce_messages(producer, topic_name, num_messages):
    """Produce ``num_messages`` timestamped test messages to ``topic_name``.

    Delivery failures are surfaced via a per-message callback, and the
    flush() result is checked so undelivered messages are not silently lost
    (the original ignored both).
    """
    logger.info(f"Producing {num_messages} messages to '{topic_name}'...")

    def _on_delivery(err, msg):
        # Report per-message delivery failures instead of dropping them silently.
        if err is not None:
            logger.error(f"Delivery failed for message to '{msg.topic()}': {err}")

    for i in range(num_messages):
        msg_value = f"message-{int(time.time() * 1000)}-{i}"
        producer.produce(topic_name, value=msg_value.encode('utf-8'), on_delivery=_on_delivery)
    # flush() returns the number of messages still queued/in-flight.
    remaining = producer.flush(timeout=10)
    if remaining:
        logger.warning(f"{remaining} message(s) not delivered within the flush timeout.")
    logger.info(f"{num_messages} messages produced.")
    time.sleep(2)  # Give messages time to be written
def describe_consumer_group(admin_client, group_id):
    """Log the state and member list of consumer group ``group_id``."""
    logger.info(f"Describing consumer group '{group_id}':")
    futures = admin_client.describe_consumer_groups([group_id])
    for group, future in futures.items():
        try:
            result = future.result()
            logger.info(f" Group: {result.group_id}, State: {result.state}")
            if result.members:
                logger.info(" Members:")
                for member in result.members:
                    logger.info(f" - Member ID: {member.member_id}, Client ID: {member.client_id}, Host: {member.client_host}")
            else:
                # BUG FIX: this branch is taken when the group has no
                # *members*; the old message said "No committed offsets
                # found", left over from a removed offsets listing.
                logger.info(" No active members found.")
        except KafkaException as e:
            logger.error(f"Failed to describe consumer group '{group}': {e}")
def setup_consumer_group_offsets(admin_client: AdminClient, topic_name: str, group_id: str, offset_position=OFFSET_END):
    """Commit explicit offsets for every partition of ``topic_name`` on
    behalf of ``group_id``.

    alter_consumer_group_offsets requires absolute offsets, so the logical
    positions are resolved via the partition watermarks: OFFSET_END maps to
    the high watermark (latest), OFFSET_BEGINNING to the low watermark
    (earliest).

    Args:
        admin_client: client used for metadata lookup and offset alteration.
        topic_name: topic whose partitions are reset.
        group_id: consumer group whose committed offsets are overwritten.
        offset_position: OFFSET_END (default) or OFFSET_BEGINNING.

    Returns:
        bool: True if every partition's offset was set successfully.
    """
    logger.info(f"Setting consumer group '{group_id}' offsets for '{topic_name}' to {'LATEST' if offset_position == OFFSET_END else 'EARLIEST'}.")
    logger.info(" This will control where the consumer starts reading messages.")
    # Get topic metadata to list all partitions
    md = admin_client.list_topics(topic=topic_name, timeout=10)
    if topic_name not in md.topics:
        logger.error(f"Error: Topic {topic_name} not found during metadata fetch.")
        return False
    partitions = md.topics[topic_name].partitions.keys()
    consumer = Consumer(consumer_conf)
    tps_to_reset = []
    try:
        for p in partitions:
            logger.info(f" Partition {p}")
            low, high = consumer.get_watermark_offsets(TopicPartition(topic_name, p))
            logger.info(f" Offset: {low}-{high}")
            # BUG FIX: the original always committed the high watermark, so
            # requesting OFFSET_BEGINNING still started consumers at latest.
            target = low if offset_position == OFFSET_BEGINNING else high
            tps_to_reset.append(TopicPartition(topic_name, p, target))
    finally:
        # The original leaked this Consumer (never closed).
        consumer.close()
    req = ConsumerGroupTopicPartitions(group_id, tps_to_reset)
    # Alter the consumer group offsets
    futures = admin_client.alter_consumer_group_offsets([req])
    success = True
    for tp, future in futures.items():
        try:
            future.result()  # Wait for the operation to complete
            logger.info(f" Successfully set offset for {tp} to {'latest' if offset_position == OFFSET_END else 'earliest'}.")
        except KafkaException as e:
            logger.error(f" Failed to set offset for {tp}: {e}")
            success = False
    time.sleep(1)  # Give Kafka a moment to process the offset commit
    return success
def test_consumer(topic_name, group_id, expected_messages=5, max_poll_time=15):
    """Consume from ``topic_name`` until ``expected_messages`` messages
    arrive or ``max_poll_time`` seconds elapse.

    Args:
        topic_name: topic to subscribe to.
        group_id: unused here beyond logging — the group comes from
            ``consumer_conf`` ('group.id').
        expected_messages: stop after this many real messages.
        max_poll_time: overall polling deadline in seconds.

    Returns:
        int: number of messages actually consumed.
    """
    logger.info(f"Starting consumer test for group '{group_id}' on topic '{topic_name}'.")
    logger.info(f" Expected messages: {expected_messages}, Max poll time: {max_poll_time}s")
    consumer_test = Consumer(consumer_conf)
    consumer_test.subscribe([topic_name])
    msg_count = 0
    start_time = time.time()
    logger.info(" Polling for messages...")
    try:
        while True:
            if time.time() - start_time > max_poll_time:
                logger.info(f" Timed out after {max_poll_time} seconds. Consumed {msg_count} messages.")
                break
            msg = consumer_test.poll(timeout=1.0)  # Poll for 1 second
            if msg is None:
                continue
            if msg.error():
                # BUG FIX: _PARTITION_EOF is defined on KafkaError, not
                # KafkaException — the original attribute access raised
                # AttributeError as soon as any consumer error arrived.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event - not an error
                    logger.info(f"%% {msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}")
                else:
                    logger.error(f"Consumer error: {msg.error()}")
                    break
                # BUG FIX: without this, EOF events fell through and were
                # counted as consumed messages.
                continue
            logger.info(
                f" -> Consumed: Topic={msg.topic()}, Partition={msg.partition()}, Offset={msg.offset()}")
            msg_count += 1
            if msg_count >= expected_messages:
                logger.info(f" Successfully consumed {msg_count} messages.")
                break
    finally:
        consumer_test.close()
    logger.info(" Consumer test finished.")
    return msg_count
def cleanup_resources(admin_client, topic_name, group_id):
    """Tear down test state.

    Currently only the consumer group is removed; ``topic_name`` is accepted
    for interface symmetry but the topic itself is left in place.
    """
    logger.info("Cleaning up resources...")
    delete_consumer_group(admin_client, group_id)
    logger.info("Cleanup completed.")
def run_full_test():
    """Run the complete test workflow end to end.

    Steps: cleanup -> offset setup (latest) -> consume -> describe -> final
    cleanup. The produce step is intentionally disabled in this revision.
    Cleanup always runs, even after an error or Ctrl-C.
    """
    logger.info("--- Running Full Test Workflow ---")
    admin_client = create_admin_client()
    producer = Producer(producer_conf)
    try:
        # Step 0: Clean up previous state
        logger.info("=== Step 0: Initial Cleanup ===")
        delete_consumer_group(admin_client, CONSUMER_GROUP_ID)
        # Step 1: Set consumer group offsets to latest
        logger.info("=== Step 1: Setup Consumer Group Offsets ===")
        setup_consumer_group_offsets(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_END)
        # Step 2 (Produce Messages) intentionally disabled.
        # Step 3: Test consumer
        logger.info("=== Step 3: Test Consumer ===")
        test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages=5)
        # Step 4: Describe consumer group
        logger.info("=== Step 4: Describe Consumer Group ===")
        describe_consumer_group(admin_client, CONSUMER_GROUP_ID)
    except KeyboardInterrupt:
        logger.info("Aborted by user.")
    except Exception as e:
        # logger.exception records the traceback, which the original
        # logger.info call discarded.
        logger.exception(f"An unhandled error occurred: {e}")
    finally:
        # Step 5: Final cleanup
        logger.info("=== Step 5: Final Cleanup ===")
        cleanup_resources(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID)
    logger.info("--- Full Test Completed ---")
def main():
    """Entry point: verify the CA cert exists, print the startup banner,
    then hand off to the (currently hard-coded) custom workflow."""
    if not os.path.exists(CA_CERT_PATH):
        logger.info(f"ERROR: CA certificate not found at '{CA_CERT_PATH}'. Please ensure it exists.")
        sys.exit(1)
    banner = (
        "--- Python (confluent-kafka-python) Consumer Group Setup Tool ---",
        f"Kafka Brokers: {KAFKA_BROKERS}",
        f"Test Topic: {TEST_TOPIC}",
        f"Consumer Group ID: {CONSUMER_GROUP_ID}",
        "WARNING: This setup uses 'ssl.endpoint.identification.algorithm=none' for simplicity.",
        " This disables server identity verification and is INSECURE for production.",
    )
    for line in banner:
        logger.info(line)
    custom_workflow()
def create_topic_only():
    """Standalone step: create the test topic and do nothing else."""
    client = create_admin_client()
    create_topic(client, TEST_TOPIC)
def setup_offsets_only(offset_position):
    """Standalone step: commit group offsets at ``offset_position`` for the
    test topic/group."""
    client = create_admin_client()
    setup_consumer_group_offsets(client, TEST_TOPIC, CONSUMER_GROUP_ID, offset_position)
def produce_messages_only():
    """Standalone step: prompt for a message count and produce that many.

    Non-numeric input falls back to the default of 5 instead of crashing
    (the original int(input(...)) raised ValueError on bad input).
    """
    producer = Producer(producer_conf)
    raw = input("Number of messages to produce (default 5): ").strip()
    try:
        num_messages = int(raw) if raw else 5
    except ValueError:
        logger.warning(f"Invalid number {raw!r}; defaulting to 5.")
        num_messages = 5
    produce_messages(producer, TEST_TOPIC, num_messages)
def test_consumer_only():
    """Standalone step: prompt for parameters and run the consumer test.

    Non-numeric input falls back to the defaults instead of crashing
    (the original int(input(...)) raised ValueError on bad input).
    """
    raw_expected = input("Expected number of messages (default 5): ").strip()
    try:
        expected_messages = int(raw_expected) if raw_expected else 5
    except ValueError:
        logger.warning(f"Invalid number {raw_expected!r}; defaulting to 5.")
        expected_messages = 5
    raw_poll = input("Max poll time in seconds (default 15): ").strip()
    try:
        max_poll_time = int(raw_poll) if raw_poll else 15
    except ValueError:
        logger.warning(f"Invalid number {raw_poll!r}; defaulting to 15.")
        max_poll_time = 15
    test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages, max_poll_time)
def describe_group_only():
    """Standalone step: describe the test consumer group."""
    client = create_admin_client()
    describe_consumer_group(client, CONSUMER_GROUP_ID)
def cleanup_only():
    """Standalone step: run resource cleanup for the test topic/group."""
    client = create_admin_client()
    cleanup_resources(client, TEST_TOPIC, CONSUMER_GROUP_ID)
def custom_workflow():
    """Build and execute a sequence of maintenance steps from a step table.

    NOTE: step selection is currently hard-coded to "6" (Test Consumer);
    the interactive prompt is left commented out below. Each step is a
    callable taking (admin_client, producer). An exception in any step
    aborts the remaining steps; an unknown step number is skipped.
    """
    logger.info("--- Custom Workflow Builder ---")
    logger.info("Select the steps you want to run (in order):")
    steps = {
        '1': ('Initial Cleanup', lambda ac, p: (delete_consumer_group(ac, CONSUMER_GROUP_ID))),
        # '2': ('Create Topic', lambda ac, p: create_topic(ac, TEST_TOPIC)),
        '3': ('Setup Offsets (Latest)', lambda ac, p: setup_consumer_group_offsets(ac, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_END)),
        '4': ('Setup Offsets (Earliest)', lambda ac, p: setup_consumer_group_offsets(ac, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_BEGINNING)),
        # '5': ('Produce Messages', lambda ac, p: produce_messages(p, TEST_TOPIC, 5)),
        '6': ('Test Consumer', lambda ac, p: test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages=100)),
        '7': ('Describe Consumer Group', lambda ac, p: describe_consumer_group(ac, CONSUMER_GROUP_ID)),
        # '8': ('Final Cleanup', lambda ac, p: cleanup_resources(ac, TEST_TOPIC, CONSUMER_GROUP_ID))
    }
    for key, (description, _) in steps.items():
        logger.info(f" {key}. {description}")
    # selected_steps = input("Enter step numbers separated by commas (e.g., 1,2,3,5,6): ").strip()
    selected_steps = "6"
    if not selected_steps:
        logger.info("No steps selected.")
        return
    tokens = [token.strip() for token in selected_steps.split(',')]
    admin_client = create_admin_client()
    producer = Producer(producer_conf)
    for token in tokens:
        entry = steps.get(token)
        if entry is None:
            logger.info(f"Invalid step number: {token}")
            continue
        description, action = entry
        logger.info(f"=== Executing: {description} ===")
        try:
            action(admin_client, producer)
        except Exception as e:
            logger.error(f"Error in step '{description}': {e}")
            break
# --- Main Script Logic ---
# Entry-point guard: run the tool only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment