Skip to content

Instantly share code, notes, and snippets.

@techzilla
Created January 25, 2024 21:33
Show Gist options
  • Save techzilla/3c6cbf46236ce571c7da8752fcaec914 to your computer and use it in GitHub Desktop.
Save techzilla/3c6cbf46236ce571c7da8752fcaec914 to your computer and use it in GitHub Desktop.
import threading
import time
import json
import os
import signal
from datetime import datetime, timezone
from confluent_kafka import Producer
# Kafka broker(s) configuration
bootstrap_servers = 'your_kafka_broker(s)'
# Kafka topic to produce messages to
topic = 'your_kafka_topic'
# Function to generate syslog-style JSON messages as strings
def generate_syslog_message():
timestamp = datetime.utcnow().replace(tzinfo=timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
message = '{"timestamp":"' + timestamp + '","host":"example.com","message":"Log message content","severity":"INFO","facility":"USER"}'
return message
# Function to periodically flush the producer
def flush_producer_periodically(producer, interval_sec):
while True:
time.sleep(interval_sec)
producer.flush()
# Function to run in each producer thread
def produce_messages(thread_id):
# Create Producer configuration
conf = {
'bootstrap.servers': bootstrap_servers,
'client.id': f'python-producer-{thread_id}',
'acks': 'all', # You can adjust this based on your requirements
}
# Create Kafka Producer instance
producer = Producer(conf)
# Register signal handler for SIGINT (Ctrl+C)
def sigint_handler(sig, frame):
print("Received SIGINT. Cleaning up...")
nonlocal producer
producer.flush()
producer.close()
print("Cleanup complete. Exiting.")
os._exit(0)
signal.signal(signal.SIGINT, sigint_handler)
# Start a thread for periodic flushing
flush_thread = threading.Thread(target=flush_producer_periodically, args=(producer, 5)) # Flush every 5 seconds
flush_thread.start()
try:
# Infinite loop for producing messages
while True:
message_value = generate_syslog_message()
producer.produce(topic, value=message_value)
time.sleep(1) # Adjust sleep time based on your desired message rate
except KeyboardInterrupt:
pass # Allow SIGINT to break out of the loop
finally:
# Close the producer and wait for the flush thread to finish
producer.flush()
producer.close()
flush_thread.join()
# Get the number of CPUs
num_cpus = os.cpu_count()
# Create and start producer threads
threads = []
for i in range(num_cpus):
thread = threading.Thread(target=produce_messages, args=(i,))
thread.start()
threads.append(thread)
# Wait for all threads to complete (this won't happen since they are infinite)
for thread in threads:
thread.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment