Skip to content

Instantly share code, notes, and snippets.

@Abdulsametileri
Created November 16, 2022 21:43
Show Gist options
  • Save Abdulsametileri/92eb87c073de0335fce148b2cd846623 to your computer and use it in GitHub Desktop.
Save Abdulsametileri/92eb87c073de0335fce148b2cd846623 to your computer and use it in GitHub Desktop.
func (s *IntegrationTestSuite) Test_Should_Consume_Successfully() {
// Given
cfg := &kafka.Config{
Brokers: s.wrapper.GetBrokerAddresses(),
Consumer: kafka.ConsumerConfig{
GroupID: "consumer-group-1",
Topic: "test-consume",
},
}
producer := kafka.NewProducer(cfg)
consumer := kafka.NewConsumer(cfg)
expectedMessage := kafka.Message{Key: nil, Value: []byte(`{ "say": "hello" }`), Topic: cfg.Consumer.Topic}
err := producer.Produce(context.Background(), expectedMessage)
if err != nil {
s.T().Fatalf("could not produce example message %s", err)
}
// When
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
actualMessage, err := consumer.Consume(ctx)
// Then
assert.Nil(s.T(), err)
assert.Equal(s.T(), expectedMessage, actualMessage)
}
func (s *IntegrationTestSuite) Test_Should_Produce_Successfully() {
// Given
cfg := &kafka.Config{
Brokers: s.wrapper.GetBrokerAddresses(),
Consumer: kafka.ConsumerConfig{
GroupID: "consumer-group-2",
Topic: "test-produce",
},
}
producer := kafka.NewProducer(cfg)
expectedMessage := kafka.Message{Key: nil, Value: []byte(`{ "say": "hello" }`), Topic: cfg.Consumer.Topic}
// When
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
err := producer.Produce(ctx, expectedMessage)
// Then
assert.Nil(s.T(), err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment