When processing an event stream in parallel, it is often desirable to partition the data by a piece of metadata known as a "key", so that message order is guaranteed among all messages that share a key value. The same pattern can be extended down to individual consumer processes, which may themselves process messages on multiple threads.
In Java, the JDK provides the ExecutorService API to simplify running asynchronous code. A service instance is created with a specified number of threads and then accepts tasks, each of which is executed on one of those threads, in parallel with the tasks running on the others:
ExecutorService executor = Executors.newFixedThreadPool(8); // allow up to 8 tasks to execute concurrently
Runnable task = () -> {
    try {
        System.out.println("Task Started");
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Task Ended");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
executor.execute(task);
executor.execute(task); // each task is handed to a thread from the pool,
executor.execute(task); // with at most 8 tasks running at the same time
// ...
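The snippet above leaves the pool running once the tasks have been submitted. As a minimal, self-contained sketch of the full lifecycle (submit, shut down, wait for completion), something like the following could be used. The class name FixedPoolDemo, the method runTasks, the task count and the 10-second timeout are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and numbers are illustrative only.
public class FixedPoolDemo {

    // Runs `taskCount` short tasks on a pool of `nThreads` threads and
    // returns how many of them completed before the pool terminated.
    static int runTasks(int nThreads, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(20); // simulate some work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                }
            });
        }

        executor.shutdown(); // stop accepting new tasks; already-queued tasks still run
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // timed out: interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(8, 20));
    }
}
```

Note that a fixed pool like this makes no promise about the order in which tasks finish relative to one another, which is the gap the keyed executor in the next section is meant to close.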
The JDK provides no built-in means of assigning a task to a thread based on a partitioning key; however, it is relatively straightforward to create a custom ExecutorService implementation that accomplishes this. When the interface-provided methods are used, which do not take a key value, tasks are assigned to threads at random. When a key value is provided, it is hashed and the hash is reduced modulo the thread count to determine which thread the task should run on.
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class OrderedExecutorService implements ExecutorService {
    private final int nThreads;
    // one single-threaded executor per slot, so tasks sent to the same slot run in order
    private final List<ExecutorService> executorServices;
    private final Random random;
    private final MessageDigest digest;

    private OrderedExecutorService(int nThreads) {
        this.nThreads = nThreads;
        this.executorServices = Stream
            .generate(() -> Executors.newSingleThreadExecutor())
            .limit(nThreads)
            .toList();
        this.random = new Random();
        try {
            this.digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            // fail fast instead of continuing with a null digest
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static OrderedExecutorService newFixedThreadPool(int nThreads) {
        return new OrderedExecutorService(nThreads);
    }

    // keyed variant: tasks with the same key always run on the same thread
    public void execute(Runnable command, String key) {
        int threadId = getThreadId(key);
        ExecutorService executorService = executorServices.get(threadId);
        executorService.execute(command);
    }

    @Override
    public void execute(Runnable command) {
        int threadId = getRandomThreadId();
        ExecutorService executorService = executorServices.get(threadId);
        executorService.execute(command);
    }

    @Override
    public void shutdown() {
        executorServices.forEach(es -> es.shutdown());
    }

    @Override
    public boolean isShutdown() {
        return executorServices.stream().allMatch(es -> es.isShutdown());
    }

    @Override
    public boolean isTerminated() {
        return executorServices.stream().allMatch(es -> es.isTerminated());
    }

    // ... remaining interface @Override definitions (or leave Unimplemented based on use-case)

    private int getThreadId(String key) {
        // modify the thread key to avoid modular interference from the partition ID
        String formattedKey = String.format("/%s/", key);
        byte[] hashBytes;
        synchronized (digest) { // MessageDigest instances are not thread-safe
            hashBytes = digest.digest(formattedKey.getBytes(StandardCharsets.UTF_8));
        }
        // build a non-negative int from the first four hash bytes, then reduce it modulo nThreads
        return (((hashBytes[0] & 0x7F) << 24)
            | ((hashBytes[1] & 0xFF) << 16)
            | ((hashBytes[2] & 0xFF) << 8)
            | (hashBytes[3] & 0xFF)) % nThreads;
    }

    private int getRandomThreadId() {
        return random.nextInt(nThreads);
    }
}
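For the remaining interface methods, one option is to extend the JDK's AbstractExecutorService rather than implement ExecutorService directly: the abstract class supplies submit, invokeAll and invokeAny on top of execute, leaving only the lifecycle methods to write. The following is a self-contained sketch of that idea, not the implementation above. The class name KeyedExecutorSketch, the runDemo helper and the sample keys are hypothetical, and it uses String.hashCode() with Math.floorMod in place of the SHA-256 digest purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; submit/invokeAll/invokeAny are inherited from AbstractExecutorService.
public class KeyedExecutorSketch extends AbstractExecutorService {
    private final List<ExecutorService> workers = new ArrayList<>();

    public KeyedExecutorSketch(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // Keyed variant: equal keys always map to the same single-threaded worker.
    // Simplification (assumption): String.hashCode() instead of a SHA-256 digest.
    public void execute(Runnable command, String key) {
        workers.get(Math.floorMod(key.hashCode(), workers.size())).execute(command);
    }

    @Override
    public void execute(Runnable command) {
        // no key given: pick a worker at random
        workers.get(ThreadLocalRandom.current().nextInt(workers.size())).execute(command);
    }

    @Override
    public void shutdown() {
        workers.forEach(ExecutorService::shutdown);
    }

    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> pending = new ArrayList<>();
        workers.forEach(w -> pending.addAll(w.shutdownNow()));
        return pending;
    }

    @Override
    public boolean isShutdown() {
        return workers.stream().allMatch(ExecutorService::isShutdown);
    }

    @Override
    public boolean isTerminated() {
        return workers.stream().allMatch(ExecutorService::isTerminated);
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // share one overall deadline across all workers
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService w : workers) {
            long remaining = Math.max(deadline - System.nanoTime(), 0);
            if (!w.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }

    // Demo: interleave tasks for several keys and record the order observed per key.
    static Map<String, List<Integer>> runDemo(int nThreads, int perKey) {
        KeyedExecutorSketch executor = new KeyedExecutorSketch(nThreads);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        String[] keys = {"order-1", "order-2", "order-3"};
        for (int seq = 0; seq < perKey; seq++) {
            for (String key : keys) {
                final int n = seq;
                executor.execute(
                    () -> seen.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(n),
                    key);
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        runDemo(4, 5).forEach((key, order) -> System.out.println(key + " -> " + order));
    }
}
```

Because every task for a given key lands on the same single-threaded worker, the sequence numbers recorded for each key come out in submission order, even though tasks for different keys may run concurrently. That guarantee relies on the tasks for a key being submitted from a single thread, as they are in this demo.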
Ordered message processing is typically relevant only when consuming messages from a Solace queue. In that case, the keyed ExecutorService can be integrated into the definition of the XMLMessageListener as follows:
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageListener;

class MultiThreadedMessageListener implements XMLMessageListener {
    private final OrderedExecutorService executorService;

    public MultiThreadedMessageListener(int nThreads) {
        this.executorService = OrderedExecutorService.newFixedThreadPool(nThreads);
    }

    @Override
    public void onReceive(BytesXMLMessage msg) {
        String partitionKey = null;
        try {
            // guard against messages that carry no user properties or no partition key
            SDTMap properties = msg.getProperties();
            Object keyValue = (properties == null)
                ? null
                : properties.get(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
            if (keyValue != null) {
                partitionKey = keyValue.toString();
            }
        } catch (SDTException e) {
            e.printStackTrace();
        }

        Runnable processMessage = () -> {
            // message processing business logic..
            // if processing was successful:
            msg.ackMessage();
        };

        if (partitionKey != null) {
            executorService.execute(processMessage, partitionKey); // execute along partition
        } else {
            executorService.execute(processMessage); // execute on a random thread
        }
    }

    // ... remaining interface @Override definitions
}
Once this listener is defined, it can be attached to a JCSMP Solace session as follows:
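// given a connected 'session' instance and defined 'THREAD_COUNT' and 'flowProperties':
FlowReceiver flow = session.createFlow(new MultiThreadedMessageListener(THREAD_COUNT), flowProperties);
flow.start(); // message delivery to the listener begins once the flow is started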