Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active October 27, 2020 07:54
Show Gist options
  • Save srnagar/0c1bcf22b8536b7234f304682ba1aad9 to your computer and use it in GitHub Desktop.
Save srnagar/0c1bcf22b8536b7234f304682ba1aad9 to your computer and use it in GitHub Desktop.
Service Bus Processor

Service Bus Processor Java API

The Service Bus Processor provides a mechanism for users to start a receiving loop that runs forever until either the customer stops the processor or the process goes down. Upon receiving a message, the processor executes customer-provided message handler code. The processor is intended to let the customer focus on writing the core message processing code and let the processor focus on handling the task of receiving messages and keeping the loop going. In case of errors, the processor will notify the customer through an error handler provided by the customer and continues to receive messages.

The Processor API

The processor API is shown as the diff here on API view tool.

Updated ServiceBusClientBuilder

The option to create a processor instance is available on the ServiceBusClientBuilder. The switch to create a processor instance is similar to the pattern used for creating ServiceBusSenderClient, ServiceBusReceiverClient and ServiceBusSessionReceiverClient:

public final class ServiceBusClientBuilder {

     public ServiceBusProcessorClientBuilder processor() {}

     public ServiceBusSessionProcessorClientBuilder sessionProcessor() {}

     public final class ServiceBusProcessorClientBuilder {
        public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) 
        public ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount) 
        public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) 
        public ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) 
        public ServiceBusProcessorClientBuilder queueName(String queueName) 
        public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) 
        public ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue) 
        public ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName) 
        public ServiceBusProcessorClientBuilder topicName(String topicName) 
        public ServiceBusProcessorClient buildProcessorClient() 
     }

     public final class ServiceBusSessionProcessorClientBuilder {
        public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) 
        public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions) 
        public ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount) 
        public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) 
        public ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) 
        public ServiceBusSessionProcessorClientBuilder queueName(String queueName) 
        public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) 
        public ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName) 
        public ServiceBusSessionProcessorClientBuilder topicName(String topicName) 
        public ServiceBusProcessorClient buildProcessorClient() 
     }
}   

Service Bus Processor Client

The processor has methods to start, stop and close the processor:

public final class ServiceBusProcessorClient {
    // non-public constructors
    ServiceBusProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError, MessageProcessorOptions options) {}
    
    public void start() {}

    public void stop() {}
    
    public void close() {}
    
    public boolean isRunning() {}
}

Sample User code

This is a sample code that user will write to use the processor

// Sample code that processes a single message
Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
    try {
        System.out.println(messageContext.getMessage().getMessageId());
        // other message processing code
        messageContext.complete(); 
    } catch (Exception ex) {
        messageContext.abandon(); 
    }
}   

// Sample code that gets called if there's an error
Consumer<Throwable> processError = throwable -> {
    logError(throwable);
    metrics.recordError(throwable);
}

// Create a ServiceBusProcessorClient
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                .connectionString("connection-string")
                                .processor()
                                .topicName("topic-name")
                                .subscriptionName("subscription-name")
                                .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

// Stop the processor
processorClient.stop(); // this will only pause the processing but the links will be kept alive

// Close the processor
processorClient.close(); // this will shutdown the processor and the links will be cleaned up

The equivalent code in Track 1 would look like the following:

// Create the subscription client using the connection string
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder("connection-string");
SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);

// Register a message handler that starts processing messages
subscriptionClient.registerMessageHandler(new IMessageHandler() {
    @Override
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        try {
            System.out.println(message.getMessageId());
            // other message processing code
            subscriptionClient.complete(message.getLockToken());
        } catch (Exception ex) {
            try {
                subscriptionClient.abandon(message.getLockToken());
            } catch (InterruptedException e) {
                logger.warn("Thread interrupted when attempting to abandon message", e);
            } catch (ServiceBusException e) {
                logger.warn("Exception occurred when attempting to abandon message", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void notifyException(Throwable exception, ExceptionPhase phase) {
        logger.error("Error reported by the subscription message handler", exception);
        metrics.recordError(exception);
    }
}, Executors.newCachedThreadPool());

// Stops message processing
subscriptionClient.registerMessageHandler((IMessageHandler) null, (ExecutorService) null);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment