Skip to content

Instantly share code, notes, and snippets.

@tappoz
Last active November 27, 2017 16:27
Show Gist options
  • Save tappoz/6c25caa4389359750634c7c8d77a6632 to your computer and use it in GitHub Desktop.
Save tappoz/6c25caa4389359750634c7c8d77a6632 to your computer and use it in GitHub Desktop.
Apache Kafka and Golang

Trubleshooting Kafka

  • To consume existing topics: ./bin/kafka-console-consumer.sh --bootstrap-server <IP_ADDR>:9092 --topic my-custom-topic --from-beginning
  • To check a list of topics: ./bin/kafka-topics.sh --list --zookeeper <IP_ADDR>:2181

Go sample

The following code snippets make use of sarama the Go library to connect to a cluster of Apache Kafka instance.

A producer could be used in your main application via the "Kafka topic manager" like:

import (
  ".../<PATH>/.../services"
  "log"
  "os"
  "github.com/Shopify/sarama"
  "strings"
)

func main() {
  // ...
  sarama.Logger = log.New(os.Stdout, "[My App] ", log.LstdFlags)
  // ...
  kafkaBrokers := strings.Split("<IP_1>:9092,<IP_2>:9092,<IP_3>:9092", ",")
	kafkaTopic := "my-topic-label"
  // ...
  kafkaTopicManager := NewKafkaManager(kafkaBrokers, kafkaTopic)
}

The following is a Producer for messages pushed to Apache Kafka.

package services

import (
	"github.com/Shopify/sarama"
	"os"
	"fmt"
)

type KafkaTopicManager interface {
	ProduceMsg(event string) error
	Close()
}

type kafkaTopicManager struct {
	brokers []string
	topic string
	producer sarama.SyncProducer
	// TODO the consumer (if needed)
}

func NewKafkaManager(brokers []string, topic string) KafkaTopicManager {
	return &kafkaTopicManager{
		brokers: brokers,
		topic: topic,
		producer: newKafkaSyncProducer(brokers),
	}
}

// https://semaphoreci.com/community/tutorials/writing-and-testing-an-event-sourcing-microservice-with-kafka-and-go
func newKafkaSyncProducer(brokers []string) sarama.SyncProducer {
	producerConf := sarama.NewConfig()
	producerConf.Producer.RequiredAcks = sarama.WaitForAll
	producerConf.Producer.Return.Successes = true
	producerConf.Producer.Return.Errors = true // https://godoc.org/github.com/Shopify/sarama#SyncProducer
	producerConf.ChannelBufferSize = 1
	producerConf.Version = sarama.V0_10_2_0
	//producerConf.Producer.Partitioner = sarama.NewRandomPartitioner // https://medium.com/@Oskarr3/implementing-cqrs-using-kafka-and-sarama-library-in-golang-da7efa3b77fe

	fmt.Printf("Ready to establish the connection to these Kafka brokers: %v\n", brokers)
	// https://godoc.org/github.com/Shopify/sarama#SyncProducer
	// SyncProducer publishes Kafka messages, blocking until they have been acknowledged.
	kafkaProducer, err := sarama.NewSyncProducer(brokers, producerConf)
	if err != nil {
		fmt.Printf("Kafka error: %s", err)
		os.Exit(-1)
	}

	return kafkaProducer
}

func (k * kafkaTopicManager) Close() {
	fmt.Println("Entered in the deferred closing invocation for the kafka producer")
	if err := k.producer.Close(); err != nil {
		fmt.Printf("Error while closing the connection for the Kafka producer: %v", err)
	}
}

func (k * kafkaTopicManager) ProduceMsg(event string) error {
	// INPUT: event interface{}
	// json, err := json.Marshal(event)
	// if err != nil {
	//     return err
	// }
	// >>> string(json)

	msgLog := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.StringEncoder(event),
	}

	// ignoring this output params: partition, offset
	_, _, err := k.producer.SendMessage(msgLog)
	if err != nil {
		fmt.Printf("Kafka error: %s\n", err)
		return err
	}

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment