Created
December 31, 2017 16:53
-
-
Save savaki/f5dadb594d6cf72e1b54265e4e75e9e3 to your computer and use it in GitHub Desktop.
performance of kafka-go consumer group reader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"crypto/tls" | |
"crypto/x509" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"strings" | |
"time" | |
"github.com/segmentio/kafka-go" | |
"github.com/segmentio/ksuid" | |
) | |
var ( | |
brokers string | |
topic string | |
records int | |
certFile string | |
caFile string | |
keyFile string | |
) | |
func init() { | |
flag.StringVar(&brokers, "brokers", "localhost:9092", "broker addresses, comma-separated") | |
flag.StringVar(&topic, "topic", "topic", "topic to produce to") | |
flag.IntVar(&records, "records", 250000, "number of records to read from kafka") | |
flag.StringVar(&certFile, "cert", "_cert.pem", "tls cert") | |
flag.StringVar(&caFile, "ca", "_ca.pem", "tls ca") | |
flag.StringVar(&keyFile, "key", "_key.pem", "tls key") | |
flag.Parse() | |
} | |
func check(err error) { | |
if err != nil { | |
log.Fatalln(err) | |
} | |
} | |
func tlsConfig() *tls.Config { | |
certPEM, err := ioutil.ReadFile(certFile) | |
check(err) | |
caPEM, err := ioutil.ReadFile(caFile) | |
check(err) | |
keyPEM, err := ioutil.ReadFile(keyFile) | |
check(err) | |
if certPEM == nil || caPEM == nil || keyPEM == nil { | |
panic("tls configuration not available") | |
} | |
cert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM)) | |
check(err) | |
caCertPool := x509.NewCertPool() | |
caCertPool.AppendCertsFromPEM([]byte(caPEM)) | |
return &tls.Config{ | |
Certificates: []tls.Certificate{cert}, | |
RootCAs: caCertPool, | |
InsecureSkipVerify: true, | |
} | |
} | |
func main() { | |
groupID := ksuid.New().String() | |
r := kafka.NewReader(kafka.ReaderConfig{ | |
Brokers: strings.Split(brokers, ","), | |
Topic: topic, | |
GroupID: groupID, | |
Dialer: &kafka.Dialer{TLS: tlsConfig()}, | |
}) | |
defer r.Close() | |
ctx := context.Background() | |
var start time.Time | |
var count int | |
for { | |
_, err := r.ReadMessage(ctx) | |
check(err) | |
count++ | |
if count == 1 { | |
start = time.Now() | |
} | |
if count == records { | |
break | |
} | |
} | |
elapsed := time.Now().Sub(start) | |
fmt.Printf("kafka-go: %v records, %v\n", records, elapsed) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
On my desktop to a remote kafka 0.11.2, I get