@khun84
Created August 14, 2025 01:21
kafka-ops
import logging
import os
import sys
import time

from confluent_kafka import (Producer, Consumer, TopicPartition, KafkaError,
                             KafkaException, OFFSET_BEGINNING, OFFSET_END,
                             ConsumerGroupTopicPartitions)
from confluent_kafka.admin import AdminClient, NewTopic

# --- Configuration ---
KAFKA_BROKERS = "comma separated list of brokers" # IMPORTANT: Use your SSL port (e.g., 9093)
CONSUMER_GROUP_ID = "my-test-group"
TEST_TOPIC = "test-topic"
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Path to your certs directory (relative to this script)
CERTS_DIR = "/etc/ssl"
CA_CERT_PATH = os.path.join(CERTS_DIR, "cert.pem")

# --- Security Warning ---
# THIS CONFIGURATION IS VULNERABLE TO MAN-IN-THE-MIDDLE ATTACKS.
# USE ONLY FOR TESTING WHERE SECURITY IS NOT A CONCERN.
# 'ssl.endpoint.identification.algorithm=none' disables hostname verification.

# Kafka Client SSL/TLS Configuration (Encryption ONLY)
base_ssl_conf = {
    'security.protocol': 'SSL',
    'ssl.ca.location': CA_CERT_PATH,
    # THIS DISABLES SERVER HOSTNAME VERIFICATION (WARNING: INSECURE)
    'ssl.endpoint.identification.algorithm': 'none',
}
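
# For production, leave hostname verification at the librdkafka default ('https'
# in recent releases) and, if the cluster enforces mutual TLS, add a client keypair.
# A minimal sketch, assuming hypothetical client.pem/client.key files that are not
# part of this gist:
# secure_ssl_conf = {
#     'security.protocol': 'SSL',
#     'ssl.ca.location': CA_CERT_PATH,
#     'ssl.certificate.location': os.path.join(CERTS_DIR, "client.pem"),  # assumed path
#     'ssl.key.location': os.path.join(CERTS_DIR, "client.key"),          # assumed path
# }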

# Admin Client Configuration
admin_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    **base_ssl_conf,  # Merge SSL settings
}

# Producer Configuration
producer_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    **base_ssl_conf,  # Merge SSL settings
}

# Consumer Configuration (for testing consumption)
consumer_conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'group.id': CONSUMER_GROUP_ID,
    # When offsets are explicitly committed via the AdminClient, auto.offset.reset is ignored.
    # 'earliest' is a safe fallback in case alter_consumer_group_offsets fails.
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000,
    **base_ssl_conf,  # Merge SSL settings
}
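
# Quick connectivity sanity check (a sketch, not wired into the workflow below).
# list_topics() round-trips to a broker, so it fails fast on bad bootstrap/SSL settings:
# md = AdminClient(admin_conf).list_topics(timeout=10)
# logger.info(f"Connected; {len(md.topics)} topics visible.")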

# --- Helper Functions ---
def create_admin_client():
    try:
        return AdminClient(admin_conf)
    except KafkaException as e:
        logger.error(f"Failed to create AdminClient: {e}")
        sys.exit(1)

def create_topic(admin_client, topic_name, num_partitions=3, replication_factor=1):
    logger.info(f"Creating topic '{topic_name}'...")
    new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    futures = admin_client.create_topics([new_topic])
    for topic, future in futures.items():
        try:
            future.result()  # The result itself is None
            logger.info(f"Topic '{topic}' created successfully.")
        except KafkaException as e:
            # confluent-kafka surfaces broker errors as KafkaError codes
            if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
                logger.info(f"Topic '{topic}' already exists, continuing.")
            else:
                logger.error(f"Failed to create topic '{topic}': {e}")
                sys.exit(1)
    time.sleep(1)  # Give Kafka time to settle

def delete_topic(admin_client, topic_name):
    logger.info(f"Deleting topic '{topic_name}'...")
    futures = admin_client.delete_topics([topic_name])
    for topic, future in futures.items():
        try:
            future.result()
            logger.info(f"Topic '{topic}' deleted successfully.")
        except KafkaException as e:
            if e.args[0].code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                logger.info(f"Topic '{topic}' does not exist, no need to delete.")
            else:
                logger.error(f"Failed to delete topic '{topic}': {e}")

def delete_consumer_group(admin_client, group_id):
    logger.info(f"Deleting consumer group '{group_id}'...")
    futures = admin_client.delete_consumer_groups([group_id])
    for group, future in futures.items():
        try:
            future.result()
            logger.info(f"Consumer group '{group}' deleted successfully.")
        except KafkaException as e:
            logger.info(f"Failed to delete consumer group '{group}': {e}")
    time.sleep(2)  # Give Kafka time to process deletion

def produce_messages(producer, topic_name, num_messages):
    logger.info(f"Producing {num_messages} messages to '{topic_name}'...")
    for i in range(num_messages):
        msg_value = f"message-{int(time.time() * 1000)}-{i}"
        producer.produce(topic_name, value=msg_value.encode('utf-8'))
    producer.flush(timeout=10)  # Ensure messages are sent
    logger.info(f"{num_messages} messages produced.")
    time.sleep(2)  # Give messages time to be written
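
# Optional variant (a sketch): per-message delivery reports alongside flush().
# `delivery_report` is illustrative and not used elsewhere in this gist;
# confluent-kafka invokes the callback from poll()/flush().
def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Delivery failed for {msg.topic()} [{msg.partition()}]: {err}")
    else:
        logger.info(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
# Usage: producer.produce(topic_name, value=b"payload", callback=delivery_report)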

def describe_consumer_group(admin_client, group_id):
    logger.info(f"Describing consumer group '{group_id}':")
    futures = admin_client.describe_consumer_groups([group_id])
    for group, future in futures.items():
        try:
            result = future.result()
            logger.info(f"  Group: {result.group_id}, State: {result.state}")
            if result.members:
                logger.info("  Members:")
                for member in result.members:
                    logger.info(f"    - Member ID: {member.member_id}, Client ID: {member.client_id}, Host: {member.host}")
            # if result.offsets:
            #     logger.info("  Committed Offsets:")
            #     for tp, offset in result.offsets.items():
            #         logger.info(f"    - {tp.topic} [{tp.partition}]: {offset.offset} (metadata: {offset.metadata if offset.metadata else 'None'})")
            else:
                logger.info("  No active members found.")
        except KafkaException as e:
            logger.error(f"Failed to describe consumer group '{group}': {e}")

def setup_consumer_group_offsets(admin_client: AdminClient, topic_name: str, group_id: str, offset_position=OFFSET_END):
    """Set consumer group offsets to a specific position (OFFSET_END for latest, OFFSET_BEGINNING for earliest)."""
    logger.info(f"Setting consumer group '{group_id}' offsets for '{topic_name}' to {'LATEST' if offset_position == OFFSET_END else 'EARLIEST'}.")
    logger.info("  This will control where the consumer starts reading messages.")
    # Get topic metadata to list all partitions
    md = admin_client.list_topics(topic=topic_name, timeout=10)
    if topic_name not in md.topics:
        logger.error(f"Error: Topic {topic_name} not found during metadata fetch.")
        return False
    partitions = md.topics[topic_name].partitions.keys()
    # alter_consumer_group_offsets needs concrete offsets, so resolve the logical
    # position (OFFSET_BEGINNING / OFFSET_END) to the partition watermarks first.
    consumer = Consumer(consumer_conf)
    tps_to_reset = []
    for p in partitions:
        low, high = consumer.get_watermark_offsets(TopicPartition(topic_name, p))
        logger.info(f"  Partition {p} watermarks: {low}-{high}")
        target = high if offset_position == OFFSET_END else low
        tps_to_reset.append(TopicPartition(topic_name, p, target))
    consumer.close()
    req = ConsumerGroupTopicPartitions(group_id, tps_to_reset)
    # Alter the consumer group offsets (the group must have no active members)
    futures = admin_client.alter_consumer_group_offsets([req])
    success = True
    for group, future in futures.items():
        try:
            future.result()  # Wait for the operation to complete
            logger.info(f"  Successfully set offsets for group '{group}' to {'latest' if offset_position == OFFSET_END else 'earliest'}.")
        except KafkaException as e:
            logger.error(f"  Failed to set offsets for group '{group}': {e}")
            success = False
    time.sleep(1)  # Give Kafka a moment to process the offset commit
    return success
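
# Alternative sketch: the same reset can be done without the admin API by committing
# explicit offsets from a short-lived consumer in the group. `reset_via_consumer` is
# illustrative and unused elsewhere here; it assumes the group has no active members.
def reset_via_consumer(topic_name, partition, offset):
    c = Consumer(consumer_conf)
    # Synchronous commit of an explicit offset; no subscribe/assign needed
    c.commit(offsets=[TopicPartition(topic_name, partition, offset)], asynchronous=False)
    c.close()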

def test_consumer(topic_name, group_id, expected_messages=5, max_poll_time=15):
    """Test consuming messages from the topic."""
    logger.info(f"Starting consumer test for group '{group_id}' on topic '{topic_name}'.")
    logger.info(f"  Expected messages: {expected_messages}, Max poll time: {max_poll_time}s")
    consumer_test = Consumer(consumer_conf)
    consumer_test.subscribe([topic_name])
    msg_count = 0
    start_time = time.time()
    logger.info("  Polling for messages...")
    try:
        while True:
            if time.time() - start_time > max_poll_time:
                logger.info(f"  Timed out after {max_poll_time} seconds. Consumed {msg_count} messages.")
                break
            msg = consumer_test.poll(timeout=1.0)  # Poll for 1 second
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event - not an error
                    logger.info(f"%% {msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}")
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                break
            logger.info(f"  -> Consumed: Topic={msg.topic()}, Partition={msg.partition()}, Offset={msg.offset()}")
            msg_count += 1
            if msg_count >= expected_messages:
                logger.info(f"  Successfully consumed {msg_count} messages.")
                break
    finally:
        consumer_test.close()
        logger.info("  Consumer test finished.")
    return msg_count

def cleanup_resources(admin_client, topic_name, group_id):
    """Clean up the consumer group (topic deletion is intentionally left out; see delete_topic)."""
    logger.info("Cleaning up resources...")
    delete_consumer_group(admin_client, group_id)
    logger.info("Cleanup completed.")

def run_full_test():
    """Run the complete test workflow."""
    logger.info("--- Running Full Test Workflow ---")
    admin_client = create_admin_client()
    producer = Producer(producer_conf)
    try:
        # Step 0: Clean up previous state
        logger.info("=== Step 0: Initial Cleanup ===")
        delete_consumer_group(admin_client, CONSUMER_GROUP_ID)
        # Step 1: Set consumer group offsets to latest
        logger.info("=== Step 1: Setup Consumer Group Offsets ===")
        setup_consumer_group_offsets(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_END)
        # Step 2: Produce new messages
        # logger.info("=== Step 2: Produce Messages ===")
        # produce_messages(producer, TEST_TOPIC, 5)
        # Step 3: Test consumer
        logger.info("=== Step 3: Test Consumer ===")
        test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages=5)
        # Step 4: Describe consumer group
        logger.info("=== Step 4: Describe Consumer Group ===")
        describe_consumer_group(admin_client, CONSUMER_GROUP_ID)
    except KeyboardInterrupt:
        logger.info("Aborted by user.")
    except Exception as e:
        logger.error(f"An unhandled error occurred: {e}")
    finally:
        # Step 5: Final cleanup
        logger.info("=== Step 5: Final Cleanup ===")
        cleanup_resources(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID)
        logger.info("--- Full Test Completed ---")

def main():
    """Main function with flexible step execution."""
    if not os.path.exists(CA_CERT_PATH):
        logger.error(f"ERROR: CA certificate not found at '{CA_CERT_PATH}'. Please ensure it exists.")
        sys.exit(1)
    logger.info("--- Python (confluent-kafka-python) Consumer Group Setup Tool ---")
    logger.info(f"Kafka Brokers: {KAFKA_BROKERS}")
    logger.info(f"Test Topic: {TEST_TOPIC}")
    logger.info(f"Consumer Group ID: {CONSUMER_GROUP_ID}")
    logger.info("WARNING: This setup uses 'ssl.endpoint.identification.algorithm=none' for simplicity.")
    logger.info("         This disables server identity verification and is INSECURE for production.")
    custom_workflow()

def create_topic_only():
    """Create topic only."""
    admin_client = create_admin_client()
    create_topic(admin_client, TEST_TOPIC)

def setup_offsets_only(offset_position):
    """Setup consumer group offsets only."""
    admin_client = create_admin_client()
    setup_consumer_group_offsets(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID, offset_position)

def produce_messages_only():
    """Produce messages only."""
    producer = Producer(producer_conf)
    num_messages = int(input("Number of messages to produce (default 5): ") or "5")
    produce_messages(producer, TEST_TOPIC, num_messages)

def test_consumer_only():
    """Test consumer only."""
    expected_messages = int(input("Expected number of messages (default 5): ") or "5")
    max_poll_time = int(input("Max poll time in seconds (default 15): ") or "15")
    test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages, max_poll_time)

def describe_group_only():
    """Describe consumer group only."""
    admin_client = create_admin_client()
    describe_consumer_group(admin_client, CONSUMER_GROUP_ID)

def cleanup_only():
    """Cleanup resources only."""
    admin_client = create_admin_client()
    cleanup_resources(admin_client, TEST_TOPIC, CONSUMER_GROUP_ID)

def custom_workflow():
    """Run a custom workflow by selecting individual steps."""
    logger.info("--- Custom Workflow Builder ---")
    logger.info("Select the steps you want to run (in order):")
    steps = {
        '1': ('Initial Cleanup', lambda ac, p: delete_consumer_group(ac, CONSUMER_GROUP_ID)),
        # '2': ('Create Topic', lambda ac, p: create_topic(ac, TEST_TOPIC)),
        '3': ('Setup Offsets (Latest)', lambda ac, p: setup_consumer_group_offsets(ac, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_END)),
        '4': ('Setup Offsets (Earliest)', lambda ac, p: setup_consumer_group_offsets(ac, TEST_TOPIC, CONSUMER_GROUP_ID, OFFSET_BEGINNING)),
        # '5': ('Produce Messages', lambda ac, p: produce_messages(p, TEST_TOPIC, 5)),
        '6': ('Test Consumer', lambda ac, p: test_consumer(TEST_TOPIC, CONSUMER_GROUP_ID, expected_messages=100)),
        '7': ('Describe Consumer Group', lambda ac, p: describe_consumer_group(ac, CONSUMER_GROUP_ID)),
        # '8': ('Final Cleanup', lambda ac, p: cleanup_resources(ac, TEST_TOPIC, CONSUMER_GROUP_ID))
    }
    for key, (description, _) in steps.items():
        logger.info(f"  {key}. {description}")
    # selected_steps = input("Enter step numbers separated by commas (e.g., 1,3,6): ").strip()
    selected_steps = "6"  # Hardcoded for now; uncomment the input() above for interactive use
    if not selected_steps:
        logger.info("No steps selected.")
        return
    step_numbers = [s.strip() for s in selected_steps.split(',')]
    admin_client = create_admin_client()
    producer = Producer(producer_conf)
    for step_num in step_numbers:
        if step_num in steps:
            description, func = steps[step_num]
            logger.info(f"=== Executing: {description} ===")
            try:
                func(admin_client, producer)
            except Exception as e:
                logger.error(f"Error in step '{description}': {e}")
                break
        else:
            logger.info(f"Invalid step number: {step_num}")

# --- Main Script Logic ---
if __name__ == "__main__":
    main()