Skip to content

Instantly share code, notes, and snippets.

@markuskont
Last active November 2, 2017 13:43
Show Gist options
  • Save markuskont/502c1babc79baf3b4b4aabf17ef53c72 to your computer and use it in GitHub Desktop.
Save markuskont/502c1babc79baf3b4b4aabf17ef53c72 to your computer and use it in GitHub Desktop.
Basic code to get started with Apache Kafka (tested on 1.0.0) using Go 1.9 and the sarama library.
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"strings"
"github.com/Shopify/sarama"
"github.com/linkedin/goavro"
)
// loadSchema reads the whole file named by source and returns its contents
// as a string. If the file cannot be read, the error is reported through
// log.Fatal, which terminates the program.
func loadSchema(source string) (str string) {
	contents, readErr := ioutil.ReadFile(source)
	if readErr != nil {
		log.Fatal(readErr)
	}
	return string(contents)
}
// main parses the command-line flags and hands the work to run. Any error
// returned by run is reported with log.Fatal, so the process exits non-zero.
//
// Flags:
//   -brokers    comma-separated list of Kafka broker addresses
//   -topic      topic to consume from (only used with -consume)
//   -partition  partition to consume from (only used with -consume)
//   -consume    when set, consume and Avro-decode messages from -topic
//   -schema     path to the Avro schema used for decoding
func main() {
	var (
		kafka     = flag.String("brokers", "localhost:9092", "Comma-separated list of brokers")
		topic     = flag.String("topic", "test", "Kafka topic to use")
		partition = flag.Uint("partition", 0, "Integer for test message consumption")
		consume   = flag.Bool("consume", false, "Test consumption of messages from defined topic, needs -topic")
		schema    = flag.String("schema", "/tmp/schema.json", "Avro schema for encoding/decoding")
	)
	flag.Parse()

	// sarama addresses partitions as int32; reject values that would wrap
	// around in the conversion below instead of silently consuming from a
	// different (or negative) partition.
	const maxPartition = 1<<31 - 1
	if *partition > maxPartition {
		log.Fatalf("partition %d is out of range (max %d)", *partition, maxPartition)
	}

	addrs := strings.Split(*kafka, ",")
	if err := run(addrs, *topic, int32(*partition), *consume, *schema); err != nil {
		log.Fatal(err)
	}
}

// run connects to the brokers in addrs, prints the broker list and the
// newest offset of every partition of every topic, and, when consume is
// true, consumes messages from topic/partition via consumeMessages.
//
// It returns an error when the client cannot be created, the topic list
// cannot be fetched, or consumption fails. Failures while listing the
// partitions or offsets of a single topic are logged and skipped, because
// that listing is purely informational.
func run(addrs []string, topic string, partition int32, consume bool, schemaPath string) error {
	// generic config
	conf := sarama.NewConfig()
	conf.Producer.Retry.Max = 5
	conf.Producer.RequiredAcks = sarama.WaitForAll
	//conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
	conf.Producer.Return.Successes = true

	// talking to brokers
	client, err := sarama.NewClient(addrs, conf)
	if err != nil {
		return fmt.Errorf("connecting to brokers %v: %v", addrs, err)
	}
	defer func() {
		if err := client.Close(); err != nil {
			log.Printf("closing client: %v", err)
		}
	}()

	fmt.Println(client.Brokers())

	topics, err := client.Topics()
	if err != nil {
		return fmt.Errorf("listing topics: %v", err)
	}
	for _, t := range topics {
		fmt.Println(t)
		partitions, err := client.Partitions(t)
		if err != nil {
			log.Printf("listing partitions of topic %q: %v", t, err)
			continue
		}
		for _, p := range partitions {
			off, err := client.GetOffset(t, p, sarama.OffsetNewest)
			if err != nil {
				log.Printf("fetching offset of %q/%d: %v", t, p, err)
				continue
			}
			fmt.Println("offset:", p, ":", off)
		}
	}

	if !consume {
		return nil
	}
	return consumeMessages(addrs, conf, topic, partition, schemaPath)
}

// consumeMessages reads messages from the given topic and partition,
// starting at the newest offset, decodes each one with the Avro schema
// stored at schemaPath and prints its offset and decoded value.
//
// It only returns when the message channel is closed or when a message
// cannot be decoded into an Avro record; in the latter case the returned
// error names the offending offset. Note that loadSchema terminates the
// program itself if the schema file cannot be read.
func consumeMessages(addrs []string, conf *sarama.Config, topic string, partition int32, schemaPath string) error {
	// avro codec stuff; built first so a bad schema fails before any
	// consumer connection is opened.
	codec, err := goavro.NewCodec(loadSchema(schemaPath))
	if err != nil {
		return fmt.Errorf("parsing avro schema %q: %v", schemaPath, err)
	}

	consumer, err := sarama.NewConsumer(addrs, conf)
	if err != nil {
		return fmt.Errorf("creating consumer: %v", err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Printf("closing consumer: %v", err)
		}
	}()

	stream, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return fmt.Errorf("consuming %q/%d: %v", topic, partition, err)
	}
	// Deferred after the consumer's Close, so it runs first: the partition
	// consumer is shut down before its parent consumer.
	defer func() {
		if err := stream.Close(); err != nil {
			log.Printf("closing partition consumer: %v", err)
		}
	}()

	for msg := range stream.Messages() {
		raw, _, err := codec.NativeFromBinary(msg.Value)
		// Check the decode error before touching raw: on failure raw may be
		// nil and a type assertion on it would panic.
		if err != nil {
			return fmt.Errorf("decoding message at offset %d: %v", msg.Offset, err)
		}
		data, ok := raw.(map[string]interface{})
		if !ok {
			return fmt.Errorf("message at offset %d decoded to %T, want an avro record", msg.Offset, raw)
		}
		fmt.Println("-----")
		fmt.Println(msg.Offset)
		fmt.Println(raw)
		fmt.Println(data)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment