The Service Bus Processor lets users start a receive loop that runs until either the customer stops the processor or the process goes down. For each message received, the processor executes the customer-provided message handler. The intent is that the customer writes only the core message-processing code, while the processor takes care of receiving messages and keeping the loop going. When an error occurs, the processor notifies the customer through the customer-provided error handler and continues to receive messages.
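To make the behaviour described above concrete, the following is a minimal sketch of such a receive loop. It assumes an in-memory BlockingQueue of strings stands in for the Service Bus link; the class name ReceiveLoopSketch and its methods are hypothetical and are not part of the SDK, and the real processor may be implemented quite differently.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the processor's receive loop; not the SDK implementation.
final class ReceiveLoopSketch {
    private final BlockingQueue<String> source;     // assumption: stands in for the Service Bus link
    private final Consumer<String> processMessage;  // customer-provided message handler
    private final Consumer<Throwable> processError; // customer-provided error handler
    private volatile boolean running;
    private Thread worker;

    ReceiveLoopSketch(BlockingQueue<String> source,
                      Consumer<String> processMessage,
                      Consumer<Throwable> processError) {
        this.source = source;
        this.processMessage = processMessage;
        this.processError = processError;
    }

    // Starts the loop on a background thread and returns immediately.
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        worker = new Thread(this::loop, "receive-loop-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    // Asks the loop to exit and waits for the background thread to finish.
    void stop() {
        Thread toJoin;
        synchronized (this) {
            running = false;
            toJoin = worker;
            worker = null;
        }
        if (toJoin == null) {
            return;
        }
        try {
            toJoin.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return running;
    }

    private void loop() {
        while (running) {
            try {
                // Poll with a timeout so a stop request is noticed promptly.
                String message = source.poll(50, TimeUnit.MILLISECONDS);
                if (message != null) {
                    processMessage.accept(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // Report the failure to the error handler, then keep receiving.
                processError.accept(e);
            }
        }
    }
}
```

In this sketch a handler that throws does not end the loop: the exception is passed to the error handler and the next message is polled as usual.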
The processor API is shown as a diff in the API view tool.
The option to create a processor instance is available on the ServiceBusClientBuilder. The switch to create a processor instance is similar to the pattern used for creating ServiceBusSenderClient, ServiceBusReceiverClient and ServiceBusSessionReceiverClient:
public final class ServiceBusClientBuilder {
    public ServiceBusProcessorClientBuilder processor() {}
    public ServiceBusSessionProcessorClientBuilder sessionProcessor() {}

    public final class ServiceBusProcessorClientBuilder {
        public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) {}
        public ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount) {}
        public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) {}
        public ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) {}
        public ServiceBusProcessorClientBuilder queueName(String queueName) {}
        public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {}
        public ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue) {}
        public ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName) {}
        public ServiceBusProcessorClientBuilder topicName(String topicName) {}
        public ServiceBusProcessorClient buildProcessorClient() {}
    }

    public final class ServiceBusSessionProcessorClientBuilder {
        public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) {}
        public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {}
        public ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount) {}
        public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) {}
        public ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) {}
        public ServiceBusSessionProcessorClientBuilder queueName(String queueName) {}
        public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {}
        public ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName) {}
        public ServiceBusSessionProcessorClientBuilder topicName(String topicName) {}
        public ServiceBusProcessorClient buildProcessorClient() {}
    }
}
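The listing above does not spell out what maxConcurrentCalls does. Assuming it caps how many message handler invocations may run at the same time, the idea could be sketched with a fixed-size thread pool as below. BoundedDispatchSketch, dispatchAll and peakConcurrency are hypothetical names used only for illustration; this is not how the processor is necessarily implemented.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration of a "max concurrent calls" limit; not SDK code.
final class BoundedDispatchSketch {
    private final int maxConcurrentCalls;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedDispatchSketch(int maxConcurrentCalls) {
        if (maxConcurrentCalls < 1) {
            throw new IllegalArgumentException("maxConcurrentCalls must be at least 1");
        }
        this.maxConcurrentCalls = maxConcurrentCalls;
    }

    // Runs the handler once per message on at most maxConcurrentCalls threads
    // and blocks until all submitted work has finished (or 30 seconds pass).
    void dispatchAll(List<String> messages, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentCalls);
        try {
            for (String message : messages) {
                pool.execute(() -> {
                    // Track how many handler calls are active right now.
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        handler.accept(message);
                    } finally {
                        inFlight.decrementAndGet();
                    }
                });
            }
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Highest number of handler calls observed running at the same time.
    int peakConcurrency() {
        return peak.get();
    }
}
```

Because the pool has exactly maxConcurrentCalls threads, the observed peak can never exceed the configured limit, whatever the number of queued messages.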
The processor has methods to start, stop and close the processor:
public final class ServiceBusProcessorClient {
    // non-public constructors
    ServiceBusProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError, MessageProcessorOptions options) {}

    public void start() {}
    public void stop() {}
    public void close() {}
    public boolean isRunning() {}
}
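In this design, stop only pauses processing and keeps the links alive, whereas close shuts the processor down and cleans the links up. The toy state model below sketches that distinction. ProcessorLifecycleSketch and its fields are hypothetical, and two behaviours are assumptions of the sketch rather than statements about the processor: start is treated as idempotent, and start after close opens a fresh link.

```java
// Hypothetical model of the start/stop/close lifecycle; not SDK code.
final class ProcessorLifecycleSketch {
    private boolean running;
    private boolean linkOpen;
    private int linksOpened; // how many times a link had to be (re)opened

    // Begins processing; opens a link only if none is currently open.
    synchronized void start() {
        if (running) {
            return; // assumption: calling start twice is a no-op
        }
        if (!linkOpen) {
            linkOpen = true;
            linksOpened++;
        }
        running = true;
    }

    // Pauses processing but keeps the link alive for a later start.
    synchronized void stop() {
        running = false;
    }

    // Shuts down processing and releases the link.
    synchronized void close() {
        running = false;
        linkOpen = false;
    }

    synchronized boolean isRunning() {
        return running;
    }

    synchronized boolean isLinkOpen() {
        return linkOpen;
    }

    synchronized int linksOpened() {
        return linksOpened;
    }
}
```

With this model, a start, stop, start sequence reuses the same link, while a start, close, start sequence opens a second one.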
This is sample code that a user would write to use the processor:
// Sample code that processes a single message
Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
    try {
        System.out.println(messageContext.getMessage().getMessageId());
        // other message processing code
        messageContext.complete();
    } catch (Exception ex) {
        messageContext.abandon();
    }
};
// Sample code that gets called if there's an error
Consumer<Throwable> processError = throwable -> {
    logError(throwable);
    metrics.recordError(throwable);
};
// Create a ServiceBusProcessorClient and register both handlers
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("connection-string")
    .processor()
    .topicName("topic-name")
    .subscriptionName("subscription-name")
    .processMessage(processMessage)
    .processError(processError)
    .buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
// Stop the processor
processorClient.stop(); // this will only pause the processing but the links will be kept alive
// Close the processor
processorClient.close(); // this will shutdown the processor and the links will be cleaned up
The equivalent code in Track 1 would look like the following:
// Create the subscription client using the connection string
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder("connection-string");
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);
// Register a message handler that starts processing messages
subscriptionClient.registerMessageHandler(new IMessageHandler() {
    @Override
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        try {
            System.out.println(message.getMessageId());
            // other message processing code
            subscriptionClient.complete(message.getLockToken());
        } catch (Exception ex) {
            try {
                subscriptionClient.abandon(message.getLockToken());
            } catch (InterruptedException e) {
                logger.warn("Thread interrupted when attempting to abandon message", e);
            } catch (ServiceBusException e) {
                logger.warn("Exception occurred when attempting to abandon message", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void notifyException(Throwable exception, ExceptionPhase phase) {
        logger.error("Error reported by the subscription message handler", exception);
        metrics.recordError(exception);
    }
}, Executors.newCachedThreadPool());
// Stops message processing
subscriptionClient.registerMessageHandler((IMessageHandler) null, (ExecutorService) null);