@luiscoms
Created August 22, 2025 12:54
#!/usr/bin/env python3
"""
Send the contents of a specified file to a Kafka topic.

Prerequisites:
- Python 3.x
- A Kafka broker running and accessible
- The kafka-python package installed (json and logging ship with the standard library)

The file is expected to contain JSON data, which is read, enriched with tracking
IDs, and sent as a message to the Kafka topic. The script supports sending a
single file as well as bulk sending from a list of files. Logging includes
unique identifiers for batch, trace, and message IDs to facilitate tracking.

Usage:
# install dependencies
pip install kafka-python

# send a single file to a Kafka topic
python kafka_send_file.py -f <filename> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>]

# send multiple files listed in a file to a Kafka topic
python kafka_send_file.py --bulk-send <file-with-list-of-files> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>]
"""
import argparse
import json
import logging
import logging.config
from os.path import isfile
from uuid import uuid4
from kafka import KafkaProducer

# one batch ID per run, shared by every message sent in this invocation
BATCH_ID = str(uuid4())

class SystemLogFilter(logging.Filter):
    """Inject default batch/trace/message IDs so the formatter never fails."""

    def filter(self, record):
        if not hasattr(record, 'batchId'):
            record.batchId = BATCH_ID
        if not hasattr(record, 'traceId'):
            record.traceId = '--'
        if not hasattr(record, 'messageId'):
            record.messageId = '--'
        return super().filter(record)

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s %(levelname)s BATCH_ID="%(batchId)s" TRACE_ID="%(traceId)s" MESSAGE_ID="%(messageId)s" %(message)s',
        },
    },
    'filters': {
        'message_params': {
            # pass the class directly so the config works regardless of the module name
            '()': SystemLogFilter,
        }
    },
    'handlers': {
        'default': {
            'level': 'DEBUG',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',  # default is stderr
            'filters': ['message_params'],
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False
        },
        'kafka': {  # quiet the kafka-python client library
            'handlers': ['default'],
            'level': 'WARNING',
            'propagate': False
        },
        '__main__': {  # if __name__ == '__main__'
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False
        },
    }
}
logging.config.dictConfig(LOGGING_CONFIG)
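# With the config above, an emitted log line looks roughly like this (a sketch;
# the timestamp and batch UUID differ on every run):
#   2025-08-22 12:54:00,123 INFO BATCH_ID="1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed" TRACE_ID="--" MESSAGE_ID="--" Starting the script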

def read_file_contents(filename):
    try:
        with open(filename, 'r') as file:
            return file.read()
    except FileNotFoundError:
        logging.error(f"File '{filename}' not found")
        raise

def file_to_message(filename):
    content = read_file_contents(filename)
    logging.debug('read file %s', filename)
    data = json.loads(content)
    # attach message, trace, and batch IDs to the payload for tracking
    data["messageId"] = str(uuid4())
    data["traceId"] = str(uuid4())
    data["batchId"] = BATCH_ID
    return data
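# A payload returned by file_to_message looks roughly like this (a sketch;
# the original file contents are preserved, the three IDs are appended):
#   {"event": "...", "messageId": "<uuid4>", "traceId": "<uuid4>", "batchId": "<uuid4>"}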

def send_message_to_kafka(message, kafka_broker, topic_name, username=None, password=None):
    # use SASL_SSL with SCRAM-SHA-512 only when credentials are provided
    producer = KafkaProducer(
        bootstrap_servers=[kafka_broker],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        security_protocol="SASL_SSL" if username and password else 'PLAINTEXT',
        sasl_mechanism="SCRAM-SHA-512" if username and password else None,
        sasl_plain_username=username,
        sasl_plain_password=password
    )
    logging.debug(
        "KafkaProducer initialized with broker: %s, topic: %s, username: %s",
        kafka_broker, topic_name, username
    )
    # key the record with the per-message trace ID (also used in the logs)
    topic_key = f"script-send-{message['traceId']}"
    producer.send(topic_name, value=message, key=topic_key.encode('utf-8'))
    producer.flush()
    producer.close()
    logging.debug(
        'Message %s sent to topic %s on broker %s successfully',
        topic_key, topic_name, kafka_broker,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )

def list_files(filename):
    with open(filename, 'r') as r:
        # keep only non-empty lines that point at existing files
        return [line for line in r.read().splitlines() if line and isfile(line)]

def send_file_to_kafka(filename, kafka_broker, topic_name, username=None, password=None):
    message = file_to_message(filename)
    send_message_to_kafka(message, kafka_broker, topic_name, username, password)
    logging.info(
        'Message from file %s sent to topic %s on broker %s successfully',
        filename, topic_name, kafka_broker,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )

def main():
    logging.info("Starting the script")
    parser = argparse.ArgumentParser(description='Send file contents to Kafka topic', prefix_chars='-')
    parser.add_argument('-f', '--filename', metavar='filename', type=str, default=None, help='Name of file to read')
    parser.add_argument('-b', '--kafka-broker',
                        metavar='kafka-broker', type=str, default='127.0.0.1:9092', help='Kafka broker address')
    parser.add_argument('--bulk-send',
                        metavar='bulk-send', type=str, help='File containing a list of file paths to send')
    parser.add_argument('-t', '--topic-name',
                        metavar='topic-name', type=str, required=True, help='Kafka topic name')
    parser.add_argument("--username", type=str, help="Kafka username", required=False)
    parser.add_argument("--password", type=str, help="Kafka password", required=False)
    args = parser.parse_args()
    if args.filename:
        send_file_to_kafka(args.filename, args.kafka_broker, args.topic_name, args.username, args.password)
    elif args.bulk_send:
        files = list_files(args.bulk_send)
        for file in files:
            send_file_to_kafka(file, args.kafka_broker, args.topic_name, args.username, args.password)
    else:
        parser.error('either --filename or --bulk-send must be provided')
    logging.info("Script finished")

if __name__ == '__main__':
    main()
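
# Example invocations (a sketch; broker address, topic, and filenames are hypothetical):
#   python kafka_send_file.py -f event.json -b localhost:9092 -t events-topic
#   python kafka_send_file.py --bulk-send files.txt -b localhost:9092 -t events-topic --username alice --password secret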