Confluent Kafka Go package - unexpected behaviour (?)
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
	mb = 1024 * 1024
)

var (
	// assuming a setup of 3 brokers:
	topicPartitions        = 5
	topicReplicationFactor = 3
	topic                  = "test-topic-1"
	broker                 = "localhost:9092,localhost:9093,localhost:9094"
	group                  = "test-group"
)
func main() {
	// publish N messages
	messageCount := 30

	// clean(ish) start
	createTopic()
	waitForTopic(topic)
	drainTopic(topic)

	// produce them
	produce(messageCount)

	ctx := withSignal(context.Background())
	err := consume(ctx)
	if err != nil {
		log.Println(err)
	}
}
// proc is currently unused in this example.
type proc struct {
	id     int
	errors int
}
// withSignal returns a context that is cancelled on SIGINT or SIGTERM.
func withSignal(ctx context.Context) context.Context {
	ctx, cf := context.WithCancel(ctx)
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
		<-sigchan
		cf()
	}()
	return ctx
}
// mostly copied from the confluent example
func consume(ctx context.Context) error {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":         broker,
		"group.id":                  group,
		"auto.offset.reset":         "earliest",
		"enable.auto.commit":        true,
		"enable.auto.offset.store":  false,
		"log.connection.close":      false,
		"max.partition.fetch.bytes": int(mb * 5),
	})
	if err != nil {
		return err
	}
	// Track how many times each partition/offset pair has been delivered so
	// re-delivered messages can be spotted. (Keying on the offset alone would
	// conflate messages from different partitions of this 5-partition topic.)
	seenOffsets := map[string]int{}
	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		return err
	}
	fmt.Println("subscribed to", topic)
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			ev := consumer.Poll(1000)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				tp := e.TopicPartition
				fmt.Printf("<- %d/%d\n", tp.Partition, tp.Offset)
				key := fmt.Sprintf("%d/%d", tp.Partition, tp.Offset)
				if count := seenOffsets[key]; count > 0 {
					fmt.Printf("duplicated message (%v) at partition %d offset %d (x%d)\n", i, tp.Partition, tp.Offset, count)
				}
				seenOffsets[key]++
				// after a number of successfully consumed messages, try to
				// trigger the unexpected behaviour and see what we get
				if i == 15 {
					// seek back to the offset of the message just received
					// (the second argument to Seek is a timeout in ms)
					if seekErr := consumer.Seek(tp, 100); seekErr != nil {
						return seekErr
					}
					time.Sleep(1 * time.Second)
					// store offset+1, i.e. the next offset to be consumed
					tp.Offset++
					_, err := consumer.StoreOffsets([]kafka.TopicPartition{tp})
					if err != nil {
						return err
					}
				}
				// fmt.Printf("%% Message on %s:\n%s\n",
				// 	e.TopicPartition, string(e.Value))
				// if e.Headers != nil {
				// 	fmt.Printf("%% Headers: %v\n", e.Headers)
				// }
			case kafka.Error:
				// Errors should generally be considered informational; the
				// client will try to recover automatically. In this example
				// we terminate only if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					return e
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}
}
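
// The helper below is a sketch that is not part of the original gist: it
// illustrates the at-least-once pattern that the configuration above
// (enable.auto.commit=true together with enable.auto.offset.store=false) is
// commonly used for. The message is processed first, then offset+1 is stored
// so the background auto-committer only ever commits offsets of fully
// processed messages. The name processAndStore is made up here.
func processAndStore(c *kafka.Consumer, msg *kafka.Message) error {
	// ... handle msg.Value here ...

	// The stored offset is the offset of the next message to consume,
	// i.e. the offset of the processed message plus one.
	tp := msg.TopicPartition
	tp.Offset++
	_, err := c.StoreOffsets([]kafka.TopicPartition{tp})
	return err
}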
func produce(count int) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Close()
	// Flush alone did not seem to wait for delivery here unless a delivery
	// channel was drained (weird?); see produceViaEvents below for the
	// Events()-based pattern from the upstream examples.
	delivery := make(chan kafka.Event, 1)
	go func() {
		for i := 0; i < count; i++ {
			value := []byte(fmt.Sprint(i))
			if err := p.Produce(&kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
				Value:          value,
			}, delivery); err != nil {
				log.Println("produce:", err)
			}
		}
	}()
	// wait for one delivery report per produced message
	for i := 0; i < count; i++ {
		<-delivery
	}
	waitForMs := 10000
	p.Flush(waitForMs) // this did not work without the delivery channel... BUG?
}
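
// The function below is a sketch, not part of the original gist: it shows the
// delivery-report pattern from the confluent-kafka-go README, where delivery
// reports are read from p.Events() (Produce is called with a nil delivery
// channel) and Flush is used to wait for the outstanding ones. The name
// produceViaEvents is made up here.
func produceViaEvents(count int) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Close()
	// Drain delivery reports from the producer's own event channel.
	go func() {
		for e := range p.Events() {
			if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
				log.Println("delivery failed:", m.TopicPartition.Error)
			}
		}
	}()
	for i := 0; i < count; i++ {
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprint(i)),
		}, nil); err != nil {
			log.Println("produce:", err)
		}
	}
	// Flush waits (up to the timeout) for outstanding delivery reports and
	// returns the number of events that are still un-flushed.
	if remaining := p.Flush(10000); remaining > 0 {
		log.Printf("%d messages were not delivered before the timeout", remaining)
	}
}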
func createTopic() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	result, err := a.CreateTopics(
		ctx,
		[]kafka.TopicSpecification{{
			Topic:             topic,
			NumPartitions:     topicPartitions,
			ReplicationFactor: topicReplicationFactor,
		}},
		kafka.SetAdminOperationTimeout(30*time.Second))
	if err != nil {
		log.Println(err)
		return
	}
	for _, res := range result {
		fmt.Println(res)
	}
	waitForTopic(topic)
}
// waitForTopic blocks until metadata for the topic can be fetched.
func waitForTopic(topic string) {
	ac, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer ac.Close()
	for ok := false; !ok; ok = topicExists(topic, ac, 1*time.Second) {
	}
}

func topicExists(topic string, ac *kafka.AdminClient, timeout time.Duration) bool {
	timeoutMs := int(timeout / time.Millisecond)
	md, err := ac.GetMetadata(&topic, false, timeoutMs)
	if err != nil {
		log.Println(err)
		return false
	}
	// When metadata is requested for a specific topic, the response typically
	// contains an entry for it even if the topic does not exist yet (with the
	// error set on the entry), so checking for presence alone always returned
	// true. Check the per-topic error as well.
	t, ok := md.Topics[topic]
	return ok && t.Error.Code() == kafka.ErrNoError
}
// drainTopic consumes and commits all messages currently in the topic for
// this consumer group, so subsequent consumers with the same group id will
// not see them.
func drainTopic(topic string) {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()
	// get metadata from the admin client (lists the topic's partition IDs)
	md, err := a.GetMetadata(&topic, false, 10000)
	if err != nil {
		log.Println(err)
		return
	}
	c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker, "group.id": group})
	if err != nil {
		log.Println(err)
		return
	}
	defer c.Close()
	// construct kafka.TopicPartition values from the partition metadata
	partitions := []kafka.TopicPartition{}
	for topicName, metadata := range md.Topics {
		topicName := topicName // take a fresh copy; &topicName below must not point at the loop variable
		for _, partition := range metadata.Partitions {
			// Note: unfortunately the high watermark is not the offset of the
			// last message produced, which makes it difficult to obtain the
			// latest offset in a dynamic environment. Instead the consumer
			// uses it as the starting point for draining, so it does not have
			// to discard every message ever produced.
			_, hw, err := c.QueryWatermarkOffsets(topicName, partition.ID, 10000)
			if err != nil {
				log.Println(err)
				return
			}
			// partitions to be assigned to the consumer in the next step
			partitions = append(partitions, kafka.TopicPartition{
				Topic:     &topicName,
				Partition: partition.ID,
				Offset:    kafka.Offset(hw),
			})
		}
	}
	// try to assign the partitions to this consumer
	// (would this steal them from other consumers? Might help SAT tests.)
	err = c.Assign(partitions)
	if err != nil {
		log.Println(err)
		return
	}
	// consume until nothing has been received for 200ms
	for ; err == nil; _, err = c.ReadMessage(200 * time.Millisecond) {
	}
	if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
		// the loop is expected to end with a timeout; anything else is a real error
		log.Println("drain stopped on unexpected error:", err)
	}
	committedPartitions, err := c.CommitOffsets(partitions)
	if err != nil {
		log.Println(err)
		return
	}
	// leave nicely
	err = c.Unassign()
	if err != nil {
		log.Println(err)
		return
	}
	log.Println("=== Committed partitions:")
	var offsetTotal int64
	for _, p := range committedPartitions {
		log.Println(p.Partition, p.Offset)
		offsetTotal += int64(p.Offset)
	}
	// the anomaly should be visible if the difference from the previous run
	// does not equal the total number of emitted messages
	fmt.Println(offsetTotal)
}
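
// The helper below is a sketch that is not part of the original gist: it
// queries the group's committed offsets from a fresh consumer, so the result
// of drainTopic (and of the StoreOffsets experiment in consume) can be
// double-checked. The name logCommittedOffsets is made up here.
func logCommittedOffsets(partitions []kafka.TopicPartition) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker, "group.id": group})
	if err != nil {
		log.Println(err)
		return
	}
	defer c.Close()
	committed, err := c.Committed(partitions, 10000)
	if err != nil {
		log.Println(err)
		return
	}
	for _, p := range committed {
		log.Printf("partition %d committed offset %v", p.Partition, p.Offset)
	}
}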
// don't bother, doesn't work (at least not with the versions we use)...
// (one possible cause is delete.topic.enable=false on the brokers)
func deleteTopic() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Println(err)
		return
	}
	defer a.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	result, err := a.DeleteTopics(ctx, []string{topic}, kafka.SetAdminOperationTimeout(30*time.Second))
	if err != nil {
		log.Println(err)
		return
	}
	// poll metadata until the topic is gone
	for {
		t := topic
		log.Println("get metadata")
		metadata, err := a.GetMetadata(&t, false, 10000)
		if err != nil {
			log.Println(err)
			return
		}
		if len(metadata.Topics) == 0 {
			break
		}
		log.Printf("got metadata for %v topic(s)", len(metadata.Topics))
		for topicName, md := range metadata.Topics {
			log.Println(topicName)
			for _, p := range md.Partitions {
				log.Println(topicName, p.ID, p.Error, p.Leader, p.Replicas, p.Isrs)
			}
			time.Sleep(1 * time.Second)
		}
	}
	for _, res := range result {
		fmt.Println(res)
	}
}
// publishTestMessages is an unused placeholder.
func publishTestMessages() error {
	return nil
}