Created
September 11, 2025 06:07
-
-
Save Manikkumar1988/a243353cc395c5494c236d7dcacabe88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "fmt" | |
| "os" | |
| "os/signal" | |
| "runtime" | |
| "syscall" | |
| "time" | |
| "github.com/Shopify/sarama" | |
| ) | |
| type BadHandler struct{} | |
| func (h *BadHandler) Setup(s sarama.ConsumerGroupSession) error { | |
| fmt.Println("[BadHandler] Setup") | |
| logMem("setup-bad") | |
| return nil | |
| } | |
| func (h *BadHandler) Cleanup(s sarama.ConsumerGroupSession) error { | |
| fmt.Println("[BadHandler] Cleanup") | |
| logMem("cleanup-bad") | |
| return nil | |
| } | |
| func (h *BadHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
| // ❌ ignores session.Context().Done() | |
| for msg := range claim.Messages() { | |
| _ = msg | |
| session.MarkMessage(msg, "") | |
| } | |
| return nil | |
| } | |
| type GoodHandler struct{} | |
| func (h *GoodHandler) Setup(s sarama.ConsumerGroupSession) error { | |
| fmt.Println("[GoodHandler] Setup") | |
| logMem("setup-good") | |
| return nil | |
| } | |
| func (h *GoodHandler) Cleanup(s sarama.ConsumerGroupSession) error { | |
| fmt.Println("[GoodHandler] Cleanup") | |
| logMem("cleanup-good") | |
| return nil | |
| } | |
| func (h *GoodHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
| // ✅ exits immediately on rebalance | |
| for { | |
| select { | |
| case msg, ok := <-claim.Messages(): | |
| if !ok { | |
| return nil | |
| } | |
| _ = msg | |
| session.MarkMessage(msg, "") | |
| case <-session.Context().Done(): | |
| return nil | |
| } | |
| } | |
| } | |
| func logMem(stage string) { | |
| var m runtime.MemStats | |
| runtime.ReadMemStats(&m) | |
| fmt.Printf("[%s] Goroutines: %d | Alloc: %d MB | HeapInuse: %d MB\n", | |
| stage, | |
| runtime.NumGoroutine(), | |
| m.Alloc/1024/1024, | |
| m.HeapInuse/1024/1024, | |
| ) | |
| } | |
| func main() { | |
| brokers := []string{"localhost:9092"} // ⚠️ point to your Kafka | |
| group := "test-group" | |
| topic := "test-topic" | |
| // Sarama config | |
| config := sarama.NewConfig() | |
| config.Version = sarama.V2_5_0_0 | |
| config.Consumer.Return.Errors = true | |
| config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange | |
| config.Consumer.Offsets.Initial = sarama.OffsetNewest | |
| // Choose which handler to test: | |
| var handler sarama.ConsumerGroupHandler | |
| if len(os.Args) > 1 && os.Args[1] == "bad" { | |
| handler = &BadHandler{} | |
| fmt.Println(">>> Running with BAD handler (ignores session.Context)") | |
| } else { | |
| handler = &GoodHandler{} | |
| fmt.Println(">>> Running with GOOD handler (respects session.Context)") | |
| } | |
| groupClient, err := sarama.NewConsumerGroup(brokers, group, config) | |
| if err != nil { | |
| panic(err) | |
| } | |
| defer groupClient.Close() | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| defer cancel() | |
| // Trap signals | |
| sigchan := make(chan os.Signal, 1) | |
| signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
| // Consume loop | |
| go func() { | |
| for { | |
| if err := groupClient.Consume(ctx, []string{topic}, handler); err != nil { | |
| fmt.Printf("Error from consumer: %v\n", err) | |
| time.Sleep(time.Second) | |
| } | |
| } | |
| }() | |
| // Keep alive until Ctrl+C | |
| <-sigchan | |
| fmt.Println("Shutting down...") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment