Created
February 15, 2017 08:04
-
-
Save nilsmagnus/4b582f9a36279bff5f8f9d453f8fb9c4 to your computer and use it in GitHub Desktop.
Example of go consuming from kafka, using the shopify/sarama library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"github.com/Shopify/sarama" | |
"os" | |
"os/signal" | |
"strings" | |
) | |
func main() { | |
config := sarama.NewConfig() | |
config.ClientID = "go-kafka-consumer" | |
config.Consumer.Return.Errors = true | |
brokers := []string{"localhost:9092"} | |
// Create new consumer | |
master, err := sarama.NewConsumer(brokers, config) | |
if err != nil { | |
panic(err) | |
} | |
defer func() { | |
if err := master.Close(); err != nil { | |
panic(err) | |
} | |
}() | |
topics, _ := master.Topics() | |
consumer, errors := consume(topics, master) | |
signals := make(chan os.Signal, 1) | |
signal.Notify(signals, os.Interrupt) | |
// Count how many message processed | |
msgCount := 0 | |
// Get signnal for finish | |
doneCh := make(chan struct{}) | |
go func() { | |
for { | |
select { | |
case msg := <-consumer: | |
msgCount++ | |
fmt.Println("Received messages", string(msg.Key), string(msg.Value)) | |
case consumerError := <-errors: | |
msgCount++ | |
fmt.Println("Received consumerError ", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err) | |
doneCh <- struct{}{} | |
case <-signals: | |
fmt.Println("Interrupt is detected") | |
doneCh <- struct{}{} | |
} | |
} | |
}() | |
<-doneCh | |
fmt.Println("Processed", msgCount, "messages") | |
} | |
func consume(topics []string, master sarama.Consumer) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError) { | |
consumers := make(chan *sarama.ConsumerMessage) | |
errors := make(chan *sarama.ConsumerError) | |
for _, topic := range topics { | |
if strings.Contains(topic, "__consumer_offsets") { | |
continue | |
} | |
partitions, _ := master.Partitions(topic) | |
// this only consumes partition no 1, you would probably want to consume all partitions | |
consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest) | |
if nil != err { | |
fmt.Printf("Topic %v Partitions: %v", topic, partitions) | |
panic(err) | |
} | |
fmt.Println(" Start consuming topic ", topic) | |
go func(topic string, consumer sarama.PartitionConsumer) { | |
for { | |
select { | |
case consumerError := <-consumer.Errors(): | |
errors <- consumerError | |
fmt.Println("consumerError: ", consumerError.Err) | |
case msg := <-consumer.Messages(): | |
consumers <- msg | |
fmt.Println("Got message on topic ", topic, msg.Value) | |
} | |
} | |
}(topic, consumer) | |
} | |
return consumers, errors | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You should use a consumer group https://github.com/Shopify/sarama/tree/master/examples/consumergroup if you're interested in continuing consuming from the previous position.
There are some specific cases for the low-level consumer mentioned on this gist, which might or might not work for you, most people should use the sarama consumer group, to let sarama manage the offsets, claims, consumer group rebalances, etc.
The low-level consumer does not create a consumer group, hence the Kafka server is unaware of the client position.