Skip to content

Instantly share code, notes, and snippets.

@Manikkumar1988
Created September 11, 2025 06:07
Show Gist options
  • Select an option

  • Save Manikkumar1988/a243353cc395c5494c236d7dcacabe88 to your computer and use it in GitHub Desktop.

Select an option

Save Manikkumar1988/a243353cc395c5494c236d7dcacabe88 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/Shopify/sarama"
)
// BadHandler is the deliberately flawed handler of this demo: its ConsumeClaim
// drains the claim's message channel without ever watching the session
// context. main selects it when the first command-line argument is "bad".
type BadHandler struct{}
// Setup is the Setup hook of the sarama.ConsumerGroupHandler interface. The
// session argument is not used: the method prints a marker line, records a
// goroutine/heap snapshot tagged "setup-bad", and always reports success.
func (h *BadHandler) Setup(_ sarama.ConsumerGroupSession) error {
	fmt.Println("[BadHandler] Setup")
	logMem("setup-bad")

	return nil
}
// Cleanup is the Cleanup hook of the sarama.ConsumerGroupHandler interface.
// The session argument is not used: the method prints a marker line, records
// a goroutine/heap snapshot tagged "cleanup-bad", and always reports success.
func (h *BadHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	fmt.Println("[BadHandler] Cleanup")
	logMem("cleanup-bad")

	return nil
}
// ConsumeClaim marks every message received on the claim as consumed (with
// empty metadata) and returns nil once the message channel has been closed.
//
// This is the anti-pattern the demo is meant to show: the loop never looks at
// session.Context().Done(), so the only thing that can end it is the message
// channel being closed.
func (h *BadHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	incoming := claim.Messages()
	for m := range incoming {
		session.MarkMessage(m, "")
	}

	return nil
}
// GoodHandler is the well-behaved handler of this demo: its ConsumeClaim
// returns as soon as either the claim's message channel is closed or the
// session context is done. main uses it unless the first command-line
// argument is "bad".
type GoodHandler struct{}
// Setup is the Setup hook of the sarama.ConsumerGroupHandler interface. The
// session argument is not used: the method prints a marker line, records a
// goroutine/heap snapshot tagged "setup-good", and always reports success.
func (h *GoodHandler) Setup(_ sarama.ConsumerGroupSession) error {
	fmt.Println("[GoodHandler] Setup")
	logMem("setup-good")

	return nil
}
// Cleanup is the Cleanup hook of the sarama.ConsumerGroupHandler interface.
// The session argument is not used: the method prints a marker line, records
// a goroutine/heap snapshot tagged "cleanup-good", and always reports success.
func (h *GoodHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	fmt.Println("[GoodHandler] Cleanup")
	logMem("cleanup-good")

	return nil
}
// ConsumeClaim marks every message received on the claim as consumed (with
// empty metadata) until one of two things happens, whichever comes first:
// the claim's message channel is closed, or the session context is done.
// It returns nil in both cases.
//
// Watching the session context is what lets this handler leave promptly
// instead of waiting for the message channel to be closed.
func (h *GoodHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	incoming := claim.Messages()
	sessionDone := session.Context().Done()

	for {
		select {
		case <-sessionDone:
			return nil
		case m, open := <-incoming:
			if !open {
				return nil
			}
			session.MarkMessage(m, "")
		}
	}
}
// logMem prints a single line to standard output describing the process at
// this moment: the number of live goroutines, the bytes of allocated heap
// objects and the bytes in in-use heap spans, the latter two truncated to
// whole mebibytes. stage is printed in square brackets as the line's tag.
func logMem(stage string) {
	const bytesPerMB = 1024 * 1024

	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	goroutines := runtime.NumGoroutine()
	allocMB := stats.Alloc / bytesPerMB
	heapInuseMB := stats.HeapInuse / bytesPerMB

	fmt.Printf("[%s] Goroutines: %d | Alloc: %d MB | HeapInuse: %d MB\n",
		stage, goroutines, allocMB, heapInuseMB)
}
// main joins the Kafka consumer group "test-group" on topic "test-topic"
// using either BadHandler (first argument "bad") or GoodHandler (default),
// and keeps consuming until SIGINT or SIGTERM arrives.
//
// Shutdown sequence: the signal cancels the consume context, main waits (for
// a bounded time) until the consume loop has returned, and only then do the
// deferred calls stop signal delivery and close the consumer group.
func main() {
	// How long to wait for the consume loop to stop after a signal, so that a
	// handler which is slow to return cannot block shutdown forever.
	const shutdownTimeout = 30 * time.Second

	brokers := []string{"localhost:9092"} // point this at your Kafka broker(s)
	group := "test-group"
	topic := "test-topic"

	// Sarama config
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0
	config.Consumer.Return.Errors = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// Choose which handler to test:
	var handler sarama.ConsumerGroupHandler
	if len(os.Args) > 1 && os.Args[1] == "bad" {
		handler = &BadHandler{}
		fmt.Println(">>> Running with BAD handler (ignores session.Context)")
	} else {
		handler = &GoodHandler{}
		fmt.Println(">>> Running with GOOD handler (respects session.Context)")
	}

	groupClient, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		// An unreachable broker is an operational problem, not a programming
		// bug: report it plainly instead of dumping a panic stack trace.
		// Nothing has been deferred yet, so os.Exit skips no cleanup.
		fmt.Fprintf(os.Stderr, "Error creating consumer group: %v\n", err)
		os.Exit(1)
	}
	defer func() {
		if err := groupClient.Close(); err != nil {
			fmt.Printf("Error closing consumer group: %v\n", err)
		}
	}()

	// Consumer.Return.Errors is enabled above, which routes consumer errors
	// to the Errors channel instead of sarama's logger. Someone has to read
	// that channel or the errors are lost. The loop ends when the channel is
	// closed.
	go func() {
		for err := range groupClient.Errors() {
			fmt.Printf("Consumer group error: %v\n", err)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Trap signals
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigchan)

	// Consume loop. Consume returns whenever a session ends, so it is called
	// again to rejoin the group. done is closed when the loop exits so that
	// main can wait for it.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			err := groupClient.Consume(ctx, []string{topic}, handler)
			if ctx.Err() != nil {
				// Shutdown was requested; do not start another session.
				return
			}
			if err != nil {
				fmt.Printf("Error from consumer: %v\n", err)
				// Back off before retrying, but stay responsive to shutdown.
				select {
				case <-ctx.Done():
					return
				case <-time.After(time.Second):
				}
			}
		}
	}()

	// Keep alive until Ctrl+C
	<-sigchan
	fmt.Println("Shutting down...")

	// Stop consuming first, then give the loop a chance to finish before the
	// deferred Close tears the client down underneath it.
	cancel()
	select {
	case <-done:
	case <-time.After(shutdownTimeout):
		fmt.Println("Timed out waiting for consumer loop to stop")
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment