Last active
January 13, 2022 19:32
-
-
Save marselester/a7cc16279e990a8d42c41fd9f557b341 to your computer and use it in GitHub Desktop.
Reproduce EOF error in kafka-go v0.4.25 #814
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
If the topic has no messages, the kafka reader starts to report | |
"the kafka reader got an unknown error reading partition x of my-topic at offset y: unexpected EOF", | |
see https://github.com/segmentio/kafka-go/issues/814.
$ go run main.go | |
2022/01/13 14:10:09 read message: fizz | |
2022/01/13 14:10:27 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF | |
^C | |
2022/01/13 14:10:30 failed to read a message: context canceled | |
2022/01/13 14:10:36 the kafka reader got an unknown error reading partition 0 of mytopic at offset 1: unexpected EOF | |
The error occurs at line 1380 of reader.go: https://github.com/segmentio/kafka-go/blob/v0.4.25/reader.go#L1380
*/ | |
package main | |
import ( | |
"context" | |
"log" | |
"os/signal" | |
"syscall" | |
"github.com/segmentio/kafka-go" | |
) | |
func main() { | |
var ( | |
brokers = []string{"127.0.0.1:9092"} | |
topic = "mytopic" | |
) | |
producer := kafka.NewWriter(kafka.WriterConfig{ | |
Brokers: brokers, | |
Topic: topic, | |
}) | |
defer func() { | |
if err := producer.Close(); err != nil { | |
log.Printf("failed to close producer: %v", err) | |
} | |
}() | |
consumer := kafka.NewReader(kafka.ReaderConfig{ | |
Brokers: brokers, | |
Topic: topic, | |
Partition: 0, | |
ErrorLogger: log.Default(), | |
}) | |
defer func() { | |
if err := consumer.Close(); err != nil { | |
log.Printf("failed to close consumer: %v", err) | |
} | |
}() | |
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) | |
defer cancel() | |
if err := producer.WriteMessages(ctx, kafka.Message{Value: []byte("fizz")}); err != nil { | |
log.Printf("failed to write a message: %v", err) | |
return | |
} | |
for { | |
m, err := consumer.ReadMessage(ctx) | |
if err != nil { | |
log.Printf("failed to read a message: %v", err) | |
return | |
} | |
log.Printf("read message: %s", m.Value) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
docker-compose.yml
You can run Kafka in Docker.