Created
August 22, 2025 12:54
-
-
Save luiscoms/d03bee25cd3391d92dfd496939c5b22e to your computer and use it in GitHub Desktop.
This script is used to send the contents of a specified file to a Kafka topic.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This script is used to send the contents of a specified file to a Kafka topic. | |
Pre-requisites: | |
- Python 3.x | |
- Kafka broker running and accessible | |
- Required third-party package installed: kafka-python (json and logging ship with the Python standard library)
The file is expected to contain JSON data, which will be read, processed, and sent as a message to the Kafka topic. | |
The script supports both single file sending and bulk sending from a list of files. | |
It also includes logging with unique identifiers for batch, trace, and message IDs to facilitate tracking. | |
Usage: | |
-- install dependencies | |
pip install kafka-python | |
-- send a single file to Kafka topic | |
python kafka_send_file.py -f <filename> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>] | |
-- send multiple files listed in a file to Kafka topic | |
python kafka_send_file.py --bulk-send <file-with-list-of-files> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>] | |
""" | |
import argparse | |
import json | |
import logging | |
import logging.config | |
from os.path import isfile | |
from uuid import uuid4 | |
from kafka import KafkaProducer | |
# Identifier generated once per run; used as the default ``batchId`` on log records.
BATCH_ID = str(uuid4())


class SystemLogFilter(logging.Filter):
    """Logging filter that guarantees correlation attributes exist on a record.

    Records logged without ``batchId``, ``traceId`` or ``messageId`` in their
    ``extra`` mapping receive a default value, so a format string that
    references those attributes can always be rendered.
    """

    def filter(self, record):
        """Fill in missing correlation attributes, then defer to the base filter.

        Args:
            record: The ``logging.LogRecord`` being processed; mutated in place.

        Returns:
            Whatever ``logging.Filter.filter`` returns for the record.
        """
        # Built on every call so the current module-level BATCH_ID is used.
        fallbacks = (
            ('batchId', BATCH_ID),
            ('traceId', '--'),
            ('messageId', '--'),
        )
        for attribute, fallback in fallbacks:
            if not hasattr(record, attribute):
                setattr(record, attribute, fallback)
        return super().filter(record)
# Logging setup: every record is written to stdout in a single format that
# carries the batch, trace and message identifiers.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s %(levelname)s BATCH_ID="%(batchId)s" TRACE_ID="%(traceId)s" MESSAGE_ID="%(messageId)s" %(message)s',
        },
    },
    'filters': {
        'message_params': {
            # Pass the class object rather than a dotted-path string. A string
            # such as 'send_file.SystemLogFilter' only resolves when this file
            # is named send_file.py, and when the file is run as a script it
            # imports the module a second time, which creates a second
            # BATCH_ID that differs from the one put into the messages.
            '()': SystemLogFilter,
        }
    },
    'handlers': {
        'default': {
            'level': 'DEBUG',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',  # Default is stderr
            'filters': ['message_params'],
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False,
        },
        'kafka': {  # kafka client library loggers: warnings and above only
            'handlers': ['default'],
            'level': 'WARNING',
            'propagate': False,
        },
        '__main__': {  # if __name__ == '__main__'
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}
# Applied at import time so logging is configured before main() runs.
logging.config.dictConfig(LOGGING_CONFIG)
def read_file_contents(filename):
    """Return the entire text content of *filename*, decoded as UTF-8.

    Args:
        filename: Path of the file to read.

    Returns:
        The file content as a single string.

    Raises:
        FileNotFoundError: If the file does not exist. The error is logged
            before being re-raised to the caller.
    """
    try:
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # default encoding, which is not UTF-8 everywhere.
        with open(filename, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        logging.error("File '%s' not found", filename)
        raise
def file_to_message(filename):
    """Load a JSON file and stamp it with tracking identifiers.

    The file is read through ``read_file_contents`` and parsed with
    ``json.loads``. The parsed object then receives a fresh ``messageId``,
    a fresh ``traceId`` and the module-level ``BATCH_ID`` as ``batchId``.

    Args:
        filename: Path of the JSON file to load.

    Returns:
        The parsed JSON object with the three identifier keys set.
    """
    raw_text = read_file_contents(filename)
    logging.debug('read file %s', filename)
    payload = json.loads(raw_text)

    # append message and trace ids in message payload
    identifiers = (
        ("messageId", str(uuid4())),
        ("traceId", str(uuid4())),
        ("batchId", BATCH_ID),
    )
    for key, value in identifiers:
        payload[key] = value
    return payload
def send_message_to_kafka(message, kafka_broker, topic_name, username=None, password=None):
    """Publish one message to a Kafka topic and wait for the delivery result.

    A new ``KafkaProducer`` is created for the call and always closed before
    returning. The message is serialized as UTF-8 JSON and keyed with
    ``script-send-<traceId>``.

    Args:
        message: Mapping to send; must contain ``traceId`` and ``messageId``.
        kafka_broker: Bootstrap server address passed to the producer.
        topic_name: Name of the destination topic.
        username: Optional SASL username.
        password: Optional SASL password.

    Raises:
        Exception: Whatever the Kafka client raises when the producer cannot
            be created or the record cannot be delivered (presumably a
            ``kafka.errors.KafkaError`` subclass; confirm against the
            installed kafka-python version).
    """
    # SASL over TLS is used only when both credentials are provided.
    use_sasl = bool(username and password)
    producer = KafkaProducer(
        bootstrap_servers=[kafka_broker],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        security_protocol='SASL_SSL' if use_sasl else 'PLAINTEXT',
        sasl_mechanism='SCRAM-SHA-512' if use_sasl else None,
        sasl_plain_username=username,
        sasl_plain_password=password,
    )
    try:
        logging.debug(
            "KafkaProducer initialized with broker: %s, topic: %s, username: %s",
            kafka_broker, topic_name, username
        )
        topic_key = f"script-send-{message['traceId']}"
        future = producer.send(topic_name, value=message, key=topic_key.encode('utf-8'))
        producer.flush()
        # flush() does not report delivery errors; resolving the future does,
        # so a failed send raises here instead of being logged as a success.
        future.get(timeout=10)
    finally:
        # Release the producer's connections even when sending fails.
        producer.close()
    logging.debug(
        'Message %s sent to %s Kafka topic %s successfully',
        topic_key, kafka_broker, topic_name,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )
def list_files(filename):
    """Return the existing file paths listed, one per line, in *filename*.

    Each line is stripped of surrounding whitespace. Blank lines are ignored.
    Entries that do not point to an existing regular file are skipped, and a
    warning is logged for each one so the omission is visible.

    Args:
        filename: Path of the text file containing one file path per line.

    Returns:
        List of paths, in file order, for which ``os.path.isfile`` is true.
    """
    with open(filename, 'r') as listing:
        entries = listing.read().splitlines()

    existing = []
    for entry in entries:
        path = entry.strip()
        if not path:
            continue
        if isfile(path):
            existing.append(path)
        else:
            logging.warning("Skipping '%s' listed in '%s': not an existing file", path, filename)
    return existing
def send_file_to_kafka(filename, kafka_broker, topic_name, username=None, password=None):
    """Read a JSON file, send it to a Kafka topic, and log the outcome.

    Args:
        filename: Path of the JSON file to send.
        kafka_broker: Kafka broker address.
        topic_name: Name of the destination topic.
        username: Optional Kafka username.
        password: Optional Kafka password.
    """
    message = file_to_message(filename)
    send_message_to_kafka(message, kafka_broker, topic_name, username, password)
    # Argument order follows the placeholders: file, then broker, then topic.
    logging.info(
        'Message of file %s sent to %s Kafka topic %s successfully',
        filename, kafka_broker, topic_name,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )
def main():
    """Parse command-line arguments and send one file or a list of files.

    With ``-f/--filename`` a single file is sent. Otherwise ``--bulk-send``
    names a text file listing the files to send, one path per line. When both
    are given, ``-f/--filename`` takes precedence. When neither is given the
    parser reports a usage error and exits.
    """
    logging.info("Starting the script")
    parser = argparse.ArgumentParser(description='Send file contents to Kafka topic', prefix_chars='-')
    parser.add_argument('-f', '--filename', metavar='filename', type=str, default=None, help='Name of file to read')
    parser.add_argument('-b', '--kafka-broker',
                        metavar='kafka-broker', type=str, default='127.0.0.1:9092', help='Kafka broker address')
    parser.add_argument('--bulk-send',
                        metavar='bulk-send', type=str,
                        help='Name of a file listing the files to send, one path per line')
    parser.add_argument('-t', '--topic-name',
                        metavar='topic-name', type=str, required=True, help='Kafka topic name')
    parser.add_argument("--username", type=str, help="Kafka username", required=False)
    parser.add_argument("--password", type=str, help="Kafka password", required=False)
    args = parser.parse_args()

    # Without an input there is nothing to send; report it instead of exiting quietly.
    if not args.filename and not args.bulk_send:
        parser.error('one of -f/--filename or --bulk-send is required')

    if args.filename:
        send_file_to_kafka(args.filename, args.kafka_broker, args.topic_name, args.username, args.password)
    else:
        for file in list_files(args.bulk_send):
            # Same five arguments as the single-file branch.
            send_file_to_kafka(file, args.kafka_broker, args.topic_name, args.username, args.password)
    logging.info("Script finished")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment