Created
January 25, 2024 21:33
-
-
Save techzilla/3c6cbf46236ce571c7da8752fcaec914 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
import json | |
import os | |
import signal | |
from datetime import datetime, timezone | |
from confluent_kafka import Producer | |
# Kafka broker(s) configuration.
# Can be overridden with the KAFKA_BOOTSTRAP_SERVERS environment variable;
# the placeholder default is kept so existing behavior is unchanged.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'your_kafka_broker(s)')

# Kafka topic to produce messages to.
# Can be overridden with the KAFKA_TOPIC environment variable.
topic = os.environ.get('KAFKA_TOPIC', 'your_kafka_topic')
# Function to generate syslog-style JSON messages as strings
def generate_syslog_message():
    """Return one syslog-style log record serialized as a compact JSON string.

    The record carries the current UTC time (millisecond precision, ``Z``
    suffix) plus fixed ``host``, ``message``, ``severity`` and ``facility``
    fields.

    Returns:
        str: A JSON object with no whitespace between tokens, keys in the
        order timestamp, host, message, severity, facility.
    """
    # datetime.now(timezone.utc) yields an aware UTC datetime directly;
    # utcnow() is deprecated and returns a naive value.
    now = datetime.now(timezone.utc)
    # %f emits microseconds (6 digits); dropping the last 3 leaves milliseconds.
    timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    record = {
        'timestamp': timestamp,
        'host': 'example.com',
        'message': 'Log message content',
        'severity': 'INFO',
        'facility': 'USER',
    }
    # Let json handle quoting/escaping; compact separators keep the wire
    # format identical to the previous hand-built string.
    return json.dumps(record, separators=(',', ':'))
# Function to periodically flush the producer
def flush_producer_periodically(producer, interval_sec, stop_event=None):
    """Call ``producer.flush()`` every *interval_sec* seconds.

    Args:
        producer: Any object exposing a ``flush()`` method.
        interval_sec: Seconds to wait between consecutive flushes.
        stop_event: Optional ``threading.Event``. Once it is set the loop
            ends and the function returns, which lets a caller ``join()``
            the thread running it. When omitted the loop never ends, as
            before.
    """
    if stop_event is None:
        # No way to be told to stop: keep the original run-forever behavior.
        stop_event = threading.Event()

    # Event.wait() doubles as the sleep: it returns False after the timeout
    # (time to flush) and True as soon as a stop is requested.
    while not stop_event.wait(interval_sec):
        producer.flush()
# Function to run in each producer thread
def produce_messages(thread_id, stop_event=None):
    """Produce one syslog-style message per second until told to stop.

    Args:
        thread_id: Identifier folded into the Kafka ``client.id``.
        stop_event: Optional ``threading.Event``. The produce loop ends once
            it is set. When omitted a private event is created, so the loop
            only ends on ``KeyboardInterrupt`` (which Python delivers to the
            main thread only).
    """
    if stop_event is None:
        stop_event = threading.Event()

    # Create Producer configuration
    conf = {
        'bootstrap.servers': bootstrap_servers,
        'client.id': f'python-producer-{thread_id}',
        'acks': 'all',  # You can adjust this based on your requirements
    }

    # Create Kafka Producer instance
    producer = Producer(conf)

    # NOTE: no signal handler is installed here. signal.signal() may only be
    # called from the main thread and raises ValueError anywhere else, which
    # would kill this worker before it produced anything. SIGINT handling
    # lives in main().

    # Start a thread for periodic flushing. It is a daemon because the flush
    # helper is called without a stop signal and therefore loops forever;
    # joining it would block shutdown indefinitely.
    flush_thread = threading.Thread(
        target=flush_producer_periodically,
        args=(producer, 5),  # Flush every 5 seconds
        daemon=True,
    )
    flush_thread.start()

    try:
        while not stop_event.is_set():
            message_value = generate_syslog_message()
            producer.produce(topic, value=message_value)
            # Adjust the timeout based on your desired message rate. Waiting
            # on the event instead of sleeping lets shutdown interrupt the
            # pause immediately.
            stop_event.wait(1)
    except KeyboardInterrupt:
        pass  # Only reachable when this function runs in the main thread
    finally:
        # Drain everything still queued before returning. No close() call is
        # made: it is not part of the documented confluent_kafka.Producer
        # API this script was written against.
        # NOTE(review): confirm against the installed client version.
        producer.flush()


def main():
    """Start one producer thread per CPU and stop them all on SIGINT."""
    stop_event = threading.Event()

    # Register signal handler for SIGINT (Ctrl+C). This runs in the main
    # thread, the only place signal.signal() is allowed.
    def sigint_handler(sig, frame):
        print("Received SIGINT. Cleaning up...")
        # Only signal the workers; each one flushes its own producer.
        stop_event.set()

    signal.signal(signal.SIGINT, sigint_handler)

    # os.cpu_count() may return None when the count cannot be determined.
    num_cpus = os.cpu_count() or 1

    # Create and start producer threads
    threads = []
    for i in range(num_cpus):
        thread = threading.Thread(target=produce_messages, args=(i, stop_event))
        thread.start()
        threads.append(thread)

    # Wait for all threads to complete. Joining with a timeout keeps the main
    # thread returning to the interpreter regularly so the SIGINT handler can
    # run on every platform.
    for thread in threads:
        while thread.is_alive():
            thread.join(timeout=0.5)

    print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment