Last active
January 20, 2023 05:06
-
-
Save ilmsg/d86e1dad01b41239b5362a56e56b5125 to your computer and use it in GitHub Desktop.
kafka with sarama protobuf, producer marshal, consumer unmarshal : https://stackoverflow.com/questions/58842792/how-do-i-send-protobuf-messages-via-a-kafka-producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
pixel "example/pixel" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/Shopify/sarama" | |
"github.com/golang/protobuf/proto" | |
) | |
func main() { | |
topic := "your-topic-name" | |
brokerList := []string{"localhost:29092"} | |
producer, err := newSyncProducer(brokerList) | |
if err != nil { | |
log.Fatalln("Failed to start Sarama producer:", err) | |
} | |
go func() { | |
ticker := time.NewTicker(time.Second) | |
for { | |
select { | |
case t := <-ticker.C: | |
pixelToSend := &pixel.Pixel{SessionId: t.String()} | |
pixelToSendBytes, err := proto.Marshal(pixelToSend) | |
if err != nil { | |
log.Fatalln("Failed to marshal pixel:", err) | |
} | |
msg := &sarama.ProducerMessage{ | |
Topic: topic, | |
Value: sarama.ByteEncoder(pixelToSendBytes), | |
} | |
producer.SendMessage(msg) | |
log.Printf("Pixel sent: %s", pixelToSend) | |
} | |
} | |
}() | |
signals := make(chan os.Signal, 1) | |
signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) | |
partitionConsumer, err := newPartitionConsumer(brokerList, topic) | |
if err != nil { | |
log.Fatalln("Failed to create Sarama partition consumer:", err) | |
} | |
log.Println("Waiting for messages...") | |
for { | |
select { | |
case msg := <-partitionConsumer.Messages(): | |
receivedPixel := &pixel.Pixel{} | |
err := proto.Unmarshal(msg.Value, receivedPixel) | |
if err != nil { | |
log.Fatalln("Failed to unmarshal pixel:", err) | |
} | |
log.Printf("Pixel received: %s", receivedPixel) | |
case <-signals: | |
log.Print("Received termination signal. Exiting.") | |
return | |
} | |
} | |
} | |
func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) { | |
config := sarama.NewConfig() | |
config.Producer.Return.Successes = true | |
// TODO configure producer | |
producer, err := sarama.NewSyncProducer(brokerList, config) | |
if err != nil { | |
return nil, err | |
} | |
return producer, nil | |
} | |
func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) { | |
conf := sarama.NewConfig() | |
// TODO configure consumer | |
consumer, err := sarama.NewConsumer(brokerList, conf) | |
if err != nil { | |
return nil, err | |
} | |
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) | |
if err != nil { | |
return nil, err | |
} | |
return partitionConsumer, err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment