Last active
October 13, 2025 01:03
-
-
Save robsonkades/f1d35862985b1e5857692884853b2df7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @RestController | |
| @RequestMapping | |
| public class MessageController { | |
| private final AzureServiceBusTemplate serviceBusTemplate; | |
| public MessageController(AzureServiceBusTemplate azureServiceBusTemplate) { | |
| this.serviceBusTemplate = azureServiceBusTemplate; | |
| } | |
| @PostMapping | |
| public void postMessage() { | |
| List<Message<ConsumerService.User>> messages = new ArrayList<>(); | |
| for (int i = 0; i < 1000; i++) { | |
| Message<ConsumerService.User> payload = MessageBuilder.withPayload(new ConsumerService.User("Joao-" + i)).build(); | |
| messages.add(payload); | |
| } | |
| serviceBusTemplate.sendBatchAsync("task", messages).subscribe(); | |
| } | |
| } | |
| import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext; | |
| import com.azure.spring.messaging.servicebus.implementation.core.annotation.ServiceBusListener; | |
| import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.messaging.MessageHeaders; | |
| import org.springframework.stereotype.Service; | |
| @Service | |
| public class ConsumerService { | |
| private final Logger logger = LoggerFactory.getLogger(ConsumerService.class); | |
| @ServiceBusListener(destination = "task", concurrency = "10") | |
| public void handleMessageFromServiceBusQueue(String payload, MessageHeaders headers) { | |
| ServiceBusReceivedMessageContext ctx = headers.get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, ServiceBusReceivedMessageContext.class); | |
| // serviceBusReceivedMessageContext.abandon(); | |
| logger.info("Consume task message: {}", payload); | |
| } | |
| @ServiceBusListener(destination = "mytopic", group = "app") | |
| public void handleMessageFromServiceBusTopic(String payload, MessageHeaders headers) { | |
| ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = headers.get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, ServiceBusReceivedMessageContext.class); | |
| if (serviceBusReceivedMessageContext == null) { | |
| return; | |
| } | |
| serviceBusReceivedMessageContext.complete(); | |
| logger.info("Consume mytopic message: {}", payload); | |
| } | |
| public static class User { | |
| private String name; | |
| public User() { | |
| } | |
| public User(String name) { | |
| this.name = name; | |
| } | |
| public String getName() { | |
| return name; | |
| } | |
| public void setName(String name) { | |
| this.name = name; | |
| } | |
| } | |
| } | |
| @SpringBootApplication | |
| @EnableAzureMessaging | |
| public class AzureServicebusTemplateApplication { | |
| public static void main(String[] args) { | |
| SpringApplication.run(AzureServicebusTemplateApplication.class, args); | |
| } | |
| } | |
| import com.azure.messaging.servicebus.ServiceBusException; | |
| import com.azure.messaging.servicebus.ServiceBusMessage; | |
| import com.azure.messaging.servicebus.ServiceBusMessageBatch; | |
| import com.azure.messaging.servicebus.ServiceBusReceivedMessage; | |
| import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient; | |
| import com.azure.spring.messaging.converter.AzureMessageConverter; | |
| import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; | |
| import com.azure.spring.messaging.servicebus.implementation.support.converter.ServiceBusMessageConverter; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import org.springframework.beans.factory.DisposableBean; | |
| import org.springframework.messaging.Message; | |
| import org.springframework.stereotype.Component; | |
| import org.springframework.util.Assert; | |
| import reactor.core.publisher.Flux; | |
| import reactor.core.publisher.Mono; | |
| import reactor.core.scheduler.Schedulers; | |
| import reactor.util.retry.Retry; | |
| import java.time.Duration; | |
| import java.util.ArrayList; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.concurrent.ConcurrentHashMap; | |
| import java.util.concurrent.ConcurrentLinkedQueue; | |
| @Component | |
| public class AzureServiceBusTemplate implements DisposableBean { | |
| private static final Logger logger = LoggerFactory.getLogger(AzureServiceBusTemplate.class); | |
| private static final ServiceBusMessageConverter DEFAULT_CONVERTER = new ServiceBusMessageConverter(); | |
| private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(2); | |
| private static final int MAX_RETRY_ATTEMPTS = 3; | |
| private static final Duration RETRY_BACKOFF = Duration.ofSeconds(2); | |
| private final ServiceBusProducerFactory producerFactory; | |
| private final Map<String, ServiceBusSenderAsyncClient> senderCache = new ConcurrentHashMap<>(); | |
| private AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> messageConverter = DEFAULT_CONVERTER; | |
| public AzureServiceBusTemplate(ServiceBusProducerFactory producerFactory) { | |
| this.producerFactory = producerFactory; | |
| } | |
| /** | |
| * Sends messages in batches asynchronously with proper error handling and retry logic | |
| * | |
| * @param destination the Service Bus destination | |
| * @param messages list of messages to send | |
| * @param <U> the message payload type | |
| * @return Mono<Void> that completes when all messages are sent | |
| */ | |
| public <U> Mono<Void> sendBatchAsync(String destination, List<Message<U>> messages) { | |
| return sendBatchAsync(destination, messages, DEFAULT_TIMEOUT); | |
| } | |
| /** | |
| * Send message asynchronously with proper error handling and retry logic | |
| * | |
| * @param destination the Service Bus destination | |
| * @param message message to send | |
| * @param <U> the message payload type | |
| * @return Mono<Void> that completes when all messages are sent | |
| */ | |
| public <U> Mono<Void> sendAsync(String destination, Message<U> message) { | |
| return sendAsync(destination, message, DEFAULT_TIMEOUT); | |
| } | |
| public <U> Mono<Void> sendAsync(String destination, Message<U> message, Duration timeout) { | |
| return Mono.fromSupplier(() -> getSender(destination)) | |
| .flatMap(sender -> sender.sendMessage(convertMessage(message))) | |
| .timeout(timeout) | |
| .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, RETRY_BACKOFF) | |
| .filter(this::isRetryableException) | |
| .doBeforeRetry(retrySignal -> | |
| logger.warn("Retrying send operation. Attempt: {}, Error: {}", | |
| retrySignal.totalRetries() + 1, retrySignal.failure().getMessage()))) | |
| .doOnSuccess(v -> logger.info("Successfully sent message to destination: {}", destination)) | |
| .doOnError(error -> logger.error("Failed to send message to destination: {}", destination, error)) | |
| .subscribeOn(Schedulers.boundedElastic()); | |
| } | |
| /** | |
| * Sends messages in batches asynchronously with configurable timeout | |
| * | |
| * @param destination the Service Bus destination | |
| * @param messages list of messages to send | |
| * @param timeout operation timeout | |
| * @param <U> the message payload type | |
| * @return Mono<Void> that completes when all messages are sent | |
| */ | |
| public <U> Mono<Void> sendBatchAsync(String destination, List<Message<U>> messages, Duration timeout) { | |
| Assert.hasText(destination, "Destination cannot be null or empty"); | |
| Assert.notNull(messages, "Messages cannot be null"); | |
| Assert.notNull(timeout, "Timeout cannot be null"); | |
| if (messages.isEmpty()) { | |
| logger.debug("No messages to send to destination: {}", destination); | |
| return Mono.empty(); | |
| } | |
| logger.info("Starting batch send operation for {} messages to destination: {}", messages.size(), destination); | |
| return Mono.fromSupplier(() -> getSender(destination)) | |
| .flatMap(sender -> processMessagesInBatches(sender, messages, destination)) | |
| .timeout(timeout) | |
| .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, RETRY_BACKOFF) | |
| .filter(this::isRetryableException)) | |
| .doOnSuccess(v -> logger.info("Successfully sent all messages to destination: {}", destination)) | |
| .doOnError(error -> logger.error("Failed to send messages to destination: {}", destination, error)) | |
| .subscribeOn(Schedulers.boundedElastic()); | |
| } | |
| private ServiceBusSenderAsyncClient getSender(String destination) { | |
| return senderCache.computeIfAbsent(destination, producerFactory::createProducer); | |
| } | |
| private <U> Mono<Void> processMessagesInBatches(ServiceBusSenderAsyncClient sender, List<Message<U>> messages, String destination) { | |
| return Flux.fromIterable(messages) | |
| .map(this::convertMessage) | |
| .collectList() | |
| .flatMap(serviceBusMessages -> createAndSendBatches(sender, serviceBusMessages, destination)); | |
| } | |
| private <U> ServiceBusMessage convertMessage(Message<U> message) { | |
| try { | |
| return messageConverter.fromMessage(message, ServiceBusMessage.class); | |
| } catch (Exception e) { | |
| logger.error("Failed to convert message: {}", message, e); | |
| throw new ServiceBusTemplateException("Message conversion failed", e); | |
| } | |
| } | |
| private Mono<Void> createAndSendBatches(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages, String destination) { | |
| return buildBatches(sender, messages) | |
| .flatMap(batches -> sendAllBatches(sender, new ConcurrentLinkedQueue<>(batches), destination)); | |
| } | |
| private Mono<List<ServiceBusMessageBatch>> buildBatches(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages) { | |
| return sender.createMessageBatch() | |
| .timeout(Duration.ofSeconds(5)) | |
| .flatMap(initialBatch -> buildBatchesRecursive(sender, messages, 0, initialBatch, new ArrayList<>())); | |
| } | |
| private Mono<List<ServiceBusMessageBatch>> buildBatchesRecursive(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages, int index, ServiceBusMessageBatch currentBatch, List<ServiceBusMessageBatch> acc) { | |
| if (index >= messages.size()) { | |
| if (currentBatch.getCount() > 0) { | |
| acc.add(currentBatch); | |
| } | |
| return Mono.just(acc); | |
| } | |
| ServiceBusMessage message = messages.get(index); | |
| try { | |
| if (currentBatch.tryAddMessage(message)) { | |
| return buildBatchesRecursive(sender, messages, index + 1, currentBatch, acc); | |
| } else { | |
| acc.add(currentBatch); | |
| return sender.createMessageBatch() | |
| .flatMap(newBatch -> { | |
| if (!newBatch.tryAddMessage(message)) { | |
| return Mono.error(new ServiceBusTemplateException("Message is too large to fit in an empty batch")); | |
| } | |
| return buildBatchesRecursive(sender, messages, index + 1, newBatch, acc); | |
| }); | |
| } | |
| } catch (Exception e) { | |
| // Ensure current batch is not leaked on error | |
| acc.forEach(this::closeBatchSafely); | |
| if (currentBatch != null && currentBatch.getCount() > 0) { | |
| closeBatchSafely(currentBatch); | |
| } | |
| return Mono.error(new ServiceBusTemplateException("Failed to build message batches", e)); | |
| } | |
| } | |
| private Mono<Void> sendAllBatches(ServiceBusSenderAsyncClient sender, ConcurrentLinkedQueue<ServiceBusMessageBatch> batches, String destination) { | |
| if (batches.isEmpty()) { | |
| return Mono.empty(); | |
| } | |
| logger.debug("Sending {} batches to destination: {}", batches.size(), destination); | |
| return Flux.fromIterable(batches) | |
| .flatMap(batch -> | |
| sender.sendMessages(batch) | |
| .doOnSuccess(v -> logger.debug("Successfully sent batch with {} messages", batch.getCount())) | |
| .doOnError(error -> logger.error("Failed to send batch with {} messages", batch.getCount(), error)) | |
| .doFinally(signalType -> closeBatchSafely(batch)) | |
| ) | |
| .then() | |
| .doOnSuccess(v -> logger.debug("All {} batches sent successfully to destination: {}", batches.size(), destination)); | |
| } | |
| private void closeBatchSafely(ServiceBusMessageBatch batch) { | |
| try { | |
| if (batch != null) { | |
| // ServiceBusMessageBatch doesn't have explicit close method, | |
| // but we can clear references | |
| logger.trace("Batch cleanup completed"); | |
| } | |
| } catch (Exception e) { | |
| logger.debug("Error during batch cleanup", e); | |
| } | |
| } | |
| protected boolean isRetryableException(Throwable throwable) { | |
| if (throwable instanceof ServiceBusException sbException) { | |
| // Retry on transient errors like server busy, timeout, etc. | |
| return sbException.isTransient(); | |
| } | |
| // Retry on general network/timeout issues | |
| return throwable instanceof java.util.concurrent.TimeoutException || | |
| throwable instanceof java.io.IOException; | |
| } | |
| /** | |
| * Sets a custom message converter | |
| * | |
| * @param messageConverter the custom converter | |
| */ | |
| public void setMessageConverter(AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> messageConverter) { | |
| Assert.notNull(messageConverter, "Message converter cannot be null"); | |
| this.messageConverter = messageConverter; | |
| } | |
| /** | |
| * Custom exception for Service Bus template operations | |
| */ | |
| public static class ServiceBusTemplateException extends RuntimeException { | |
| public ServiceBusTemplateException(String message) { | |
| super(message); | |
| } | |
| public ServiceBusTemplateException(String message, Throwable cause) { | |
| super(message, cause); | |
| } | |
| } | |
| @Override | |
| public void destroy() { | |
| senderCache.values().forEach(ServiceBusSenderAsyncClient::close); | |
| } | |
| } | |
| spring.cloud.azure.servicebus.connection-string= | |
| spring.cloud.azure.servicebus.consumer.auto-complete=false | |
| spring.cloud.azure.servicebus.consumer.receive-mode=peek_lock | |
| spring.cloud.azure.servicebus.entity-type=queue | |
| spring.threads.virtual.enabled=true | |
| spring.cloud.azure.servicebus.consumer.prefetch-count=250 | |
| import com.azure.messaging.servicebus.ServiceBusException; | |
| import com.azure.messaging.servicebus.ServiceBusMessage; | |
| import com.azure.messaging.servicebus.ServiceBusMessageBatch; | |
| import com.azure.messaging.servicebus.ServiceBusReceivedMessage; | |
| import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient; | |
| import com.azure.spring.messaging.converter.AzureMessageConverter; | |
| import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; | |
| import org.junit.jupiter.api.Assertions; | |
| import org.junit.jupiter.api.BeforeEach; | |
| import org.junit.jupiter.api.DisplayName; | |
| import org.junit.jupiter.api.Test; | |
| import org.junit.jupiter.api.extension.ExtendWith; | |
| import org.mockito.Mock; | |
| import org.mockito.junit.jupiter.MockitoExtension; | |
| import org.mockito.junit.jupiter.MockitoSettings; | |
| import org.mockito.quality.Strictness; | |
| import org.springframework.messaging.Message; | |
| import org.springframework.messaging.support.MessageBuilder; | |
| import reactor.core.publisher.Mono; | |
| import reactor.test.StepVerifier; | |
| import java.io.IOException; | |
| import java.util.List; | |
| import java.util.concurrent.TimeoutException; | |
| import static org.mockito.Mockito.any; | |
| import static org.mockito.Mockito.anyString; | |
| import static org.mockito.Mockito.atLeast; | |
| import static org.mockito.Mockito.atLeastOnce; | |
| import static org.mockito.Mockito.eq; | |
| import static org.mockito.Mockito.mock; | |
| import static org.mockito.Mockito.times; | |
| import static org.mockito.Mockito.verify; | |
| import static org.mockito.Mockito.when; | |
| @ExtendWith(MockitoExtension.class) | |
| @MockitoSettings(strictness = Strictness.LENIENT) | |
| @DisplayName("AzureServiceBusTemplate Unit Tests") | |
| class AzureServiceBusTemplateTest { | |
| @Mock | |
| private ServiceBusProducerFactory producerFactory; | |
| @Mock | |
| private ServiceBusSenderAsyncClient senderClient; | |
| @Mock | |
| private ServiceBusMessageBatch messageBatch; | |
| @Mock | |
| private AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> converter; | |
| private AzureServiceBusTemplate template; | |
| @BeforeEach | |
| void setup() { | |
| when(producerFactory.createProducer(anyString())).thenReturn(senderClient); | |
| template = new AzureServiceBusTemplate(producerFactory); | |
| template.setMessageConverter(converter); | |
| } | |
| @Test | |
| @DisplayName("Deve enviar mensagem simples com sucesso") | |
| void sendAsync_shouldSendSuccessfully() { | |
| // Arrange | |
| Message<String> msg = MessageBuilder.withPayload("data").build(); | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class)); | |
| when(senderClient.sendMessage(any())).thenReturn(Mono.empty()); | |
| // Act | |
| StepVerifier.create(template.sendAsync("queue", msg)) | |
| .verifyComplete(); | |
| // Assert | |
| verify(senderClient).sendMessage(any()); | |
| } | |
| @Test | |
| @DisplayName("Deve repetir envio em caso de erro transitório") | |
| void sendAsync_shouldRetryOnTransientError() { | |
| // Arrange | |
| Message<String> msg = MessageBuilder.withPayload("retry").build(); | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class)); | |
| ServiceBusException transientError = mock(ServiceBusException.class); | |
| when(transientError.isTransient()).thenReturn(true); | |
| when(senderClient.sendMessage(any())) | |
| .thenReturn(Mono.error(transientError)) | |
| .thenReturn(Mono.empty()); | |
| // Act | |
| StepVerifier.create(template.sendAsync("q", msg)) | |
| .verifyComplete(); | |
| // Assert | |
| verify(senderClient, atLeast(2)).sendMessage(any()); | |
| } | |
| @Test | |
| @DisplayName("Deve falhar quando ocorrer erro de conversão de mensagem") | |
| void sendAsync_shouldFailOnConversionError() { | |
| // Arrange | |
| Message<String> msg = MessageBuilder.withPayload("bad").build(); | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))) | |
| .thenThrow(new RuntimeException("conversion failed")); | |
| // Act & Assert | |
| StepVerifier.create(template.sendAsync("q", msg)) | |
| .expectError(AzureServiceBusTemplate.ServiceBusTemplateException.class) | |
| .verify(); | |
| } | |
| @Test | |
| @DisplayName("Deve completar imediatamente quando a lista de mensagens estiver vazia") | |
| void sendBatchAsync_shouldHandleEmptyList() { | |
| // Arrange / Act / Assert | |
| StepVerifier.create(template.sendBatchAsync("queue", List.of())) | |
| .verifyComplete(); | |
| } | |
| @Test | |
| @DisplayName("Deve enviar múltiplas mensagens em batches com sucesso") | |
| void sendBatchAsync_shouldSendMultipleBatches() { | |
| // Arrange | |
| Message<String> m1 = MessageBuilder.withPayload("a").build(); | |
| Message<String> m2 = MessageBuilder.withPayload("b").build(); | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))) | |
| .thenReturn(mock(ServiceBusMessage.class)); | |
| when(senderClient.createMessageBatch()).thenReturn(Mono.just(messageBatch)); | |
| when(messageBatch.tryAddMessage(any())).thenReturn(true); | |
| when(messageBatch.getCount()).thenReturn(2); | |
| when(senderClient.sendMessages(any(ServiceBusMessageBatch.class))).thenReturn(Mono.empty()); | |
| // Act | |
| template.sendBatchAsync("topic", List.of(m1, m2)).block(); | |
| // Assert | |
| verify(senderClient, atLeastOnce()).sendMessages(any(ServiceBusMessageBatch.class)); | |
| verify(senderClient, atLeastOnce()).createMessageBatch(); | |
| } | |
| @Test | |
| @DisplayName("Deve lançar exceção se uma mensagem for muito grande para o batch") | |
| void sendBatchAsync_shouldErrorIfMessageTooLarge() { | |
| // Arrange | |
| Message<String> msg = MessageBuilder.withPayload("too big").build(); | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class)); | |
| when(senderClient.createMessageBatch()).thenReturn(Mono.just(messageBatch)); | |
| when(messageBatch.tryAddMessage(any())).thenReturn(false); | |
| // Act & Assert | |
| StepVerifier.create(template.sendBatchAsync("q", List.of(msg))) | |
| .expectError(AzureServiceBusTemplate.ServiceBusTemplateException.class) | |
| .verify(); | |
| } | |
| @Test | |
| @DisplayName("isRetryableException deve retornar true para erros transitórios") | |
| void isRetryableException_shouldReturnTrueForTransient() { | |
| // Arrange | |
| ServiceBusException ex = mock(ServiceBusException.class); | |
| when(ex.isTransient()).thenReturn(true); | |
| // Act & Assert | |
| Assertions.assertTrue(template.isRetryableException(ex)); | |
| } | |
| @Test | |
| @DisplayName("isRetryableException deve retornar false para erros não transitórios") | |
| void isRetryableException_shouldReturnFalseForNonTransient() { | |
| // Arrange | |
| ServiceBusException ex = mock(ServiceBusException.class); | |
| when(ex.isTransient()).thenReturn(false); | |
| // Act & Assert | |
| Assertions.assertFalse(template.isRetryableException(ex)); | |
| } | |
| @Test | |
| @DisplayName("isRetryableException deve retornar true para IOException e TimeoutException") | |
| void isRetryableException_shouldReturnTrueForIOAndTimeout() { | |
| // Arrange / Act / Assert | |
| Assertions.assertTrue(template.isRetryableException(new IOException())); | |
| Assertions.assertTrue(template.isRetryableException(new TimeoutException())); | |
| } | |
| @Test | |
| @DisplayName("setMessageConverter deve lançar exceção se o conversor for nulo") | |
| void setMessageConverter_shouldRejectNull() { | |
| // Arrange / Act / Assert | |
| Assertions.assertThrows(IllegalArgumentException.class, | |
| () -> template.setMessageConverter(null)); | |
| } | |
| @Test | |
| @DisplayName("destroy deve fechar todos os clientes de envio armazenados em cache") | |
| void destroy_shouldCloseAllSenders() { | |
| // Arrange | |
| when(converter.fromMessage(any(), eq(ServiceBusMessage.class))) | |
| .thenReturn(mock(ServiceBusMessage.class)); | |
| when(senderClient.sendMessage(any())).thenReturn(Mono.empty()); | |
| when(producerFactory.createProducer(anyString())).thenReturn(senderClient); | |
| // Act | |
| template.sendAsync("queue", MessageBuilder.withPayload("x").build()).block(); | |
| template.destroy(); | |
| // Assert | |
| verify(senderClient, times(1)).close(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment