Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active October 27, 2020 07:54

Revisions

  1. srnagar revised this gist Oct 27, 2020. 1 changed file with 41 additions and 20 deletions.
    61 changes: 41 additions & 20 deletions service-bus-processor.md
    Original file line number Diff line number Diff line change
    @@ -6,22 +6,46 @@ The Service Bus Processor provides a mechanism for users to start a receiving lo

    The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False).

    ### Updated ServiceBusReceiverClient
    ### Updated ServiceBusClientBuilder

    The option to create a processor instance is available on the `ServiceBusReceiverClient` through the following two methods:
    The option to create a processor instance is available on the `ServiceBusClientBuilder`. The switch to create a processor instance is similar to the pattern used for creating `ServiceBusSenderClient`, `ServiceBusReceiverClient` and `ServiceBusSessionReceiverClient`:

    ```java
    public final class ServiceBusReceiverClient {

    public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage,
    Consumer<Throwable> processError) {}

    public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage,
    Consumer<Throwable> processError, ServiceBusProcessorClientOptions options) {}
    public final class ServiceBusClientBuilder {

    public ServiceBusProcessorClientBuilder processor() {}

    public ServiceBusSessionProcessorClientBuilder sessionProcessor() {}

    public final class ServiceBusProcessorClientBuilder {
    public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
    public ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)
    public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError)
    public ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage)
    public ServiceBusProcessorClientBuilder queueName(String queueName)
    public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode)
    public ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)
    public ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)
    public ServiceBusProcessorClientBuilder topicName(String topicName)
    public ServiceBusProcessorClient buildProcessorClient()
    }

    public final class ServiceBusSessionProcessorClientBuilder {
    public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
    public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)
    public ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)
    public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError)
    public ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusProcessorContext> processMessage)
    public ServiceBusSessionProcessorClientBuilder queueName(String queueName)
    public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode)
    public ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)
    public ServiceBusSessionProcessorClientBuilder topicName(String topicName)
    public ServiceBusProcessorClient buildProcessorClient()
    }
    }
    ```

    ### Service Bus Processor
    ### Service Bus Processor Client

    The processor has methods to start, stop and close the processor:

    @@ -63,25 +87,22 @@ Consumer<Throwable> processError = throwable -> {
    metrics.recordError(throwable);
    }

    // Create a ServiceBusReceiverClient
    ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
    // Create a ServiceBusProcessorClient
    ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("connection-string")
    .receiver()
    .processor()
    .topicName("topic-name")
    .subscriptionName("subscription-name")
    .buildClient();

    // Create a Service Bus Processor
    ServiceBusProcessor processor = receiverClient.createProcessor(processMessage, processError);
    .buildProcessorClient();

    // Starts the processor in the background and returns immediately
    processor.start();
    processorClient.start();

    // Stop the processor
    processor.stop(); // this will only pause the processing but the links will be kept alive
    processorClient.stop(); // this will only pause the processing but the links will be kept alive

    // Close the processor
    processor.close(); // this will shutdown the processor and the links will be cleaned up
    processorClient.close(); // this will shutdown the processor and the links will be cleaned up

    ```

  2. srnagar revised this gist Oct 22, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion service-bus-processor.md
    Original file line number Diff line number Diff line change
    @@ -74,7 +74,7 @@ ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
    // Create a Service Bus Processor
    ServiceBusProcessor processor = receiverClient.createProcessor(processMessage, processError);

    // Start the processor
    // Starts the processor in the background and returns immediately
    processor.start();

    // Stop the processor
  3. srnagar revised this gist Oct 22, 2020. 1 changed file with 35 additions and 2 deletions.
    37 changes: 35 additions & 2 deletions service-bus-processor.md
    Original file line number Diff line number Diff line change
    @@ -2,12 +2,45 @@

    The Service Bus Processor provides a mechanism for users to start a receiving loop that runs forever until either the customer stops the processor or the process goes down. Upon receiving a message, the processor executes customer-provided message handler code. The processor is intended to let the customer focus on writing the core message processing code and let the processor focus on handling the task of receiving messages and keeping the loop going. In case of errors, the processor will notify the customer through an error handler provided by the customer and continues to receive messages.

    ### The Processor API
    ## The Processor API

    The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False).

    ### Updated ServiceBusReceiverClient

    ### User code
    The option to create a processor instance is available on the `ServiceBusReceiverClient` through the following two methods:

    ```java
    public final class ServiceBusReceiverClient {

    public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage,
    Consumer<Throwable> processError) {}

    public ServiceBusProcessorClient createProcessorClient(Consumer<ServiceBusProcessorContext> processMessage,
    Consumer<Throwable> processError, ServiceBusProcessorClientOptions options) {}
    }
    ```

    ### Service Bus Processor

    The processor has methods to start, stop and close the processor:

    ```java
    public final class ServiceBusProcessorClient {
    // non-public constructors
    ServiceBusProcessorClient(Consumer<ServiceBusProcessorContext> processMessage, Consumer<Throwable> processError, MessageProcessorOptions options) {}

    public void start() {}

    public void stop() {}

    public void close() {}

    public boolean isRunning() {}
    }
    ```

    ## Sample User code

    This is a sample code that user will write to use the processor

  4. srnagar revised this gist Oct 22, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion service-bus-processor.md
    Original file line number Diff line number Diff line change
    @@ -20,7 +20,7 @@ Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
    // other message processing code
    messageContext.complete();
    } catch (Exception ex) {
    messageContext.abandonAsync();
    messageContext.abandon();
    }
    }

  5. srnagar created this gist Oct 22, 2020.
    91 changes: 91 additions & 0 deletions service-bus-processor.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,91 @@
    # Service Bus Processor Java API

    The Service Bus Processor provides a mechanism for users to start a receiving loop that runs forever until either the customer stops the processor or the process goes down. Upon receiving a message, the processor executes customer-provided message handler code. The processor is intended to let the customer focus on writing the core message processing code and let the processor focus on handling the task of receiving messages and keeping the loop going. In case of errors, the processor will notify the customer through an error handler provided by the customer and continues to receive messages.

    ### The Processor API

    The processor API is shown as the diff here on [API view tool](https://apiview.dev/Assemblies/Review/9a261d82efa047c89b035f4021ef7299?diffRevisionId=1482aeb7e32646808fd08757e8d69a4f&doc=False).


    ### User code

    This is a sample code that user will write to use the processor


    ```java
    // Sample code that processes a single message
    Consumer<ServiceBusProcessorContext> processMessage = messageContext -> {
    try {
    System.out.println(messageContext.getMessage().getMessageId());
    // other message processing code
    messageContext.complete();
    } catch (Exception ex) {
    messageContext.abandonAsync();
    }
    }

    // Sample code that gets called if there's an error
    Consumer<Throwable> processError = throwable -> {
    logError(throwable);
    metrics.recordError(throwable);
    }

    // Create a ServiceBusReceiverClient
    ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder()
    .connectionString("connection-string")
    .receiver()
    .topicName("topic-name")
    .subscriptionName("subscription-name")
    .buildClient();

    // Create a Service Bus Processor
    ServiceBusProcessor processor = receiverClient.createProcessor(processMessage, processError);

    // Start the processor
    processor.start();

    // Stop the processor
    processor.stop(); // this will only pause the processing but the links will be kept alive

    // Close the processor
    processor.close(); // this will shutdown the processor and the links will be cleaned up

    ```

    The equivalent code in Track 1 would look like the following:

    ```java
    // Create the subscription client using the connection string
    ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder("connection-string");
    SubscriptionClient subscriptionClient = new SubscriptionClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);

    // Register a message handler that starts processing messages
    subscriptionClient.registerMessageHandler(new IMessageHandler() {
    @Override
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
    try {
    System.out.println(message.getMessageId());
    // other message processing code
    subscriptionClient.complete(message.getLockToken());
    } catch (Exception ex) {
    try {
    subscriptionClient.abandon(message.getLockToken());
    } catch (InterruptedException e) {
    logger.warn("Thread interrupted when attempting to abandon message", e);
    } catch (ServiceBusException e) {
    logger.warn("Exception occurred when attempting to abandon message", e);
    }
    }
    return CompletableFuture.completedFuture(null);
    }

    @Override
    public void notifyException(Throwable exception, ExceptionPhase phase) {
    logger.error("Error reported by the subscription message handler", exception);
    metrics.recordError(exception);
    }
    }, Executors.newCachedThreadPool());

    // Stops message processing
    subscriptionClient.registerMessageHandler((IMessageHandler) null, (ExecutorService) null);
    ```