Created
July 21, 2016 16:41
-
-
Save noonat/346f53200be2523e884aba42c7c9734b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"strings" | |
"github.com/Shopify/sarama" | |
"github.com/davecheney/errors" | |
) | |
var errCommand = errors.New(`command must be "describe" or "consume"`) | |
type Closer interface { | |
Close() error | |
} | |
func close(closer Closer) { | |
if closer == nil { | |
return | |
} | |
if err := closer.Close(); err != nil { | |
log.Printf("error closing: %s\n", err.Error()) | |
} | |
} | |
func consume(addrs []string, group, topic string, partition int32, stopChan chan bool) error { | |
client, err := sarama.NewClient(addrs, nil) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(client) | |
mgr, err := sarama.NewOffsetManagerFromClient(group, client) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(mgr) | |
partMgr, err := mgr.ManagePartition(topic, partition) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(partMgr) | |
consumer, err := sarama.NewConsumerFromClient(client) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(consumer) | |
// Finally we get to the actual object we are going to use: a partition | |
// consumer. We fetch the next offset from where we left off from the | |
// partition offset manager, and start consuming the same partition within | |
// the topic, starting from that offset. | |
offset, _ := partMgr.NextOffset() | |
partConsumer, err := consumer.ConsumePartition(topic, partition, offset) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
defer close(partConsumer) | |
log.Println("Consuming messages from offset", offset, "within", topic, "partition", partition) | |
for { | |
select { | |
// Read messages from the consumer channel, and mark each offset after | |
// processing it. This persists that value up to Kafka for your consumer | |
// group, so if the worker is restarted, it can resume where it left off. | |
// | |
// The second argument to the MarkOffset function is `metadata`, which is | |
// an arbitrary (but relatively short) string that your consumer is | |
// supposed to be able to use to reconstruct where it left off. Maybe it | |
// points to a file on disk or something with some persisted state in it. | |
case msg := <-partConsumer.Messages(): | |
log.Printf("Consumed message: %+v", msg) | |
partMgr.MarkOffset(msg.Offset, "") | |
case <-stopChan: | |
log.Println("Consumer stopping") | |
return nil | |
} | |
} | |
return nil | |
} | |
func main() { | |
var brokerAddrsArg string | |
var consumerGroup string | |
var topic string | |
var partition int | |
flag.StringVar(&brokerAddrsArg, "brokers", "127.0.0.1:9092", "Comma separated list of Kafka broker addresses to connect to (e.g. 127.0.0.1:9092,127.0.0.1:9093).") | |
flag.StringVar(&consumerGroup, "consumer-group", "test", "Name for the consumer group.") | |
flag.IntVar(&partition, "partition", 0, "Partition within the topic to consume.") | |
flag.StringVar(&topic, "topic", "test", "Kafka topic to consume.") | |
flag.Parse() | |
brokerAddrs := strings.Split(brokerAddrsArg, ",") | |
stopChan := make(chan bool, 1) | |
go func() { | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt) | |
<-c | |
stopChan <- true | |
}() | |
if err := consume(brokerAddrs, consumerGroup, topic, int32(partition), stopChan); err != nil { | |
log.Fatalf("error: %+v\n", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment