Last active
February 13, 2025 06:27
-
-
Save gigenthomas/9a9ccbcc483700be9956931941fb2422 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)
// KafkaConsumer wraps the Confluent Kafka Consumer | |
type KafkaConsumer struct { | |
consumer *kafka.Consumer | |
topics []string | |
} | |
// NewKafkaConsumer initializes a new KafkaConsumer | |
func NewKafkaConsumer(brokers, groupID string, topics []string) (*KafkaConsumer, error) { | |
config := &kafka.ConfigMap{ | |
"bootstrap.servers": brokers, | |
"group.id": groupID, | |
"auto.offset.reset": "latest", // Start from latest offset | |
"enable.auto.commit": false, // Manually commit offsets | |
"session.timeout.ms": 6000, | |
"go.events.channel.size": 100, | |
"go.application.rebalance.enable": true, | |
} | |
consumer, err := kafka.NewConsumer(config) | |
if err != nil { | |
return nil, err | |
} | |
return &KafkaConsumer{consumer: consumer, topics: topics}, nil | |
} | |
// Start begins consuming messages using goroutines | |
func (kc *KafkaConsumer) Start(ctx context.Context, workers int) { | |
var wg sync.WaitGroup | |
err := kc.consumer.SubscribeTopics(kc.topics, nil) | |
if err != nil { | |
log.Fatalf("Failed to subscribe to topics: %v", err) | |
} | |
// Worker goroutines | |
for i := 0; i < workers; i++ { | |
wg.Add(1) | |
go func(workerID int) { | |
defer wg.Done() | |
fmt.Printf("Worker %d started...\n", workerID) | |
for { | |
select { | |
case <-ctx.Done(): | |
fmt.Printf("Worker %d shutting down...\n", workerID) | |
return | |
default: | |
msg, err := kc.consumer.ReadMessage(-1) // Blocking call | |
if err == nil { | |
fmt.Printf("[Worker %d] Received message: Topic=%s Partition=%d Offset=%d Key=%s Value=%s\n", | |
workerID, *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset, string(msg.Key), string(msg.Value)) | |
// Manually commit the offset | |
_, err = kc.consumer.Commit() | |
if err != nil { | |
log.Printf("Commit error: %v\n", err) | |
} | |
} else { | |
log.Printf("Consumer error: %v (%v)\n", err, msg) | |
} | |
} | |
} | |
}(i) | |
} | |
// Wait for workers to finish | |
wg.Wait() | |
} | |
// Close shuts down the consumer | |
func (kc *KafkaConsumer) Close() { | |
kc.consumer.Close() | |
} | |
func main() { | |
brokers := "localhost:9092" | |
groupID := "example-group" | |
topics := []string{"topic1", "topic2"} | |
workerCount := 3 | |
consumer, err := NewKafkaConsumer(brokers, groupID, topics) | |
if err != nil { | |
log.Fatalf("Failed to create Kafka consumer: %v", err) | |
} | |
defer consumer.Close() | |
ctx, cancel := context.WithCancel(context.Background()) | |
// Handle graceful shutdown | |
go func() { | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM) | |
<-sigchan | |
fmt.Println("Received shutdown signal...") | |
cancel() | |
}() | |
consumer.Start(ctx, workerCount) | |
fmt.Println("Kafka consumer stopped.") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment