Skip to content

Instantly share code, notes, and snippets.

Created May 9, 2024 20:44
Creating a number of topics on a given Kafka Cluster using a ThreadPoolExecutor
import subprocess
from threading import current_thread
from threading import get_ident
from threading import get_native_id
from concurrent.futures import ThreadPoolExecutor
def process_topic(filepath):
thread = current_thread()
#print(f'Worker thread: name={}, ident={get_ident()}, id={get_native_id()}')
result =["docker", "exec", "-t", "broker1", "/bin/bash", "-c", f'kafka-topics --bootstrap-server broker1:9092 --topic topic-{filepath} --replication-factor 3 --partitions 3 --create --config min.insync.replicas=2'], text=True)
#print("Have {} bytes in stdout: {}".format(len(result.stdout), result.stdout.strip(' \t\n\r')))
# initialise a Thread Pool (16 worker threads) for concurrent operations
with ThreadPoolExecutor(16) as executor:
# submit some tasks
_ =, range(32))
Copy link

To test, use this cluster:

To run: python3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment