Created
February 24, 2022 14:51
-
-
Save TonPC64/8ad671c8aa7e7fe0224f78e2503c6e98 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func printMessage(msg *sarama.ConsumerMessage) { | |
// Extract tracing info from message | |
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg)) | |
tr := otel.GetTracerProvider().Tracer("consumer") | |
_, span := tr.Start(ctx, "consume message", trace.WithAttributes( | |
semconv.MessagingOperationProcess, | |
)) | |
defer span.End() | |
log.Println("Successful to read message: ", string(msg.Value)) | |
} | |
// Consumer represents a Sarama consumer group consumer | |
type Consumer struct { | |
} | |
// Setup is run at the beginning of a new session, before ConsumeClaim | |
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { | |
return nil | |
} | |
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited | |
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { | |
return nil | |
} | |
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). | |
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
// NOTE: | |
// Do not move the code below to a goroutine. | |
// The `ConsumeClaim` itself is called within a goroutine, see: | |
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 | |
for message := range claim.Messages() { | |
printMessage(message) | |
session.MarkMessage(message, "") | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment