Skip to content

Instantly share code, notes, and snippets.

@noonat
Created July 21, 2016 16:41
Show Gist options
  • Save noonat/346f53200be2523e884aba42c7c9734b to your computer and use it in GitHub Desktop.
Save noonat/346f53200be2523e884aba42c7c9734b to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"log"
"os"
"os/signal"
"strings"
"github.com/Shopify/sarama"
"github.com/davecheney/errors"
)
// errCommand is a sentinel error whose message states that the command must be
// "describe" or "consume".
// NOTE(review): nothing in the visible code references this value or parses a
// command argument — confirm whether it is still needed.
var errCommand = errors.New(`command must be "describe" or "consume"`)
// Closer is satisfied by any value that has a Close method returning an error.
// The close helper accepts this interface so that the different handles opened
// in consume can all be shut down through the same code path.
type Closer interface {
Close() error
}
// close calls Close on closer and logs any error it returns rather than
// propagating it, which makes it suitable for use in defer statements. A nil
// closer is ignored.
//
// Within this package the name shadows Go's builtin close.
func close(closer Closer) {
	if closer != nil {
		err := closer.Close()
		if err != nil {
			log.Printf("error closing: %s\n", err.Error())
		}
	}
}
// consume reads messages from a single partition of topic and logs each one,
// marking offsets under the consumer group named by group so that a later run
// can resume from where this one stopped.
//
// addrs lists the Kafka broker addresses used to create the client. The
// function blocks until a value is received on stopChan (or the partition
// consumer's message channel is closed) and then returns nil. If any of the
// Sarama objects cannot be created, the traced error is returned instead.
// Everything that was opened is closed, in reverse order of creation, before
// the function returns; close failures are logged, not returned.
func consume(addrs []string, group, topic string, partition int32, stopChan chan bool) error {
	client, err := sarama.NewClient(addrs, nil)
	if err != nil {
		return errors.Trace(err)
	}
	defer close(client)

	mgr, err := sarama.NewOffsetManagerFromClient(group, client)
	if err != nil {
		return errors.Trace(err)
	}
	defer close(mgr)

	partMgr, err := mgr.ManagePartition(topic, partition)
	if err != nil {
		return errors.Trace(err)
	}
	defer close(partMgr)

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return errors.Trace(err)
	}
	defer close(consumer)

	// Finally we get to the actual object we are going to use: a partition
	// consumer. We fetch the next offset from the partition offset manager and
	// start consuming the same partition within the topic from that offset.
	// The second return value is the metadata string stored alongside the
	// offset; this consumer never stores any, so it is deliberately ignored.
	offset, _ := partMgr.NextOffset()
	partConsumer, err := consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		return errors.Trace(err)
	}
	defer close(partConsumer)

	log.Println("Consuming messages from offset", offset, "within", topic, "partition", partition)

	messages := partConsumer.Messages()
	for {
		select {
		// Read messages from the consumer channel, and mark an offset after
		// processing each one. This persists progress up to Kafka for the
		// consumer group, so if the worker is restarted, it can resume where
		// it left off.
		//
		// The second argument to MarkOffset is `metadata`, an arbitrary (but
		// relatively short) string that a consumer can use to reconstruct
		// where it left off. This consumer has no such state, so it is empty.
		case msg, ok := <-messages:
			if !ok {
				// A closed channel yields a nil message; stop instead of
				// dereferencing it.
				log.Println("Consumer message channel closed")
				return nil
			}
			log.Printf("Consumed message: %+v", msg)
			// Sarama (1.10 and later) follows the upstream convention that
			// the marked offset is the offset of the NEXT message to read, so
			// mark one past the message just processed. Marking msg.Offset
			// itself would redeliver this message after a restart.
			partMgr.MarkOffset(msg.Offset+1, "")
		case <-stopChan:
			log.Println("Consumer stopping")
			return nil
		}
	}
}
// main parses the command line flags, arranges for an interrupt signal to stop
// the consumer, and runs consume until it returns. Invalid flag values and any
// error returned by consume are logged and terminate the process with a
// non-zero status.
func main() {
	var (
		brokerAddrsArg string
		consumerGroup  string
		topic          string
		partition      int
	)
	flag.StringVar(&brokerAddrsArg, "brokers", "127.0.0.1:9092", "Comma separated list of Kafka broker addresses to connect to (e.g. 127.0.0.1:9092,127.0.0.1:9093).")
	flag.StringVar(&consumerGroup, "consumer-group", "test", "Name for the consumer group.")
	flag.IntVar(&partition, "partition", 0, "Partition within the topic to consume.")
	flag.StringVar(&topic, "topic", "test", "Kafka topic to consume.")
	flag.Parse()

	// consume takes the partition as an int32; reject values outside that
	// range (and negative ones) so the conversion below cannot silently wrap
	// to a different partition.
	const maxPartition = 1<<31 - 1
	if partition < 0 || partition > maxPartition {
		log.Fatalf("error: partition must be between 0 and %d, got %d\n", maxPartition, partition)
	}

	// Tolerate whitespace around the commas and drop empty entries, so that
	// "host1:9092, host2:9092" and a trailing comma both work.
	var brokerAddrs []string
	for _, addr := range strings.Split(brokerAddrsArg, ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			brokerAddrs = append(brokerAddrs, addr)
		}
	}
	if len(brokerAddrs) == 0 {
		log.Fatal("error: no broker addresses given")
	}

	// Register for the interrupt signal before the consumer starts so that an
	// early Ctrl-C is not missed. stopChan is buffered so the goroutine can
	// exit even if consume has already returned.
	stopChan := make(chan bool, 1)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt)
	go func() {
		<-sigChan
		stopChan <- true
	}()

	if err := consume(brokerAddrs, consumerGroup, topic, int32(partition), stopChan); err != nil {
		log.Fatalf("error: %+v\n", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment