Skip to content

Instantly share code, notes, and snippets.

@jonathansick
Last active August 15, 2019 19:04
Show Gist options
  • Save jonathansick/9e01932202e47df40fa5a480c9a1235c to your computer and use it in GitHub Desktop.
Save jonathansick/9e01932202e47df40fa5a480c9a1235c to your computer and use it in GitHub Desktop.
Creating Kafka topics
from confluent_kafka.admin import AdminClient, NewTopic
default_num_partitions = 1
default_replication_factor = 3
client = AdminClient({
'bootstrap.servers': 'url...'
})
# First list existing topics
metadata = client.list_topics(timeout=10)
existing_topic_names = list(metadata.topics.keys())
new_topics = []
if topic_name in existing_topic_names:
topic = metadata.topics[topic_name]
partitions = listiter(topic.partitions.values())
logger.info(
'Topic exists',
topic=topic_name,
partitions=len(topic.partitions),
replication_factor=len(partitions[0].replicas))
else:
new_topics.append(NewTopic(
topic_name,
num_partitions=default_num_partitions,
replication_factor=default_replication_factor))
if len(new_topics) > 0:
fs = client.create_topics(new_topics)
for topic_name, f in fs.items():
try:
f.result() # The result itself is None
logger.info(
'Created topic',
topic=topic_name,
partitions=default_num_partitions)
except Exception as e:
logger.error(
'Failed to create topic',
topic=topic_name, error=str(e))
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment