Created
June 7, 2019 04:35
-
-
Save devplayg/3a413989b80ea83db6aa076371357c85 to your computer and use it in GitHub Desktop.
sarama/examples/consumergroup/main.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"strings" | |
"syscall" | |
"github.com/Shopify/sarama" | |
) | |
// Sarama configuration options. Each value is bound to a command-line flag
// in init, which also supplies the effective defaults.
var (
	brokers string // Kafka bootstrap brokers, as a comma separated list
	version string // Kafka cluster version, parsed in main
	group   string // Kafka consumer group name
	topics  string // Kafka topics to consume, as a comma separated list
	oldest  = true // when true, consumption starts from the oldest offset
	verbose bool   // when true, Sarama's own logging is sent to stdout
)
func init() { | |
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list") | |
flag.StringVar(&group, "group", "", "Kafka consumer group definition") | |
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version") | |
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list") | |
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial ofset from oldest") | |
flag.BoolVar(&verbose, "verbose", false, "Sarama logging") | |
flag.Parse() | |
if len(brokers) == 0 { | |
panic("no Kafka bootstrap brokers defined, please set the -brokers flag") | |
} | |
if len(topics) == 0 { | |
panic("no topics given to be consumed, please set the -topics flag") | |
} | |
if len(group) == 0 { | |
panic("no Kafka consumer group defined, please set the -group flag") | |
} | |
} | |
func main() { | |
log.Println("Starting a new Sarama consumer") | |
if verbose { | |
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) | |
} | |
version, err := sarama.ParseKafkaVersion(version) | |
if err != nil { | |
panic(err) | |
} | |
/** | |
* Construct a new Sarama configuration. | |
* The Kafka cluster version has to be defined before the consumer/producer is initialized. | |
*/ | |
config := sarama.NewConfig() | |
config.Version = version | |
if oldest { | |
config.Consumer.Offsets.Initial = sarama.OffsetOldest | |
} | |
/** | |
* Setup a new Sarama consumer group | |
*/ | |
consumer := Consumer{} | |
ctx := context.Background() | |
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) | |
if err != nil { | |
panic(err) | |
} | |
go func() { | |
for { | |
consumer.ready = make(chan bool, 0) | |
err := client.Consume(ctx, strings.Split(topics, ","), &consumer) | |
if err != nil { | |
panic(err) | |
} | |
} | |
}() | |
<-consumer.ready // Await till the consumer has been set up | |
log.Println("Sarama consumer up and running!...") | |
sigterm := make(chan os.Signal, 1) | |
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) | |
<-sigterm // Await a sigterm signal before safely closing the consumer | |
err = client.Close() | |
if err != nil { | |
panic(err) | |
} | |
} | |
// Consumer is the handler handed to the Sarama consumer group; its Setup,
// Cleanup and ConsumeClaim methods are invoked by the group.
type Consumer struct {
	// ready is closed by Setup to signal that a session has been set up.
	ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim | |
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { | |
// Mark the consumer as ready | |
close(consumer.ready) | |
return nil | |
} | |
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited | |
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { | |
return nil | |
} | |
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). | |
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
// NOTE: | |
// Do not move the code below to a goroutine. | |
// The `ConsumeClaim` itself is called within a goroutine, see: | |
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 | |
for message := range claim.Messages() { | |
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) | |
session.MarkMessage(message, "") | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment