Last active
August 15, 2019 19:04
-
-
Save jonathansick/9e01932202e47df40fa5a480c9a1235c to your computer and use it in GitHub Desktop.
Creating Kafka topics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a Kafka topic through the confluent-kafka admin client if it does
# not exist yet; otherwise log the existing topic's partition layout.
#
# NOTE(review): this snippet expects two names to be provided by the
# surrounding context, since neither is defined here:
#   - ``topic_name``: the name of the topic to check / create.
#   - ``logger``: a logger whose methods accept keyword arguments
#     (presumably a structlog-style logger -- confirm against the caller).
from confluent_kafka.admin import AdminClient, NewTopic

# Settings applied to any topic this script has to create.
default_num_partitions = 1
default_replication_factor = 3

client = AdminClient({
    'bootstrap.servers': 'url...'
})

# First list existing topics
metadata = client.list_topics(timeout=10)
existing_topic_names = list(metadata.topics.keys())

# Topics that are missing from the cluster and need to be created.
new_topics = []
if topic_name in existing_topic_names:
    topic = metadata.topics[topic_name]
    # ``topic.partitions`` is a mapping; materialize its values so the first
    # partition can be indexed below.
    partitions = list(topic.partitions.values())
    logger.info(
        'Topic exists',
        topic=topic_name,
        partitions=len(topic.partitions),
        # The first partition's replica count is reported as the topic's
        # replication factor; guard against an empty partition mapping so
        # the log call cannot raise IndexError.
        replication_factor=(
            len(partitions[0].replicas) if partitions else 0))
else:
    new_topics.append(NewTopic(
        topic_name,
        num_partitions=default_num_partitions,
        replication_factor=default_replication_factor))

if new_topics:
    # ``create_topics`` returns a mapping that is iterated here as
    # (topic name, future) pairs.
    fs = client.create_topics(new_topics)
    # A distinct loop variable keeps the outer ``topic_name`` from being
    # rebound while iterating over the results.
    for created_topic_name, f in fs.items():
        try:
            f.result()  # The result itself is None
        except Exception as e:
            logger.error(
                'Failed to create topic',
                topic=created_topic_name, error=str(e))
            # Re-raise so the failure is not silently swallowed.
            raise
        else:
            logger.info(
                'Created topic',
                topic=created_topic_name,
                partitions=default_num_partitions)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment