@nicholasdgoodman
Last active January 18, 2024 22:26
Multi-Threaded Processing for Solace JCSMP

Description of Problem

When an event stream is processed in parallel, it is often desirable to partition the data by a piece of metadata called a "key", so that message order is guaranteed among all messages sharing the same key value. The same pattern can be extended down into an individual consumer process that uses multiple threads internally.

Parallel Processing in Java

In Java, the JDK provides the ExecutorService API to simplify running asynchronous code. A service instance can be created with a fixed number of threads; tasks submitted to it are then executed in parallel, each on one of those threads:

ExecutorService executor = Executors.newFixedThreadPool(8); // allow up to 8 tasks to execute concurrently

Runnable task = () -> {
  try {
    System.out.println("Task Started");
    TimeUnit.MILLISECONDS.sleep(100);
    System.out.println("Task Ended");
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
};

executor.execute(task);
executor.execute(task);  // each task will be executed on a different thread
executor.execute(task);  // up to the specified maximum of 8
// ...
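
The snippet above never shuts the pool down, so its worker threads would keep the JVM alive. As a minimal sketch (the class name `FixedPoolDemo` and the method `distinctWorkerThreads` are hypothetical, chosen only for illustration), the following submits more tasks than there are threads, records which worker threads ran them, and then shuts the pool down gracefully:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {

  // Runs taskCount short tasks on a pool of poolSize threads and returns
  // the number of distinct worker threads that executed at least one task.
  static int distinctWorkerThreads(int poolSize, int taskCount) {
    ExecutorService executor = Executors.newFixedThreadPool(poolSize);
    Set<String> threadNames = ConcurrentHashMap.newKeySet();

    for (int i = 0; i < taskCount; i++) {
      executor.execute(() -> {
        threadNames.add(Thread.currentThread().getName());
        try {
          TimeUnit.MILLISECONDS.sleep(20); // simulate a little work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt visible to the pool
        }
      });
    }

    executor.shutdown(); // stop accepting new tasks; already-queued tasks still run
    try {
      if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // give up and interrupt whatever is still running
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
    return threadNames.size();
  }

  public static void main(String[] args) {
    System.out.println("worker threads used: " + distinctWorkerThreads(8, 32));
  }
}
```

However many tasks are submitted, the number of worker threads never exceeds the pool size; the extra tasks wait in the executor's internal queue.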

Custom Parallelization along a Key

There is no SDK-provided means of assigning a task to a thread based on a partitioning key; however, it is relatively straightforward to create a custom ExecutorService implementation that does this. Tasks submitted through the standard interface methods, which take no key, are assigned to a thread at random. When a key is provided, it is hashed and the hash is taken modulo the thread count to select the thread the task runs on.

public class OrderedExecutorService implements ExecutorService {
  private final int nThreads;
  private final List<ExecutorService> executorServices;
  private final Random random;
  private final MessageDigest digest;

  private OrderedExecutorService(int nThreads) {
    this.nThreads = nThreads;
    this.executorServices = Stream
      .generate(() -> Executors.newSingleThreadExecutor())
      .limit(nThreads)
      .toList(); // Stream.toList() requires Java 16+
    this.random = new Random();
    try {
      this.digest = MessageDigest.getInstance("SHA-256");
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is required on every Java platform, so this should never happen;
      // fail fast rather than leave 'digest' null and hit an NPE later
      throw new IllegalStateException("SHA-256 is not available", e);
    }
  }

  public static OrderedExecutorService newFixedThreadPool(int nThreads) {
    return new OrderedExecutorService(nThreads);
  }

  public void execute(Runnable command, String key) {
    int threadId = getThreadId(key);
    ExecutorService executorService = executorServices.get(threadId);
    executorService.execute(command);
  }

  @Override
  public void execute(Runnable command) {
    int threadId = getRandomThreadId();
    ExecutorService executorService = executorServices.get(threadId);
    executorService.execute(command);
  }

  @Override
  public void shutdown() {
    executorServices.stream().forEach(es -> es.shutdown());
  }

  @Override
  public boolean isShutdown() {
    return executorServices.stream().allMatch(es -> es.isShutdown());
  }

  @Override
  public boolean isTerminated() {
    return executorServices.stream().allMatch(es -> es.isTerminated());
  }

  // ... remaining interface @Override definitions (or leave Unimplemented based on use-case)

  // MessageDigest instances are not thread-safe, so calls are serialized
  private synchronized int getThreadId(String key) {
    // modify the thread key to avoid modular interference from partition ID
    String formattedKey = String.format("/%s/", key);
    byte[] hashBytes = digest.digest(formattedKey.getBytes(StandardCharsets.UTF_8));
    // build a non-negative int from the first four hash bytes, then reduce it
    int threadId =
        (((hashBytes[0] & 0x7F) << 24) |
         ((hashBytes[1] & 0xFF) << 16) |
         ((hashBytes[2] & 0xFF) << 8 ) |
         ((hashBytes[3] & 0xFF) << 0 )) % nThreads;
    return threadId;
  }

  private int getRandomThreadId() {
    return random.nextInt(nThreads);
  }
}
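
The ordering guarantee comes from the fact that every task for a given key lands on the same single-threaded executor, which runs its tasks in submission order. The following self-contained sketch illustrates that property in isolation. It is not the class above: `KeyedOrderingDemo`, `laneFor`, and `run` are hypothetical names, and `String.hashCode()` is swapped in for the SHA-256 hash purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyedOrderingDemo {

  // Assumption for the demo: any stable hash works; floorMod keeps the index non-negative.
  static int laneFor(String key, int nLanes) {
    return Math.floorMod(key.hashCode(), nLanes);
  }

  // Submits sequence numbers 0..perKey-1 for every key, interleaved across keys,
  // and returns the order in which each key's tasks actually ran.
  static Map<String, List<Integer>> run(List<String> keys, int perKey, int nLanes) {
    List<ExecutorService> lanes = new ArrayList<>();
    for (int i = 0; i < nLanes; i++) {
      lanes.add(Executors.newSingleThreadExecutor());
    }
    Map<String, List<Integer>> seen = new ConcurrentHashMap<>();

    for (int seq = 0; seq < perKey; seq++) {
      for (String key : keys) {
        final int n = seq;
        // Same key -> same lane -> same thread, so tasks for a key run in FIFO order.
        lanes.get(laneFor(key, nLanes)).execute(() ->
            seen.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(n));
      }
    }

    for (ExecutorService lane : lanes) {
      lane.shutdown(); // queued tasks still run to completion
    }
    try {
      for (ExecutorService lane : lanes) {
        lane.awaitTermination(10, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of("order-1", "order-2", "order-3"), 5, 2));
  }
}
```

Each key's list comes back in ascending order even though different keys are processed concurrently. Ordering across different keys is not guaranteed, which is the trade-off that makes the parallelism possible.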

Use Executor within JCSMP App

Ordered message processing typically pertains only to consuming messages from a Solace queue. In that case, the keyed ExecutorService can be integrated into an XMLMessageListener implementation as follows:

class MultiThreadedMessageListener implements XMLMessageListener {
  private final OrderedExecutorService executorService;

  public MultiThreadedMessageListener(int nThreads) {
    this.executorService = OrderedExecutorService.newFixedThreadPool(nThreads);
  }
  
  @Override
  public void onReceive(BytesXMLMessage msg) {
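    String partitionKey = null;
    try {
      // both the property map and the individual property may be absent
      SDTMap properties = msg.getProperties();
      Object value = (properties == null) ? null
        : properties.get(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
      if (value != null) {
        partitionKey = value.toString();
      }
    } catch (SDTException e) {
      e.printStackTrace();
    }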
    
    Runnable processMessage = () -> {
      // message processing business logic..
      
      // if processing was successful:
      msg.ackMessage();
    };
    
    if(partitionKey != null) {
      executorService.execute(processMessage, partitionKey); // execute along partition
    } else {
      executorService.execute(processMessage);  // execute on a random thread
    }
  }
  
  // ... remaining interface @Override definitions
}

Once this listener is defined, it can be attached to a JCSMP Solace session as follows:

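// given a connected 'session' instance, a defined 'THREAD_COUNT', and 'flowProperties'
// configured for client acknowledgement so that msg.ackMessage() controls the ack:

FlowReceiver flowReceiver = session.createFlow(new MultiThreadedMessageListener(THREAD_COUNT), flowProperties);
flowReceiver.start(); // messages are not delivered until the flow is started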

Description of Problem

When processing an individual record is time- or CPU-intensive, and the order in which records are processed does not need to be maintained, throughput can be increased by simply dispatching message processing to a worker thread.

Basic Implementation

This case is easily handled by adding an ExecutorService instance to the message listener:

    private static class MultiThreadedQueueFlowListener implements XMLMessageListener {
        private static final int CONCURRENCY = 4;
        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);

        @Override
        public void onReceive(BytesXMLMessage msg) {
            // when messages are received, queue them immediately for future processing
            executor.execute(() -> {
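                // msgRecvCounter and hasDetectedRedelivery are fields of the enclosing class;
                // with several worker threads, ++ on a plain int is not atomic
                msgRecvCounter++;
                if (msg.getRedelivered()) {
                    hasDetectedRedelivery = true;
                }

                try {
                    Thread.sleep(250); // simulate a long running task
                } catch(InterruptedException ex) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }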

                // Ack only when the message processing is complete
                msg.ackMessage();
            });
        }

        @Override
        public void onException(JCSMPException e) {
            logger.warn("### Queue " + QUEUE_NAME + " Flow handler received exception.  Stopping!!", e);
            if (e instanceof JCSMPTransportException) {
                isShutdown = true;
            } else {
                flowQueueReceiver.close();
            }
        }
    }
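
To get a feel for the throughput gain, the listener's dispatch pattern can be imitated without a broker. The sketch below is an illustration under stated assumptions only: `DispatchThroughputDemo` and `elapsedMillis` are hypothetical names, a sleep stands in for message processing, and a latch count-down stands in for `msg.ackMessage()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchThroughputDemo {

  // Processes messageCount simulated messages (each sleeping workMillis) on a pool
  // of the given size and returns the elapsed wall-clock time in milliseconds.
  static long elapsedMillis(int concurrency, int messageCount, long workMillis) {
    ExecutorService executor = Executors.newFixedThreadPool(concurrency);
    CountDownLatch done = new CountDownLatch(messageCount);
    long start = System.nanoTime();

    for (int i = 0; i < messageCount; i++) {
      // Like onReceive(): hand the work off immediately and return.
      executor.execute(() -> {
        try {
          Thread.sleep(workMillis); // simulate a long-running task
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown(); // stands in for acknowledging after processing
        }
      });
    }

    try {
      done.await(30, TimeUnit.SECONDS); // wait until every "message" is processed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    executor.shutdown();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) {
    System.out.println("1 thread:  " + elapsedMillis(1, 8, 100) + " ms");
    System.out.println("4 threads: " + elapsedMillis(4, 8, 100) + " ms");
  }
}
```

With sleep-based work the speed-up is close to the pool size. Real CPU-bound processing will scale only up to the number of available cores, and messages may complete (and be acknowledged) in a different order than they arrived.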