Skip to content

Instantly share code, notes, and snippets.

@robsonkades
Last active October 13, 2025 01:03
Show Gist options
  • Save robsonkades/f1d35862985b1e5857692884853b2df7 to your computer and use it in GitHub Desktop.
Save robsonkades/f1d35862985b1e5857692884853b2df7 to your computer and use it in GitHub Desktop.
@RestController
@RequestMapping
public class MessageController {
private final AzureServiceBusTemplate serviceBusTemplate;
public MessageController(AzureServiceBusTemplate azureServiceBusTemplate) {
this.serviceBusTemplate = azureServiceBusTemplate;
}
@PostMapping
public void postMessage() {
List<Message<ConsumerService.User>> messages = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
Message<ConsumerService.User> payload = MessageBuilder.withPayload(new ConsumerService.User("Joao-" + i)).build();
messages.add(payload);
}
serviceBusTemplate.sendBatchAsync("task", messages).subscribe();
}
}
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.messaging.servicebus.implementation.core.annotation.ServiceBusListener;
import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
@Service
public class ConsumerService {
private final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
@ServiceBusListener(destination = "task", concurrency = "10")
public void handleMessageFromServiceBusQueue(String payload, MessageHeaders headers) {
ServiceBusReceivedMessageContext ctx = headers.get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, ServiceBusReceivedMessageContext.class);
// serviceBusReceivedMessageContext.abandon();
logger.info("Consume task message: {}", payload);
}
@ServiceBusListener(destination = "mytopic", group = "app")
public void handleMessageFromServiceBusTopic(String payload, MessageHeaders headers) {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = headers.get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, ServiceBusReceivedMessageContext.class);
if (serviceBusReceivedMessageContext == null) {
return;
}
serviceBusReceivedMessageContext.complete();
logger.info("Consume mytopic message: {}", payload);
}
public static class User {
private String name;
public User() {
}
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
@SpringBootApplication
@EnableAzureMessaging
public class AzureServicebusTemplateApplication {
public static void main(String[] args) {
SpringApplication.run(AzureServicebusTemplateApplication.class, args);
}
}
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
import com.azure.spring.messaging.servicebus.implementation.support.converter.ServiceBusMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class AzureServiceBusTemplate implements DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(AzureServiceBusTemplate.class);
private static final ServiceBusMessageConverter DEFAULT_CONVERTER = new ServiceBusMessageConverter();
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(2);
private static final int MAX_RETRY_ATTEMPTS = 3;
private static final Duration RETRY_BACKOFF = Duration.ofSeconds(2);
private final ServiceBusProducerFactory producerFactory;
private final Map<String, ServiceBusSenderAsyncClient> senderCache = new ConcurrentHashMap<>();
private AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> messageConverter = DEFAULT_CONVERTER;
public AzureServiceBusTemplate(ServiceBusProducerFactory producerFactory) {
this.producerFactory = producerFactory;
}
/**
* Sends messages in batches asynchronously with proper error handling and retry logic
*
* @param destination the Service Bus destination
* @param messages list of messages to send
* @param <U> the message payload type
* @return Mono<Void> that completes when all messages are sent
*/
public <U> Mono<Void> sendBatchAsync(String destination, List<Message<U>> messages) {
return sendBatchAsync(destination, messages, DEFAULT_TIMEOUT);
}
/**
* Send message asynchronously with proper error handling and retry logic
*
* @param destination the Service Bus destination
* @param message message to send
* @param <U> the message payload type
* @return Mono<Void> that completes when all messages are sent
*/
public <U> Mono<Void> sendAsync(String destination, Message<U> message) {
return sendAsync(destination, message, DEFAULT_TIMEOUT);
}
public <U> Mono<Void> sendAsync(String destination, Message<U> message, Duration timeout) {
return Mono.fromSupplier(() -> getSender(destination))
.flatMap(sender -> sender.sendMessage(convertMessage(message)))
.timeout(timeout)
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, RETRY_BACKOFF)
.filter(this::isRetryableException)
.doBeforeRetry(retrySignal ->
logger.warn("Retrying send operation. Attempt: {}, Error: {}",
retrySignal.totalRetries() + 1, retrySignal.failure().getMessage())))
.doOnSuccess(v -> logger.info("Successfully sent message to destination: {}", destination))
.doOnError(error -> logger.error("Failed to send message to destination: {}", destination, error))
.subscribeOn(Schedulers.boundedElastic());
}
/**
* Sends messages in batches asynchronously with configurable timeout
*
* @param destination the Service Bus destination
* @param messages list of messages to send
* @param timeout operation timeout
* @param <U> the message payload type
* @return Mono<Void> that completes when all messages are sent
*/
public <U> Mono<Void> sendBatchAsync(String destination, List<Message<U>> messages, Duration timeout) {
Assert.hasText(destination, "Destination cannot be null or empty");
Assert.notNull(messages, "Messages cannot be null");
Assert.notNull(timeout, "Timeout cannot be null");
if (messages.isEmpty()) {
logger.debug("No messages to send to destination: {}", destination);
return Mono.empty();
}
logger.info("Starting batch send operation for {} messages to destination: {}", messages.size(), destination);
return Mono.fromSupplier(() -> getSender(destination))
.flatMap(sender -> processMessagesInBatches(sender, messages, destination))
.timeout(timeout)
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, RETRY_BACKOFF)
.filter(this::isRetryableException))
.doOnSuccess(v -> logger.info("Successfully sent all messages to destination: {}", destination))
.doOnError(error -> logger.error("Failed to send messages to destination: {}", destination, error))
.subscribeOn(Schedulers.boundedElastic());
}
private ServiceBusSenderAsyncClient getSender(String destination) {
return senderCache.computeIfAbsent(destination, producerFactory::createProducer);
}
private <U> Mono<Void> processMessagesInBatches(ServiceBusSenderAsyncClient sender, List<Message<U>> messages, String destination) {
return Flux.fromIterable(messages)
.map(this::convertMessage)
.collectList()
.flatMap(serviceBusMessages -> createAndSendBatches(sender, serviceBusMessages, destination));
}
private <U> ServiceBusMessage convertMessage(Message<U> message) {
try {
return messageConverter.fromMessage(message, ServiceBusMessage.class);
} catch (Exception e) {
logger.error("Failed to convert message: {}", message, e);
throw new ServiceBusTemplateException("Message conversion failed", e);
}
}
private Mono<Void> createAndSendBatches(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages, String destination) {
return buildBatches(sender, messages)
.flatMap(batches -> sendAllBatches(sender, new ConcurrentLinkedQueue<>(batches), destination));
}
private Mono<List<ServiceBusMessageBatch>> buildBatches(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages) {
return sender.createMessageBatch()
.timeout(Duration.ofSeconds(5))
.flatMap(initialBatch -> buildBatchesRecursive(sender, messages, 0, initialBatch, new ArrayList<>()));
}
private Mono<List<ServiceBusMessageBatch>> buildBatchesRecursive(ServiceBusSenderAsyncClient sender, List<ServiceBusMessage> messages, int index, ServiceBusMessageBatch currentBatch, List<ServiceBusMessageBatch> acc) {
if (index >= messages.size()) {
if (currentBatch.getCount() > 0) {
acc.add(currentBatch);
}
return Mono.just(acc);
}
ServiceBusMessage message = messages.get(index);
try {
if (currentBatch.tryAddMessage(message)) {
return buildBatchesRecursive(sender, messages, index + 1, currentBatch, acc);
} else {
acc.add(currentBatch);
return sender.createMessageBatch()
.flatMap(newBatch -> {
if (!newBatch.tryAddMessage(message)) {
return Mono.error(new ServiceBusTemplateException("Message is too large to fit in an empty batch"));
}
return buildBatchesRecursive(sender, messages, index + 1, newBatch, acc);
});
}
} catch (Exception e) {
// Ensure current batch is not leaked on error
acc.forEach(this::closeBatchSafely);
if (currentBatch != null && currentBatch.getCount() > 0) {
closeBatchSafely(currentBatch);
}
return Mono.error(new ServiceBusTemplateException("Failed to build message batches", e));
}
}
private Mono<Void> sendAllBatches(ServiceBusSenderAsyncClient sender, ConcurrentLinkedQueue<ServiceBusMessageBatch> batches, String destination) {
if (batches.isEmpty()) {
return Mono.empty();
}
logger.debug("Sending {} batches to destination: {}", batches.size(), destination);
return Flux.fromIterable(batches)
.flatMap(batch ->
sender.sendMessages(batch)
.doOnSuccess(v -> logger.debug("Successfully sent batch with {} messages", batch.getCount()))
.doOnError(error -> logger.error("Failed to send batch with {} messages", batch.getCount(), error))
.doFinally(signalType -> closeBatchSafely(batch))
)
.then()
.doOnSuccess(v -> logger.debug("All {} batches sent successfully to destination: {}", batches.size(), destination));
}
private void closeBatchSafely(ServiceBusMessageBatch batch) {
try {
if (batch != null) {
// ServiceBusMessageBatch doesn't have explicit close method,
// but we can clear references
logger.trace("Batch cleanup completed");
}
} catch (Exception e) {
logger.debug("Error during batch cleanup", e);
}
}
protected boolean isRetryableException(Throwable throwable) {
if (throwable instanceof ServiceBusException sbException) {
// Retry on transient errors like server busy, timeout, etc.
return sbException.isTransient();
}
// Retry on general network/timeout issues
return throwable instanceof java.util.concurrent.TimeoutException ||
throwable instanceof java.io.IOException;
}
/**
* Sets a custom message converter
*
* @param messageConverter the custom converter
*/
public void setMessageConverter(AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> messageConverter) {
Assert.notNull(messageConverter, "Message converter cannot be null");
this.messageConverter = messageConverter;
}
/**
* Custom exception for Service Bus template operations
*/
public static class ServiceBusTemplateException extends RuntimeException {
public ServiceBusTemplateException(String message) {
super(message);
}
public ServiceBusTemplateException(String message, Throwable cause) {
super(message, cause);
}
}
@Override
public void destroy() {
senderCache.values().forEach(ServiceBusSenderAsyncClient::close);
}
}
spring.cloud.azure.servicebus.connection-string=
spring.cloud.azure.servicebus.consumer.auto-complete=false
spring.cloud.azure.servicebus.consumer.receive-mode=peek_lock
spring.cloud.azure.servicebus.entity-type=queue
spring.threads.virtual.enabled=true
spring.cloud.azure.servicebus.consumer.prefetch-count=250
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@DisplayName("AzureServiceBusTemplate Unit Tests")
class AzureServiceBusTemplateTest {
@Mock
private ServiceBusProducerFactory producerFactory;
@Mock
private ServiceBusSenderAsyncClient senderClient;
@Mock
private ServiceBusMessageBatch messageBatch;
@Mock
private AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> converter;
private AzureServiceBusTemplate template;
@BeforeEach
void setup() {
when(producerFactory.createProducer(anyString())).thenReturn(senderClient);
template = new AzureServiceBusTemplate(producerFactory);
template.setMessageConverter(converter);
}
@Test
@DisplayName("Deve enviar mensagem simples com sucesso")
void sendAsync_shouldSendSuccessfully() {
// Arrange
Message<String> msg = MessageBuilder.withPayload("data").build();
when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class));
when(senderClient.sendMessage(any())).thenReturn(Mono.empty());
// Act
StepVerifier.create(template.sendAsync("queue", msg))
.verifyComplete();
// Assert
verify(senderClient).sendMessage(any());
}
@Test
@DisplayName("Deve repetir envio em caso de erro transitório")
void sendAsync_shouldRetryOnTransientError() {
// Arrange
Message<String> msg = MessageBuilder.withPayload("retry").build();
when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class));
ServiceBusException transientError = mock(ServiceBusException.class);
when(transientError.isTransient()).thenReturn(true);
when(senderClient.sendMessage(any()))
.thenReturn(Mono.error(transientError))
.thenReturn(Mono.empty());
// Act
StepVerifier.create(template.sendAsync("q", msg))
.verifyComplete();
// Assert
verify(senderClient, atLeast(2)).sendMessage(any());
}
@Test
@DisplayName("Deve falhar quando ocorrer erro de conversão de mensagem")
void sendAsync_shouldFailOnConversionError() {
// Arrange
Message<String> msg = MessageBuilder.withPayload("bad").build();
when(converter.fromMessage(any(), eq(ServiceBusMessage.class)))
.thenThrow(new RuntimeException("conversion failed"));
// Act & Assert
StepVerifier.create(template.sendAsync("q", msg))
.expectError(AzureServiceBusTemplate.ServiceBusTemplateException.class)
.verify();
}
@Test
@DisplayName("Deve completar imediatamente quando a lista de mensagens estiver vazia")
void sendBatchAsync_shouldHandleEmptyList() {
// Arrange / Act / Assert
StepVerifier.create(template.sendBatchAsync("queue", List.of()))
.verifyComplete();
}
@Test
@DisplayName("Deve enviar múltiplas mensagens em batches com sucesso")
void sendBatchAsync_shouldSendMultipleBatches() {
// Arrange
Message<String> m1 = MessageBuilder.withPayload("a").build();
Message<String> m2 = MessageBuilder.withPayload("b").build();
when(converter.fromMessage(any(), eq(ServiceBusMessage.class)))
.thenReturn(mock(ServiceBusMessage.class));
when(senderClient.createMessageBatch()).thenReturn(Mono.just(messageBatch));
when(messageBatch.tryAddMessage(any())).thenReturn(true);
when(messageBatch.getCount()).thenReturn(2);
when(senderClient.sendMessages(any(ServiceBusMessageBatch.class))).thenReturn(Mono.empty());
// Act
template.sendBatchAsync("topic", List.of(m1, m2)).block();
// Assert
verify(senderClient, atLeastOnce()).sendMessages(any(ServiceBusMessageBatch.class));
verify(senderClient, atLeastOnce()).createMessageBatch();
}
@Test
@DisplayName("Deve lançar exceção se uma mensagem for muito grande para o batch")
void sendBatchAsync_shouldErrorIfMessageTooLarge() {
// Arrange
Message<String> msg = MessageBuilder.withPayload("too big").build();
when(converter.fromMessage(any(), eq(ServiceBusMessage.class))).thenReturn(mock(ServiceBusMessage.class));
when(senderClient.createMessageBatch()).thenReturn(Mono.just(messageBatch));
when(messageBatch.tryAddMessage(any())).thenReturn(false);
// Act & Assert
StepVerifier.create(template.sendBatchAsync("q", List.of(msg)))
.expectError(AzureServiceBusTemplate.ServiceBusTemplateException.class)
.verify();
}
@Test
@DisplayName("isRetryableException deve retornar true para erros transitórios")
void isRetryableException_shouldReturnTrueForTransient() {
// Arrange
ServiceBusException ex = mock(ServiceBusException.class);
when(ex.isTransient()).thenReturn(true);
// Act & Assert
Assertions.assertTrue(template.isRetryableException(ex));
}
@Test
@DisplayName("isRetryableException deve retornar false para erros não transitórios")
void isRetryableException_shouldReturnFalseForNonTransient() {
// Arrange
ServiceBusException ex = mock(ServiceBusException.class);
when(ex.isTransient()).thenReturn(false);
// Act & Assert
Assertions.assertFalse(template.isRetryableException(ex));
}
@Test
@DisplayName("isRetryableException deve retornar true para IOException e TimeoutException")
void isRetryableException_shouldReturnTrueForIOAndTimeout() {
// Arrange / Act / Assert
Assertions.assertTrue(template.isRetryableException(new IOException()));
Assertions.assertTrue(template.isRetryableException(new TimeoutException()));
}
@Test
@DisplayName("setMessageConverter deve lançar exceção se o conversor for nulo")
void setMessageConverter_shouldRejectNull() {
// Arrange / Act / Assert
Assertions.assertThrows(IllegalArgumentException.class,
() -> template.setMessageConverter(null));
}
@Test
@DisplayName("destroy deve fechar todos os clientes de envio armazenados em cache")
void destroy_shouldCloseAllSenders() {
// Arrange
when(converter.fromMessage(any(), eq(ServiceBusMessage.class)))
.thenReturn(mock(ServiceBusMessage.class));
when(senderClient.sendMessage(any())).thenReturn(Mono.empty());
when(producerFactory.createProducer(anyString())).thenReturn(senderClient);
// Act
template.sendAsync("queue", MessageBuilder.withPayload("x").build()).block();
template.destroy();
// Assert
verify(senderClient, times(1)).close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment