- To consume existing topics:
./bin/kafka-console-consumer.sh --bootstrap-server <IP_ADDR>:9092 --topic my-custom-topic --from-beginning - To check a list of topics:
./bin/kafka-topics.sh --list --zookeeper <IP_ADDR>:2181
The following code snippets make use of sarama the Go library to connect to a cluster of Apache Kafka instance.
A producer could be used in your main application via the "Kafka topic manager" like:
import (
".../<PATH>/.../services"
"log"
"os"
"github.com/Shopify/sarama"
"strings"
)
func main() {
// ...
sarama.Logger = log.New(os.Stdout, "[My App] ", log.LstdFlags)
// ...
kafkaBrokers := strings.Split("<IP_1>:9092,<IP_2>:9092,<IP_3>:9092", ",")
kafkaTopic := "my-topic-label"
// ...
kafkaTopicManager := NewKafkaManager(kafkaBrokers, kafkaTopic)
}The following is a Producer for messages pushed to Apache Kafka.
package services
import (
"github.com/Shopify/sarama"
"os"
"fmt"
)
type KafkaTopicManager interface {
ProduceMsg(event string) error
Close()
}
type kafkaTopicManager struct {
brokers []string
topic string
producer sarama.SyncProducer
// TODO the consumer (if needed)
}
func NewKafkaManager(brokers []string, topic string) KafkaTopicManager {
return &kafkaTopicManager{
brokers: brokers,
topic: topic,
producer: newKafkaSyncProducer(brokers),
}
}
// https://semaphoreci.com/community/tutorials/writing-and-testing-an-event-sourcing-microservice-with-kafka-and-go
func newKafkaSyncProducer(brokers []string) sarama.SyncProducer {
producerConf := sarama.NewConfig()
producerConf.Producer.RequiredAcks = sarama.WaitForAll
producerConf.Producer.Return.Successes = true
producerConf.Producer.Return.Errors = true // https://godoc.org/github.com/Shopify/sarama#SyncProducer
producerConf.ChannelBufferSize = 1
producerConf.Version = sarama.V0_10_2_0
//producerConf.Producer.Partitioner = sarama.NewRandomPartitioner // https://medium.com/@Oskarr3/implementing-cqrs-using-kafka-and-sarama-library-in-golang-da7efa3b77fe
fmt.Printf("Ready to establish the connection to these Kafka brokers: %v\n", brokers)
// https://godoc.org/github.com/Shopify/sarama#SyncProducer
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged.
kafkaProducer, err := sarama.NewSyncProducer(brokers, producerConf)
if err != nil {
fmt.Printf("Kafka error: %s", err)
os.Exit(-1)
}
return kafkaProducer
}
func (k * kafkaTopicManager) Close() {
fmt.Println("Entered in the deferred closing invocation for the kafka producer")
if err := k.producer.Close(); err != nil {
fmt.Printf("Error while closing the connection for the Kafka producer: %v", err)
}
}
func (k * kafkaTopicManager) ProduceMsg(event string) error {
// INPUT: event interface{}
// json, err := json.Marshal(event)
// if err != nil {
// return err
// }
// >>> string(json)
msgLog := &sarama.ProducerMessage{
Topic: k.topic,
Value: sarama.StringEncoder(event),
}
// ignoring this output params: partition, offset
_, _, err := k.producer.SendMessage(msgLog)
if err != nil {
fmt.Printf("Kafka error: %s\n", err)
return err
}
return nil
}