Skip to content

Instantly share code, notes, and snippets.

@chris001177
Created July 28, 2019 06:51
Show Gist options
  • Select an option

  • Save chris001177/32f5ee528417dd06df2c69b5efa1e17e to your computer and use it in GitHub Desktop.

Select an option

Save chris001177/32f5ee528417dd06df2c69b5efa1e17e to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// Specify brokers address. This is default one
brokers := []string{"localhost:9092"}
// Create new consumer
master, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
topic := "test"
// How to decide partition, is it fixed value...?
consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// Count how many message processed
msgCount := 0
// Get signnal for finish
doneCh := make(chan struct{})
go func() {
for {
select {
case err := <-consumer.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
msgCount++
fmt.Println("Received messages", string(msg.Key), string(msg.Value))
case <-signals:
fmt.Println("Interrupt is detected")
doneCh <- struct{}{}
}
}
}()
<-doneCh
fmt.Println("Processed", msgCount, "messages")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment