Skip to content

Instantly share code, notes, and snippets.

@techzilla
Created January 30, 2024 20:00
Show Gist options
  • Save techzilla/6d2543c38bfb66ac09b84d0cb5194cd9 to your computer and use it in GitHub Desktop.
Save techzilla/6d2543c38bfb66ac09b84d0cb5194cd9 to your computer and use it in GitHub Desktop.
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)
// Kafka connection settings shared by every producer in this program.
const (
	// kafkaBrokers is the broker address handed to the Kafka writer.
	kafkaBrokers = "localhost:9092"

	// kafkaTopic is the topic every generated message is written to.
	kafkaTopic = "syslog_topic"
)
// SyslogMessage is the syslog-like record that gets serialized to JSON and
// used as the value of each Kafka message. All fields are plain strings.
type SyslogMessage struct {
	// Message is the free-form log text.
	Message string `json:"message"`
	// Facility is the syslog facility name (this program sends "local0").
	Facility string `json:"facility"`
	// Severity is the syslog severity name (this program sends "info").
	Severity string `json:"severity"`
	// Timestamp is the creation time, already formatted as text by the caller.
	Timestamp string `json:"timestamp"`
}
// main starts one producer goroutine per logical CPU reported by
// runtime.NumCPU and then blocks until every producer has returned.
func main() {
	var producers sync.WaitGroup

	for id, total := 0, runtime.NumCPU(); id < total; id++ {
		// Register the goroutine before starting it so Wait cannot miss it.
		producers.Add(1)
		go func(cpuID int) {
			defer producers.Done()
			runProducer(cpuID)
		}(id)
	}

	producers.Wait()
}
// runProducer publishes a continuous stream of syslog-like JSON messages to
// kafkaTopic on kafkaBrokers, one message roughly every 100ms.
//
// It runs until the process receives an interrupt or SIGTERM; it then stops
// sending, closes its Kafka writer and returns, which lets the caller's
// WaitGroup complete.
//
// cpuID only labels this producer in its start-up line and in the message
// text; it does not bind the goroutine to a particular core.
func runProducer(cpuID int) {
	// Pause between attempts. It is applied after failures as well, so a broker
	// outage cannot turn this loop into a busy retry that floods the log.
	const sendInterval = 100 * time.Millisecond

	// ctx is cancelled on SIGINT/SIGTERM. That aborts an in-flight write and
	// ends the loop below, making the deferred cleanup reachable.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{kafkaBrokers},
		Topic:    kafkaTopic,
		Balancer: &kafka.LeastBytes{},
	})
	// Release the writer's resources on the way out.
	defer func() {
		if err := writer.Close(); err != nil {
			log.Printf("Failed to close Kafka writer: %v\n", err)
		}
	}()

	fmt.Printf("Producer for CPU %d started\n", cpuID)

	for {
		payload, err := json.Marshal(SyslogMessage{
			Message:   fmt.Sprintf("Log message from CPU %d", cpuID),
			Facility:  "local0",
			Severity:  "info",
			Timestamp: time.Now().Format(time.RFC3339),
		})
		if err != nil {
			log.Printf("Error marshaling syslog message: %v\n", err)
		} else if err := writer.WriteMessages(ctx, kafka.Message{Value: payload}); err != nil && ctx.Err() == nil {
			// A write aborted by shutdown is expected and not reported.
			log.Printf("Failed to send message to Kafka: %v\n", err)
		}

		// Rate-limit the loop, but wake up immediately on shutdown.
		select {
		case <-ctx.Done():
			return
		case <-time.After(sendInterval):
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment