This gist contains performance measurements for event fan-out using Azure Event Hubs SDK for Java.
While all the numbers are specific to the test environment, they can still be used to estimate the performance you can expect from applications that use Event Hubs and to understand how different factors and configurations affect it. Essentially, the information in this gist can be used to explore optimization and performance tuning strategies.
- The producer sends events to the Event Hubs namespace.
- Consumers receive events from the namespace.
- For each batch, the consumer fans out the events in that batch to N partitions of another event hub.
- If all events are sent successfully, the consumer updates the checkpoint.
- If sending to any of the partitions fails, the processor does not checkpoint, fails, and is restarted (by the SDK).
We'll also add intentional processing failures at a very low rate to simulate the processor losing ownership of a partition and taking it over again.
We're using throughput (the number of events received and forwarded per second) as the main metric, but we'll also monitor other parameters.
The full test code can be found here. The essential part is the batch processing callback:
```java
// Re-group events in the batch into M batches to forward to M partitions of the destination hub.
Map<String, List<EventData>> batches = regroupEvents(batchContext);

// Send all batches in parallel.
Flux.fromStream(batches.entrySet().stream().filter(e -> e.getValue() != null && !e.getValue().isEmpty()))
    .flatMap(entry -> {
        // Simulate some failures.
        if (ThreadLocalRandom.current().nextDouble() < processorFailureRatio) {
            return Mono.error(new SimulatedFailure());
        }

        // Send each batch to the corresponding partition (send calls createBatch for each list of events internally).
        return producerClient.send(entry.getValue(), new SendOptions().setPartitionKey(entry.getKey()));
    })
    .parallel(batches.entrySet().size(), 1) // send to all partitions in parallel
    .runOn(Schedulers.boundedElastic())     // make sure the pool has enough threads to run all of them
                                            // (this pool is sized with the reactor.schedulers.defaultBoundedElasticSize VM option)
    .then()
    .block();

// When all batches are sent successfully, update the checkpoint.
batchContext.updateCheckpoint();
```
If sending any batch fails, we let the error propagate back to the Event Hubs processor client, which stops processing the corresponding partition. The partition is re-assigned after a timeout, and processing then resumes from the last checkpoint, resulting in some duplication that consumers should expect.
The following parameters are configurable, and we'll check how they affect performance:
- Event Hubs SDK version: 5.17.1 or 5.18.0
- Number of CPU cores: 1 or 2
- Available memory: 1GB or 2GB
- Fan-out factor (how many partitions events are forwarded to): 1, 32, 64, 100
- Event payload size: 16 bytes, 1 KB, 8 KB
- Prefetch count: 1, 500, 1000, 2000 (per partition)
- Processor batch size: 32, 64, 128, 256 events
We're using the Azure SDK stress test infrastructure to run the tests and measure performance, and we leverage OpenTelemetry to report metrics from the Event Hubs SDK and the tests.
- Java 21
- AKS cluster with resource limits on containers
- Test application in the first container generates events.
- Test application in the second container receives and forwards events. It has 2 processor client instances working in parallel to emulate load balancing.
- We have only one instance of the sender application and one instance of the forwarding application.
- Event Hubs namespace:
  - Premium tier.
  - 2 throughput units.
  - Both hubs (the source and the one we forward to) are in the same namespace.
  - The source hub has 32 partitions.
  - The forwarding hub has 32 partitions for most tests, and 100 for tests with higher fan-out factors.
Throughput observed with version 5.18.0 is almost twice as high as with version 5.17.1. This is the result of improved memory consumption that lowers memory pressure and reduces garbage collection frequency.
Parameters: cores: 1; memory: 1GB; fan-out factor: 32; processor batch size: 128; payload size: 16 bytes; prefetch count: 1
Scenario | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
5.17.1 | 6290 | 6092 | 98.4% | 1 |
5.18.0 | 12079 | 11859 | 97.3% | 1 |
If we look at the time spent in GC, the application running 5.17.1 spends around 70% of its time in GC across all generations, while with version 5.18.0 it drops to ~10%.
We're going to use version 5.18.0 in all other tests.
Here we see that the bare-bones forwarding scenario scales well vertically with CPU count. Increasing memory alone results in only a minor increase in throughput. In real-life scenarios, the processing callback is more complicated and would likely need more memory and CPU.
Parameters: fan-out factor: 32; processor batch size: 128; payload size: 16 bytes; prefetch count: 1
Scenario | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
1 core, 1GB | 12079 | 11859 | 97.3% | 1 |
1 core, 2GB | 12506 | 12262 | 96.7% | 1.85 |
2 cores, 1GB | 22685 | 22287 | 81.6% | 1 |
2 cores, 2GB | 25411 | 25043 | 84.9% | 1.72 |
We're going to do the rest of the testing on 1 CPU, 1GB of memory.
Throughput slowly decreases as we increase the fan-out factor.
Parameters: processor batch size: 128; payload size: 16 bytes; prefetch count: 1
Fan-out factor | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
1 | 16144 | 16119 | 58.1% | 1 |
32 | 12079 | 11859 | 97.3% | 1 |
64 | 9680 | 9448 | 95.5% | 1 |
100 | 8887 | 8647 | 94.8% | 1 |
The bigger the fan-out factor, the more batches we need to send. When sending batches in parallel, we need to make sure enough threads are available. Assuming a processor instance owns N source partitions and forwards to M other partitions, we'd need up to N x M threads, and each thread also needs about 1MB of memory.
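As a rough sketch (the partition counts below are hypothetical), you can size a dedicated Reactor scheduler to that worst case instead of relying on the shared `Schedulers.boundedElastic()` pool:

```java
// Hypothetical worst case: this processor instance owns up to 16 source partitions (N)
// and forwards to 32 destination partitions (M).
int ownedPartitions = 16;
int fanOutFactor = 32;

// Cap the scheduler at N x M threads so parallel sends don't queue up waiting for a free thread.
// Remember that each platform thread also reserves roughly 1MB of stack memory.
Scheduler forwardingScheduler = Schedulers.newBoundedElastic(
    ownedPartitions * fanOutFactor, // thread cap
    100_000,                        // queued task cap
    "forwarding");                  // thread name prefix

// Alternatively, keep using Schedulers.boundedElastic() and grow the shared pool
// with the reactor.schedulers.defaultBoundedElasticSize VM option.
```

The snippet above could then pass `forwardingScheduler` to `runOn(...)` instead of the default pool.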
By increasing the fan-out factor, we also increase the latency of the overall operation - it now matches the latency of the slowest of the M send calls plus any overhead the parallelization introduces. Moreover, if the chance of a transient error for a single batch send is 0.001%, then sending 100 batches raises the chance of a transient error for the overall operation to roughly 100 * 0.001% = 0.1%.
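Spelling out the arithmetic (p is the per-send transient error probability, M is the fan-out factor, and sends are assumed to fail independently):

$$
P(\text{overall operation hits a transient error}) = 1 - (1 - p)^{M} \approx M \cdot p \quad \text{for small } p
$$

For p = 0.001% and M = 100 this gives about 100 × 0.00001 = 0.001, i.e. 0.1%.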
Note: the measurements above show low CPU usage for fan-out factor = 1 because there were not enough events in the source hub and the consumer was frequently idle waiting for more events, so even higher throughput may be achievable in this edge case.
In the forwarding scenario, processor batch size is one of the key parameters. Since we keep the fan-out factor constant, larger processor batches mean bigger batches to forward. The parallel send calls are the most expensive operation our forwarder does, but their cost grows only slightly with batch size. As a result, throughput grows significantly with the processor batch size.
Parameters: fan-out factor: 32; payload size: 1KB; prefetch count: 1
Processor batch size | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
32 | 5420 | 5376 | 95.7% | 1 |
64 | 6605 | 6544 | 93.1% | 1 |
128 | 9991 | 9924 | 95% | 1 |
256 | 14297 | 14228 | 95.7% | 1 |
The bigger the payload, the lower the throughput and the more memory is needed.
Parameters: fan-out factor: 32; processor batch size: 128; prefetch count: 1
Event size | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
16 bytes | 12079 | 11859 | 97.3% | 1 (out of 1GB available) |
1 KB | 9991 | 9924 | 95% | 1 (out of 1GB available) |
8 KB | 5620 | 5476 | 99.8% | 2.5 (out of 3GB available) |
Prefetch count does not affect the throughput of the forwarding scenario under the conditions we tested. In our case, the Event Hubs resource and the test application are co-located in the same Azure data center, resulting in a very fast and reliable network connection. We also have a relatively complex processing callback. Combined, these factors make the time and resources needed to receive messages negligible.
It's always a good idea to experiment and find the optimal prefetch count for your specific scenario and setup.
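For reference, the per-partition prefetch count is configured on the processor client builder; a minimal sketch (the value here is just a starting point for experimentation, not a recommendation):

```java
new EventProcessorClientBuilder()
    .prefetchCount(500) // per-partition prefetch; tune it for your network latency and processing speed
    ...
```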
Parameters: fan-out factor: 32; processor batch size: 128; payload size: 16 bytes
Prefetch count per partition | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
---|---|---|---|---|
1 | 12079 | 11859 | 97.3% | 1 |
500 | 11869 | 11646 | 97.2% | 1 |
1000 | 12016 | 11789 | 97.1% | 1 |
2000 | 11659 | 11400 | 97.2% | 1 |
Increasing the amount of available memory has little to no effect on throughput in this scenario as well (results are omitted for brevity).
Note: prefetch count applies to each partition. When configuring it to relatively high values, make sure to configure enough memory. If the processor owns P partitions, it needs to keep up to an extra `P * prefetchCount` events in memory on the receiving side alone. The amount of heap memory an event uses can be estimated as 1KB + payload size. We recommend keeping the prefetch count low for Event Hubs versions 5.18.0 and lower to avoid excessive prefetching.
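As a hypothetical example, a processor that owns 16 partitions with a prefetch count of 1000 and 1 KB payloads needs roughly

$$
16 \times 1000 \times (1\,\text{KB} + 1\,\text{KB}) \approx 32\,\text{MB}
$$

of extra heap just for prefetched events, before any forwarding batches are accounted for.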
As we can see, the performance of the forwarding scenario depends on several important factors:
- available resources and their utilization
- event payload size and max batch size on the processor
- fan-out factor
Make sure to use the latest version of Event Hubs SDK. Please refer to the next section for additional suggestions on things to check and monitor.
We used throughput - the number of successfully forwarded events per second - as the key indicator of performance in this scenario. We also kept an eye on CPU and memory to confirm that we used all available resources and observed peak performance for a given configuration. In practice, there may be other important metrics, such as end-to-end latency or duplication rate, that we need to measure and take into account when tuning performance.
Such metrics may correlate with throughput to some extent, but not fully. For example, we could achieve high throughput with a very large processor batch size. However, in the presence of transient issues when forwarding a subset of events, the whole batch won't be checkpointed - it will be re-processed after a delay, resulting in some events being forwarded twice.
Another problem with a huge batch size is that the processor waits for that number of events (or a configurable amount of time, whichever comes first) before dispatching the batch to the application. During periods of lower load, this can mean the processor spends time waiting for events to arrive, increasing end-to-end latency.
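Both knobs live on the processor client builder; a sketch with illustrative values (`forwardAndCheckpoint` is a hypothetical callback standing in for the forwarding logic above):

```java
new EventProcessorClientBuilder()
    // Dispatch a batch as soon as 128 events are buffered, or after 5 seconds,
    // whichever comes first - a smaller wait time lowers end-to-end latency under low load.
    .processEventBatch(batchContext -> forwardAndCheckpoint(batchContext), 128, Duration.ofSeconds(5))
    ...
```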
It's important to identify and monitor the key indicators for your system. In addition to these indicators, we need other observability signals to understand what went wrong when something (inevitably) does. Let's list the signals that are important to monitor for the forwarding scenario.
Processing error rate is one of the most important signals for Event Hubs consumers - if an exception is not handled within the processing callback, it triggers the following sequence of events:
- The exception is reported to the `processError` callback.
- The processor client stops processing this partition and closes the underlying connection to Event Hubs.
- This or another processor instance takes ownership of the partition after `partitionOwnershipExpirationInterval` (defaults to 2 minutes).
- The processor re-establishes the connection and resumes processing events in that partition from the last recorded checkpoint.
So, if we can't forward a batch to the final destination:
- we should not checkpoint the source batch
- we should (re)throw an exception and let the processor client stop processing this partition (see the sketch after this list). If we swallow the exception and let the processor continue, it will process the next batch. This would break ordering, but, more importantly, the next batch could succeed and update the checkpoint, so the faulty batch would never be processed again.
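In callback form, the difference looks roughly like this (a sketch; `forwardBatch` is a hypothetical helper wrapping the parallel send shown earlier):

```java
.processEventBatch(batchContext -> {
    // Let any exception thrown by forwarding propagate: the processor stops this partition
    // and later resumes it from the last checkpoint.
    forwardBatch(batchContext);
    batchContext.updateCheckpoint(); // only runs when all forwarded batches succeeded

    // Anti-pattern: catching the exception and continuing would let a later batch
    // update the checkpoint past the failed one, so those events would never be re-processed.
}, maxBatchSize, maxWaitTime)
```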
Once we stop processing the partition, it stays idle for the configured `partitionOwnershipExpirationInterval`. It could make sense to reduce this interval - refer to the Duration and error rate of checkpoint calls section below for considerations.
It could be useful to record some telemetry in the processor client's `processPartitionInitialization` and `processPartitionClose` callbacks, along with the `partitionId`, to see how frequently this happens.
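A minimal sketch of what that could look like (`LOGGER` is assumed to be an SLF4J-style logger; the builder is otherwise configured as before):

```java
new EventProcessorClientBuilder()
    .processPartitionInitialization(initContext -> LOGGER.info("claimed partition {}",
        initContext.getPartitionContext().getPartitionId()))
    .processPartitionClose(closeContext -> LOGGER.info("closed partition {}, reason: {}",
        closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason()))
    ...
```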
It's also important to monitor the latency of the overall processing operation, which includes the forwarding and checkpointing calls. Usually we need to understand the latency distribution and check its P50 and P95 (or higher) percentiles. Check out this excellent talk by Gil Tene on understanding application responsiveness and avoiding common pitfalls when measuring latency.
Root-causing and improving the long tail of the latency distribution usually brings more benefit in distributed applications than local performance optimizations that affect only the median latency.
To investigate sources of high latency, we'd need to see the durations of the underlying operations and their distribution. The Event Hubs SDK is instrumented with OpenTelemetry: it supports distributed tracing and emits metrics in experimental mode.
Forwarding the re-grouped events to the final destination is probably the biggest contributor to processing latency. As we've seen, the fan-out factor is the key factor here. Also, since we send the forwarded batches in parallel, we're adding another source of potential issues related to threading and synchronization.
If individual send calls are fast and reliable, but the overall operation is slow, make sure to check and benchmark the parallelization code.
If some of the individual send calls take very long and result in timeouts, they may be caused by transient network issues. Timed-out calls are retried by the Event Hubs sender client, and in this case the timeout value and back-off intervals have the highest impact on the overall latency.
If your connection to Event Hubs is usually fast and reliable (for example, if you host the application in Azure in the same region as the Event Hubs resource), you can try reducing the try timeout to a few seconds:
```java
new EventHubClientBuilder()
    .retryOptions(new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(5)))
    .eventHubName(options.getEventHubsEventHubName())
    ...
```
The default try timeout is 1 minute, but with a reliable connection it usually does not make sense to wait more than a few seconds for the service to respond. If it does not respond soon, it's likely to never respond, and waiting longer is not helpful.
Issues like timeouts may also be caused by high CPU usage, an overloaded IO thread, or threading issues such as synchronization or starvation. If you see a high rate of timeouts, make sure to check CPU and memory on the corresponding consumer instances at that time.
In case of low CPU utilization combined with low throughput (when you expect the system to do more), check:
- Do you have adequate thread pool sizes? When parallelizing event forwarding, we need up to `fanOutFactor` threads per owned partition. Make sure to configure thread pools for it. Here's the Project Reactor thread pool documentation describing configuration options and defaults.
- Profile the application and check if some threads are much busier than others. Event Hubs handles IO operations for a specific connection on dedicated threads following the `reactor-executor-*` naming pattern. If these threads are very busy, consider creating multiple Event Hubs producer or processor clients to process and/or publish events and spread the work between them. Or, if you share a connection between producer clients, stop doing so.
Checkpointing involves a network call that can be another source of latency issues and errors. Consider reducing the timeout as an optimization - please refer to the Azure Storage Blob samples on how to configure HTTP client timeouts. Refer to the Event Hubs documentation for general guidance on using Azure Storage Blobs as a checkpoint store.
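If you use the blob checkpoint store, one possible way to tighten the HTTP response timeout is through the client options of the blob container client; a sketch (the timeout value and container name are assumptions, not recommendations):

```java
BlobContainerAsyncClient containerClient = new BlobContainerClientBuilder()
    .connectionString(storageConnectionString)
    .containerName("checkpoints")
    // Fail checkpoint and ownership calls faster than the default so retries kick in sooner.
    .clientOptions(new HttpClientOptions().setResponseTimeout(Duration.ofSeconds(10)))
    .buildAsyncClient();

new EventProcessorClientBuilder()
    .checkpointStore(new BlobCheckpointStore(containerClient))
    ...
```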
Note that the same storage account is used by load balancing to claim and preserve ownership of partitions. If checkpoint latency or error rate is high, it most likely means that processors can't preserve their ownership and lose it to other processor instances, causing a higher event duplication rate.
You can control how frequently ownership is renewed with the `loadBalancingUpdateInterval` setter on the processor client builder. Make sure to keep the load balancing update interval much smaller than `partitionOwnershipExpirationInterval`.
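A sketch of the two intervals side by side (the values are illustrative; the ownership expiration should stay several times larger than the update interval):

```java
new EventProcessorClientBuilder()
    .loadBalancingUpdateInterval(Duration.ofSeconds(10))          // how often ownership is renewed
    .partitionOwnershipExpirationInterval(Duration.ofSeconds(60)) // how long until an idle partition is reclaimed
    ...
```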
It's common to keep resource limits low in containerized environments and scale them up dynamically, so high resource utilization could be healthy (unless it's unusually high).
One of the important things to check when running high-scale Java applications is memory consumption and the number of application/container restarts.
If the application dies with Java out-of-memory errors, or the container runtime kills it (because it requests more memory than it's allowed), it's a good signal that either there is a memory leak or the application just needs more memory to function.
In both cases it's a good idea to get a memory dump (make sure to enable one, for example with the `-XX:+HeapDumpOnOutOfMemoryError` VM option) and investigate what's taking up the memory.
From the Event Hubs perspective, you might need to decrease the processor batch size and prefetch count to reduce the number of events kept in memory.
Check how much time is spent in garbage collection - if there is not enough memory, the application can waste a big chunk of CPU time on GC. Optimizing memory usage or increasing the amount of available memory can significantly improve performance. If you see that memory consumption grows over time despite the garbage collector's attempts to free heap space, it's likely a sign of a memory leak. Taking several heap dumps and comparing them should show which objects retained heap space over time and what's holding them in memory.
Relatively high CPU usage in messaging scenarios is usually a good signal that workers are well utilized. Since the Event Hubs processor client comes with backpressure, we don't need to protect workers from unexpected bursts of load. Still, it's a good idea to understand what's causing high CPU usage. Profiling your application during development, or in production with a continuous profiler, should show any expensive operations on the hot path and is a great input for local optimizations. Make sure to also check time spent in garbage collection.
Low CPU utilization combined with low throughput (when you expect the system to do more) is a signal of synchronization, locking, or a similar issue. From the Event Hubs side, check:
- How many partitions are owned by the worker instance? Are there enough events to process?
- How big are processor batches typically compared to the configured maximum batch size? If they are smaller than the maximum, reduce the max wait time so the processor waits less for a full batch.
- Whether the other recommendations in Duration and error rate of re-sending calls apply.
Some level of network issues is expected in a high-scale distributed application, and Azure SDKs should handle and retry them. In some cases network issues last for prolonged periods of time and the SDK exhausts all retries. The processor client, however, does not stop trying to receive events until it's stopped or disposed: if the connection terminates or a response indicating an error is received, it re-establishes the connection. The processor client retries all issues, including authorization issues and missing-entity errors, so that if an entity was deleted by mistake, it can be restored on the Event Hubs service side without the need to restart the application.
You can use metrics emitted by the SDK to check the number of transport-related errors - check out the experimental SDK metrics to monitor transport-level issues.
Check out Event Hubs troubleshooting guide for more details on how to investigate connectivity issues.
As load in your application grows, make sure to check if Event Hubs is scaled adequately. Refer to the Monitor Azure Event Hubs and Azure Event Hubs quotas and limits articles for the details.