Last active
October 26, 2024 15:50
-
-
Save acastro2/8ad546ccff0c3e82aa5b5e867c086c80 to your computer and use it in GitHub Desktop.
Kafka retries with Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"crypto/tls" | |
"encoding/json" | |
"fmt" | |
"math/rand" | |
"time" | |
"github.com/acastro2/go-retries/kafka_retry_dlq" | |
"github.com/cenkalti/backoff/v4" | |
"github.com/segmentio/kafka-go" | |
"github.com/segmentio/kafka-go/sasl/plain" | |
) | |
// exampleHandler is the demo handler that main plugs into the retry consumer.
// It holds no state; all behavior lives in its Process and MoveToDLQ methods.
type exampleHandler struct{}
func (h *exampleHandler) Process(ctx context.Context, msg kafka.Message) error { | |
// Parsing kafka message as json and log it to console | |
var jsonData map[string]interface{} | |
err := json.Unmarshal(msg.Value, &jsonData) | |
fmt.Println(jsonData) | |
// Return error if message processing failed | |
if rand.Intn(100) < 25 { | |
return fmt.Errorf("Throw random error to simulate processing failure and test retry.") | |
} | |
return err | |
} | |
func (h *exampleHandler) MoveToDLQ(ctx context.Context, msg kafka.Message) { | |
// Implement logic to move message to DLQ | |
fmt.Println("Moving message to DLQ") | |
} | |
func main() { | |
ctx := context.Background() | |
readerConfig := kafka.ReaderConfig{ | |
Brokers: []string{"localhost:9092"}, | |
GroupID: "<consumer-group-id>", | |
Topic: "<topic>", | |
MaxBytes: 10e6, // 10MB | |
WatchPartitionChanges: true, | |
Dialer: &kafka.Dialer{ | |
SASLMechanism: plain.Mechanism{ | |
Username: "<username>", | |
Password: "<password>", | |
}, | |
TLS: &tls.Config{ | |
MinVersion: tls.VersionTLS12, | |
}, | |
}, | |
} | |
reader := kafka.NewReader(readerConfig) | |
defer reader.Close() | |
backoff := backoff.NewExponentialBackOff() | |
backoff.MaxElapsedTime = time.Minute * 5 | |
options := kafka_retry_dlq.ConsumerWithRetryOptions{ | |
Reader: reader, | |
Backoff: backoff, | |
MaxRetries: 3, | |
Handler: &exampleHandler{}, | |
RetryQueue: make(chan kafka.Message, 1000), | |
} | |
kafka_retry_dlq.NewConsumerWithRetry(ctx, &options) | |
// Wait for messages to be processed | |
select {} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka_retry_dlq | |
import ( | |
"context" | |
"fmt" | |
"time" | |
"github.com/cenkalti/backoff/v4" | |
"github.com/segmentio/kafka-go" | |
) | |
type ProcessRetryHandler interface { | |
Process(context.Context, kafka.Message) error | |
MoveToDLQ(context.Context, kafka.Message) | |
} | |
type ConsumerWithRetryOptions struct { | |
Handler ProcessRetryHandler | |
Reader *kafka.Reader | |
MaxRetries int | |
RetryQueue chan kafka.Message | |
Backoff backoff.BackOff | |
} | |
// NewConsumerWithRetry creates a new kafka consumer with retry mechanism | |
func NewConsumerWithRetry(ctx context.Context, options *ConsumerWithRetryOptions) { | |
go func() { | |
for { | |
select { | |
case msg := <-options.RetryQueue: | |
retries := 1 | |
for { | |
fmt.Printf("Retry %v message %v\n", retries, msg.Key) | |
if retries >= options.MaxRetries { | |
options.Handler.MoveToDLQ(ctx, msg) | |
break | |
} | |
if err := options.Handler.Process(ctx, msg); err != nil { | |
fmt.Printf("Error processing message, retrying: %v\n", err) | |
time.Sleep(options.Backoff.NextBackOff()) | |
retries++ | |
continue | |
} | |
break | |
} | |
} | |
} | |
}() | |
for { | |
msg, err := options.Reader.ReadMessage(ctx) | |
if err != nil { | |
fmt.Println("Error reading message: ", err) | |
return | |
} | |
if err := options.Handler.Process(ctx, msg); err != nil { | |
options.RetryQueue <- msg | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Nice example!