Last active
January 23, 2025 17:30
Spring Boot: Testing a @SqsListener with TestContainers and LocalStack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.app2.xxx.samples.aws.sqs; | |
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.Customer; | |
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.CustomerRepository; | |
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy; | |
import io.awspring.cloud.messaging.listener.annotation.SqsListener; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | |
import org.springframework.messaging.handler.annotation.Header; | |
import org.springframework.messaging.handler.annotation.Headers; | |
import org.springframework.messaging.handler.annotation.MessageExceptionHandler; | |
import org.springframework.messaging.handler.annotation.Payload; | |
import org.springframework.stereotype.Component; | |
import javax.validation.ConstraintViolationException; | |
import java.util.Map; | |
/** | |
* Simple condition to avoid initializing this listener when running integration tests that don't | |
* care about it. | |
* | |
* Unfortunately, this is necessary because Spring Cloud tries to resolve the @SqsListener's queue URL | |
* on startup, and if there's no SQS server up and running it crashes the application. | |
*/ | |
@ConditionalOnProperty( | |
name = "cloud.aws.sqs.listener.auto-startup", havingValue = "true" | |
) | |
@Component | |
public class CustomerCreatedEventSqsConsumer { | |
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerCreatedEventSqsConsumer.class); | |
private final CustomerRepository repository; | |
public CustomerCreatedEventSqsConsumer(CustomerRepository repository) { | |
this.repository = repository; | |
} | |
@SqsListener( | |
value = "${samples.aws.sqs.consumer-queue}", | |
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS | |
) | |
public void receive(CustomerCreatedEvent event, @Header("MessageId") String messageId) { | |
LOGGER.info( | |
"Receiving a CustomerCreatedEvent (MessageId=\"{}\") from SQS queue: {}", | |
messageId, event | |
); | |
// converts to domain model and invokes the business logic | |
Customer customer = event.toModel(); | |
repository.save(customer); | |
} | |
/** | |
* This is how we can handle errors with @SqsListener, and as you can see, it's very | |
* similar to Controller Advices | |
*/ | |
@MessageExceptionHandler({ | |
ConstraintViolationException.class | |
}) | |
public void handleOnError(ConstraintViolationException exception, | |
@Payload CustomerCreatedEvent event, | |
@Headers Map<String, String> headers) { | |
LOGGER.error( | |
"It was not possible to consume the message with messageId={} (ApproximateReceiveCount ={}): {}", | |
headers.get("MessageId"), | |
headers.get("ApproximateReceiveCount"), | |
event, | |
exception | |
); | |
// TODO: write your error handling logic here... | |
// TODO: also, you can annotate this method with @SendTo("myQueue-DLQ") to forward the message to another queue | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.app2.xxx.samples.aws.sqs; | |
import br.com.zup.edu.app2.xxx.samples.aws.sqs.base.SqsIntegrationTest; | |
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.CustomerRepository; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import com.amazonaws.services.sqs.model.GetQueueAttributesResult; | |
import com.amazonaws.services.sqs.model.PurgeQueueRequest; | |
import io.awspring.cloud.messaging.core.QueueMessagingTemplate; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.DisplayName; | |
import org.junit.jupiter.api.Test; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.test.context.TestPropertySource; | |
import java.time.LocalDateTime; | |
import java.util.UUID; | |
import static java.util.List.of; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.awaitility.Awaitility.await; | |
/** | |
* Here we start the listener on startup, and we guarantee that | |
* it is stopped in the end of all tests by closing the application context (thanks to @DirtiesContext) | |
*/ | |
@TestPropertySource(properties = { | |
"cloud.aws.sqs.listener.auto-startup = true" | |
}) | |
class CustomerCreatedEventSqsConsumerTest extends SqsIntegrationTest { | |
@Autowired | |
private QueueMessagingTemplate sqsTemplate; | |
@Autowired | |
private AmazonSQSAsync SQS; | |
@Value("${samples.aws.sqs.consumer-queue}") | |
private String consumerQueueName; | |
@Autowired | |
private CustomerRepository repository; | |
@BeforeEach | |
public void setUp() { | |
repository.deleteAll(); | |
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName)); | |
} | |
@Test | |
@DisplayName("should consume an event from SQS queue") | |
public void t1() { | |
// scenario | |
CustomerCreatedEvent event = new CustomerCreatedEvent( | |
UUID.randomUUID(), | |
"Rafael Ponte", | |
"+5585988776655", | |
LocalDateTime.now() | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, event); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0); | |
assertThat(repository.findAll()) | |
.hasSize(1) | |
.usingRecursiveFieldByFieldElementComparator() | |
.containsExactly(event.toModel()); | |
}); | |
} | |
@Test | |
@DisplayName("should not consume an event from SQS queue when the event is invalid") | |
public void t2() { | |
// scenario | |
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent( | |
UUID.randomUUID(), null, null, null | |
); | |
sqsTemplate | |
.convertAndSend(consumerQueueName, invalidEvent); | |
// action | |
// ...is async, so it will be performed by our SQS listener | |
// validation | |
await().atMost(3, SECONDS).untilAsserted(() -> { | |
assertThat(repository.count()).isEqualTo(0); | |
assertThat(numberOfMessagesInQueue()).isEqualTo(0); | |
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s | |
}); | |
} | |
private Integer numberOfMessagesInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessages") | |
); | |
} | |
private Integer numberOfMessagesNotVisibleInQueue() { | |
GetQueueAttributesResult attributes = SQS | |
.getQueueAttributes(consumerQueueName, of("All")); | |
return Integer.parseInt( | |
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible") | |
); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.app2.xxx.samples.aws.sqs.base; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.context.annotation.Import; | |
import org.springframework.test.annotation.DirtiesContext; | |
import org.springframework.test.context.ActiveProfiles; | |
import org.springframework.test.context.DynamicPropertyRegistry; | |
import org.springframework.test.context.DynamicPropertySource; | |
import org.testcontainers.containers.localstack.LocalStackContainer; | |
import org.testcontainers.junit.jupiter.Container; | |
import org.testcontainers.junit.jupiter.Testcontainers; | |
import org.testcontainers.utility.DockerImageName; | |
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; | |
/** | |
* Base class responsible for starting Localstack and configuring it into the application | |
* before tests are executed | |
*/ | |
@SpringBootTest | |
@ActiveProfiles("test") | |
@Import(SqsTestConfig.class) | |
@Testcontainers @DirtiesContext | |
public abstract class SqsIntegrationTest { | |
private static DockerImageName LOCALSTACK_IMAGE = DockerImageName.parse("localstack/localstack"); | |
@Container | |
public static LocalStackContainer LOCALSTACK_CONTAINER = new LocalStackContainer(LOCALSTACK_IMAGE) | |
.withServices(SQS); | |
/** | |
* Just configures Localstack's SQS server endpoint in the application | |
*/ | |
@DynamicPropertySource | |
static void registerProperties(DynamicPropertyRegistry registry) { | |
registry.add("cloud.aws.sqs.endpoint", | |
() -> LOCALSTACK_CONTAINER.getEndpointOverride(SQS).toString()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package br.com.zup.edu.app2.xxx.samples.aws.sqs.base; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import io.awspring.cloud.autoconfigure.messaging.SqsProperties; | |
import io.awspring.cloud.core.env.ResourceIdResolver; | |
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer; | |
import io.awspring.cloud.messaging.support.destination.DynamicQueueUrlDestinationResolver; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.config.BeanPostProcessor; | |
import org.springframework.boot.test.context.TestConfiguration; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.messaging.core.CachingDestinationResolverProxy; | |
import org.springframework.messaging.core.DestinationResolver; | |
@TestConfiguration | |
public class SqsTestConfig { | |
@Autowired | |
private SqsProperties sqsProperties; | |
/** | |
* Configures the SimpleMessageListenerContainer to auto-create a SQS Queue in case it does not exist. | |
* | |
* This is necessary because if the queue does not exist during startup the SimpleMessageListenerContainer | |
* stops working with the following warning message: | |
* | |
* > WARN [main] i.a.c.m.l.SimpleMessageListenerContainer: | |
* > Ignoring queue with name 'customersCreatedQueue': The queue does not exist.; | |
* > nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue | |
* > does not exist for this wsdl version. | |
*/ | |
@Bean | |
public BeanPostProcessor simpleMessageListenerContainerPostProcessor(DestinationResolver<String> destinationResolver) { | |
return new BeanPostProcessor() { | |
@Override | |
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { | |
if (bean instanceof SimpleMessageListenerContainer container) { | |
container.setDestinationResolver(destinationResolver); | |
} | |
return bean; | |
} | |
}; | |
} | |
/** | |
* Creates a DynamicQueueUrlDestinationResolver capable of auto-creating | |
* a SQS queue in case it does not exist | |
*/ | |
@Bean | |
public DestinationResolver<String> autoCreateQueueDestinationResolver( | |
AmazonSQSAsync sqs, | |
@Autowired(required = false) ResourceIdResolver resourceIdResolver) { | |
DynamicQueueUrlDestinationResolver autoCreateQueueResolver | |
= new DynamicQueueUrlDestinationResolver(sqs, resourceIdResolver); | |
autoCreateQueueResolver.setAutoCreate(true); | |
return new CachingDestinationResolverProxy<>(autoCreateQueueResolver); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##
# Spring Cloud AWS
# https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/appendix.html
##
cloud:
  aws:
    stack:
      auto: false
      enabled: false
    credentials:
      access-key: localstackAccessKeyId
      secret-key: localstackSecretAccessKey
    region:
      static: sa-east-1
    sqs:
      endpoint: http://localhost:4566
      listener:
        auto-startup: true
        fail-on-missing-queue: false
        default-deletion-policy: NO_REDRIVE
        max-number-of-messages: 10
        visibility-timeout: 30 # 30s
        wait-timeout: 20 # 20s
        back-off-time: 10000 # 10s

logging:
  level:
    io.awspring.cloud.messaging.core: info
    io.awspring.cloud.messaging.listener: info
    com.amazonaws.services.sqs: debug

# Samples configuration
samples.aws.sqs.consumer-queue: customersCreatedQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##
# Spring Cloud AWS
##
cloud:
  aws:
    sqs:
      listener:
        # Listener is off here; the consumer test turns it on with @TestPropertySource
        auto-startup: false
        queue-stop-timeout: 500
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It's a very cool example. Thank you.