Skip to content

Instantly share code, notes, and snippets.

@wingedpig
Created October 5, 2017 16:47
Show Gist options
  • Save wingedpig/6d4999591fc305b683b9fd7cbeaed240 to your computer and use it in GitHub Desktop.
Save wingedpig/6d4999591fc305b683b9fd7cbeaed240 to your computer and use it in GitHub Desktop.
Test Kafka Consumer
package main
import (
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
// main is a small test consumer: it connects to the Kafka broker at
// 127.0.0.1:9092, consumes partition 0 of the "test" topic starting at
// offset 2, and logs the offset of every message it receives. It runs until
// the process receives an interrupt signal, then logs the number of messages
// consumed and closes the partition consumer and the consumer.
func main() {
	// Route sarama's internal logging to stdout with a recognizable prefix.
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer func() {
		// Report the failure with Printf, not Fatalf: Fatalf exits the
		// process via os.Exit, which skips any deferred calls still pending.
		if err := consumer.Close(); err != nil {
			log.Printf("consumer.Close() returned %s", err)
		}
	}()

	// Arguments are topic, partition, and the starting offset.
	partitionConsumer, err := consumer.ConsumePartition("test", 0, 2)
	if err != nil {
		panic(err)
	}
	defer func() {
		log.Printf("Closing")
		// Deferred calls run last-in-first-out, so this runs before the
		// consumer.Close() above. Using Printf instead of Fatalf here
		// guarantees that the parent consumer is still closed afterwards
		// even when closing the partition consumer fails.
		if err := partitionConsumer.Close(); err != nil {
			log.Printf("partitionConsumer.Close() returned %s", err)
		}
	}()

	// Trap SIGINT to trigger a shutdown. The channel is buffered so a signal
	// delivered while the loop is busy logging is not dropped.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d", msg.Offset)
			consumed++
		case kerr := <-partitionConsumer.Errors():
			// NOTE(review): the consumer was created with a nil config; with
			// sarama's defaults errors are presumably logged by the library
			// rather than delivered on this channel — confirm.
			log.Printf("ERROR: %+v", kerr)
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment