Created
January 30, 2024 20:00
-
-
Save techzilla/6d2543c38bfb66ac09b84d0cb5194cd9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
)
// Connection settings shared by every producer goroutine.
const (
	// kafkaBrokers is the single broker address handed to the Kafka writer.
	kafkaBrokers = "localhost:9092"
	// kafkaTopic is the topic every generated message is written to.
	kafkaTopic = "syslog_topic"
)
// SyslogMessage is the syslog-like record that is serialized to JSON and
// used as the value of each Kafka message. All fields are plain strings and
// are emitted under the lowercase keys given in the struct tags.
type SyslogMessage struct {
	// Message is the free-form log text.
	Message string `json:"message"`
	// Facility is the syslog facility name (the producer uses "local0").
	Facility string `json:"facility"`
	// Severity is the syslog severity name (the producer uses "info").
	Severity string `json:"severity"`
	// Timestamp is the creation time, already formatted as a string by the
	// caller (the producer uses time.RFC3339).
	Timestamp string `json:"timestamp"`
}
func main() { | |
numCPU := runtime.NumCPU() | |
// Create a wait group to wait for all goroutines to finish | |
var wg sync.WaitGroup | |
wg.Add(numCPU) | |
// Launch a goroutine for each CPU core | |
for i := 0; i < numCPU; i++ { | |
go func(cpuID int) { | |
defer wg.Done() | |
runProducer(cpuID) | |
}(i) | |
} | |
// Wait for all goroutines to finish | |
wg.Wait() | |
} | |
func runProducer(cpuID int) { | |
// Initialize Kafka writer configuration | |
config := kafka.WriterConfig{ | |
Brokers: []string{kafkaBrokers}, | |
Topic: kafkaTopic, | |
Balancer: &kafka.LeastBytes{}, | |
} | |
// Create Kafka writer | |
writer := kafka.NewWriter(config) | |
// Print the CPU ID for each producer | |
fmt.Printf("Producer for CPU %d started\n", cpuID) | |
// Infinite loop sending unending stream of messages to Kafka | |
for { | |
// Construct syslog-like JSON message | |
syslogMessage := SyslogMessage{ | |
Message: fmt.Sprintf("Log message from CPU %d", cpuID), | |
Facility: "local0", | |
Severity: "info", | |
Timestamp: time.Now().Format(time.RFC3339), | |
} | |
// Convert syslogMessage to JSON string | |
syslogJSON, err := json.Marshal(syslogMessage) | |
if err != nil { | |
log.Printf("Error marshaling syslog message: %v\n", err) | |
continue | |
} | |
// Create Kafka message | |
message := kafka.Message{ | |
Key: nil, | |
Value: syslogJSON, | |
} | |
// Send the Kafka message | |
err = writer.WriteMessages(context.Background(), message) | |
if err != nil { | |
log.Printf("Failed to send message to Kafka: %v\n", err) | |
continue | |
} | |
// Print a message for each sent message (optional) | |
// fmt.Printf("Message sent from CPU %d\n", cpuID) | |
// Introduce a small delay to control the rate of messages | |
time.Sleep(100 * time.Millisecond) | |
} | |
// Note: The producer will keep running indefinitely. In a real-world scenario, you may want to implement a way | |
// to gracefully shut down the producer when needed (e.g., through signals or external triggers). | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment