Skip to content

Instantly share code, notes, and snippets.

@gigenthomas
Last active February 13, 2025 06:27
Show Gist options
  • Save gigenthomas/9a9ccbcc483700be9956931941fb2422 to your computer and use it in GitHub Desktop.
Save gigenthomas/9a9ccbcc483700be9956931941fb2422 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// KafkaConsumer wraps the Confluent Kafka Consumer
type KafkaConsumer struct {
consumer *kafka.Consumer
topics []string
}
// NewKafkaConsumer initializes a new KafkaConsumer
func NewKafkaConsumer(brokers, groupID string, topics []string) (*KafkaConsumer, error) {
config := &kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": groupID,
"auto.offset.reset": "latest", // Start from latest offset
"enable.auto.commit": false, // Manually commit offsets
"session.timeout.ms": 6000,
"go.events.channel.size": 100,
"go.application.rebalance.enable": true,
}
consumer, err := kafka.NewConsumer(config)
if err != nil {
return nil, err
}
return &KafkaConsumer{consumer: consumer, topics: topics}, nil
}
// Start begins consuming messages using goroutines
func (kc *KafkaConsumer) Start(ctx context.Context, workers int) {
var wg sync.WaitGroup
err := kc.consumer.SubscribeTopics(kc.topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe to topics: %v", err)
}
// Worker goroutines
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
fmt.Printf("Worker %d started...\n", workerID)
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d shutting down...\n", workerID)
return
default:
msg, err := kc.consumer.ReadMessage(-1) // Blocking call
if err == nil {
fmt.Printf("[Worker %d] Received message: Topic=%s Partition=%d Offset=%d Key=%s Value=%s\n",
workerID, *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset, string(msg.Key), string(msg.Value))
// Manually commit the offset
_, err = kc.consumer.Commit()
if err != nil {
log.Printf("Commit error: %v\n", err)
}
} else {
log.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
}(i)
}
// Wait for workers to finish
wg.Wait()
}
// Close shuts down the consumer
func (kc *KafkaConsumer) Close() {
kc.consumer.Close()
}
func main() {
brokers := "localhost:9092"
groupID := "example-group"
topics := []string{"topic1", "topic2"}
workerCount := 3
consumer, err := NewKafkaConsumer(brokers, groupID, topics)
if err != nil {
log.Fatalf("Failed to create Kafka consumer: %v", err)
}
defer consumer.Close()
ctx, cancel := context.WithCancel(context.Background())
// Handle graceful shutdown
go func() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM)
<-sigchan
fmt.Println("Received shutdown signal...")
cancel()
}()
consumer.Start(ctx, workerCount)
fmt.Println("Kafka consumer stopped.")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment