Skip to content

Instantly share code, notes, and snippets.

@taion809
Created June 15, 2017 20:22
Show Gist options
  • Save taion809/2d7c1bc63e37902b23168dea3602f37c to your computer and use it in GitHub Desktop.
Save taion809/2d7c1bc63e37902b23168dea3602f37c to your computer and use it in GitHub Desktop.
package transceiver
import (
"context"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type Tx interface {
Send(context.Context, []byte) error
}
type DefaultTx struct {
Producer *kafka.Producer
Topic string
}
type TxMessage struct {
Body []byte
TopicPartition *kafka.TopicPartition
}
var DefaultTopicPartition = kafka.TopicPartition{Partition: kafka.PartitionAny}
func NewDefaultTx(servers []string, topic string, additionalConfig *kafka.ConfigMap) (*DefaultTx, error) {
if additionalConfig == nil {
additionalConfig = &kafka.ConfigMap{}
}
additionalConfig.SetKey("bootstrap.servers", strings.Join(servers, ","))
producer, err := kafka.NewProducer(additionalConfig)
if err != nil {
return nil, err
}
return &DefaultTx{Producer: producer, Topic: topic}, nil
}
func (t *DefaultTx) Close() {
if t == nil || t.Producer == nil {
return
}
t.Producer.Close()
}
func (t *DefaultTx) Send(ctx context.Context, msg *TxMessage) error {
tp := msg.TopicPartition
if tp == nil {
tp = &DefaultTopicPartition
tp.Topic = &t.Topic
}
message := &kafka.Message{
Value: msg.Body,
TopicPartition: *tp,
}
select {
case t.Producer.ProduceChannel() <- message:
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return err
}
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment