Skip to content

Instantly share code, notes, and snippets.

@faelp22
Created July 25, 2024 12:31
Show Gist options
  • Save faelp22/23784cf928b3958e08532e69e690927b to your computer and use it in GitHub Desktop.
Save faelp22/23784cf928b3958e08532e69e690927b to your computer and use it in GitHub Desktop.
Simples Golang Kafka Consumer
package main
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// Gera uma string aleatória de n caracteres
func randomString(n int) (string, error) {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes)[:n], nil
}
func main() {
// Gera um nome aleatório de 8 caracteres para o cliente consumidor
clientID, err := randomString(8)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Nome do cliente consumidor: %s\n", clientID)
// Configurações do leitor Kafka
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"}, // Endereço do broker Kafka
GroupID: "my-group", // ID fixo do grupo de consumidores
Topic: "my-topic-2", // Nome do tópico
MinBytes: 10e3, // Tamanho mínimo do lote de leitura
MaxBytes: 10e6, // Tamanho máximo do lote de leitura
Dialer: &kafka.Dialer{
ClientID: clientID, // Define o ClientID com o nome aleatório
},
})
defer r.Close()
ctx := context.Background()
for {
// Lê a mensagem do Kafka
m, err := r.ReadMessage(ctx)
if err != nil {
log.Printf("Erro ao ler mensagem: %v\n", err)
continue
}
// Imprime a mensagem na tela
fmt.Printf("Nova Mensagem: clientID = %s, chave = %s, valor = %s, offset = %d\n", clientID, string(m.Key), string(m.Value), m.Offset)
// Aguarda um tempo antes de ler a próxima mensagem
time.Sleep(1 * time.Second)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment