/service-bus-processor.md Secret
Last active
October 27, 2020 07:54
Revisions
-
srnagar revised this gist
Oct 27, 2020 . 1 changed file with 41 additions and 20 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,22 +6,46 @@ The Service Bus Processor provides a mechanism for users to start a receiving lo The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False). ### Updated ServiceBusClientBuilder The option to create a processor instance is available on the `ServiceBusClientBuilder`. The switch to create a processor instance is similar to the pattern used for creating `ServiceBusSenderClient`, `ServiceBusReceiverClient` and `ServiceBusSessionReceiverClient`: ```java public final class ServiceBusClientBuilder { public ServiceBusProcessorClientBuilder processor() {} public ServiceBusSessionProcessorClientBuilder sessionProcessor() {} public final class ServiceBusProcessorClientBuilder { public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) public ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount) public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) public ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) public ServiceBusProcessorClientBuilder queueName(String queueName) public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) public ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue) public ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName) public ServiceBusProcessorClientBuilder topicName(String topicName) public ServiceBusProcessorClient buildProcessorClient() } public final class ServiceBusSessionProcessorClientBuilder { public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls) public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions) public ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount) public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) public ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage) public ServiceBusSessionProcessorClientBuilder queueName(String queueName) public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) public ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName) public ServiceBusSessionProcessorClientBuilder topicName(String topicName) public ServiceBusProcessorClient buildProcessorClient() } } ``` ### Service Bus Processor Client The processor has methods to start, stop and close the processor: @@ -63,25 +87,22 @@ Consumer<Throwable> processError = throwable -> { metrics.recordError(throwable); } // Create a ServiceBusProcessorClient ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString("connection-string") .processor() .topicName("topic-name") .subscriptionName("subscription-name") .buildProcessorClient(); // Starts the processor in the background and returns immediately processorClient.start(); // Stop the processor processorClient.stop(); // this will only pause the processing but the links will be kept alive // Close the processor processorClient.close(); // this will shutdown the processor and the links will be cleaned up ``` -
srnagar revised this gist
Oct 22, 2020 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -74,7 +74,7 @@ ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder() // Create a Service Bus Processor ServiceBusProcessor processor = receiverClient.createProcessor(processMessage, processError); // Starts the processor in the background and returns immediately processor.start(); // Stop the processor -
srnagar revised this gist
Oct 22, 2020 . 1 changed file with 35 additions and 2 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,12 +2,45 @@ The Service Bus Processor provides a mechanism for users to start a receiving loop that runs forever until either the customer stops the processor or the process goes down. Upon receiving a message, the processor executes customer-provided message handler code. The processor is intended to let the customer focus on writing the core message processing code and let the processor focus on handling the task of receiving messages and keeping the loop going. In case of errors, the processor will notify the customer through an error handler provided by the customer and continues to receive messages. ## The Processor API The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False). ### Updated ServiceBusReceiverClient The option to create a processor instance is available on the `ServiceBusReceiverClient` through the following two methods: ```java public final class ServiceBusReceiverClient { public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError) {} public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError, ServiceBusProcessorClientOptions options) {} } ``` ### Service Bus Processor The processor has methods to start, stop and close the processor: ```java public final class ServiceBusProcessorClient { // non-public constructors ServiceBusProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError, MessageProcessorOptions options) {} public void start() {} public void stop() {} public void close() {} public boolean isRunning() {} } ``` ## Sample User code This is a sample code that user will write to use the processor -
srnagar revised this gist
Oct 22, 2020 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -20,7 +20,7 @@ Consumer<ServiceBusProcessorContext> processMessage = messageContext -> { // other message processing code messageContext.complete(); } catch (Exception ex) { messageContext.abandon(); } } -
srnagar created this gist
Oct 22, 2020 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,91 @@ # Service Bus Processor Java API The Service Bus Processor provides a mechanism for users to start a receiving loop that runs forever until either the customer stops the processor or the process goes down. Upon receiving a message, the processor executes customer-provided message handler code. The processor is intended to let the customer focus on writing the core message processing code and let the processor focus on handling the task of receiving messages and keeping the loop going. In case of errors, the processor will notify the customer through an error handler provided by the customer and continues to receive messages. ### The Processor API The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False). ### User code This is a sample code that user will write to use the processor ```java // Sample code that processes a single message Consumer<ServiceBusProcessorContext> processMessage = messageContext -> { try { System.out.println(messageContext.getMessage().getMessageId()); // other message processing code messageContext.complete(); } catch (Exception ex) { messageContext.abandonAsync(); } } // Sample code that gets called if there's an error Consumer<Throwable> processError = throwable -> { logError(throwable); metrics.recordError(throwable); } // Create a ServiceBusReceiverClient ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder() .connectionString("connection-string") .receiver() .topicName("topic-name") .subscriptionName("subscription-name") .buildClient(); // Create a Service Bus Processor ServiceBusProcessor processor = receiverClient.createProcessor(processMessage, processError); // Start the processor processor.start(); // Stop the processor processor.stop(); // this will only pause the processing but the links will be kept alive // Close the processor processor.close(); // this will shutdown the processor and the links will be cleaned up ``` The equivalent code in Track 1 would look like the following: ```java // Create the subscription client using the connection string ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder("connection-string"); SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK); // Register a message handler that starts processing messages subscriptionClient.registerMessageHandler(new IMessageHandler() { @Override public CompletableFuture<Void> onMessageAsync(IMessage message) { try { System.out.println(message.getMessageId()); // other message processing code subscriptionClient.complete(message.getLockToken()); } catch (Exception ex) { try { subscriptionClient.abandon(message.getLockToken()); } catch (InterruptedException e) { logger.warn("Thread interrupted when attempting to abandon message", e); } catch (ServiceBusException e) { logger.warn("Exception occurred when attempting to abandon message", e); } } return CompletableFuture.completedFuture(null); } @Override public void notifyException(Throwable exception, ExceptionPhase phase) { logger.error("Error reported by the subscription message handler", exception); metrics.recordError(exception); } }, Executors.newCachedThreadPool()); // Stops message processing subscriptionClient.registerMessageHandler((IMessageHandler) null, (ExecutorService) null); ```