Tuning a dataflow system is easy:
The First Rule of Dataflow Tuning:
-
Ensure each stage is always ready to accept records, and
-
Deliver each processed record promptly to its destination
That may seem insultingly simplistic, but my point is that a) if you respect the laws of physics and economics, you can make your dataflow obey the First Rule; b) once your dataflow does obey the First Rule, stop tuning it.
Outline:
-
Topology; Little’s Law
-
skew
-
System: machines; workers/machine, machine sizing; (zookeeper, kafka sizing)
-
Throttling: batch size; kafka-partitions; max pending; trident batch delay; spout delay; timeout
-
Congestion: number of ackers; queue sizing (exec send, exec recv, transfer)
-
Memory: Max heap (Xmx), new gen/survivor size; (queue sizes)
-
Ulimit and other network sysctls for concurrency and networking; Netty vs ZMQ transport; drpc.worker.threads
-
Other important settings: preferIPv4; transactional.zookeeper.root (parent name for transactional state ledger in Zookeeper); the java options passed to your worker function; topology.worker.shared.thread.pool.size
-
Don’t touch: zmq.hwm (unless you are seeing unreliable network transport under bursty load), disruptor wait strategy, worker receive buffer size, zmq.threads
First, identify your principal goal: latency, throughput, memory or cost. We’ll just discuss latency and throughput as goals — tuning for cost means balancing the throughput (records/hour per machine) and cost of infrastructure (amortized $/hour per machine), so once you’ve chosen your hardware, tuning for cost is equivalent to tuning for throughput. I’m also going to concentrate on typical latency/throughput, and not on variance or 99th percentile figures or somesuch.
Next, identify your dataflow’s principal bottleneck, the constraining resource that most tightly bounds the performance of its slowest stage. A dataflow can’t pass through more records per second than the cumulative output of its most constricted stage, and it can’t deliver records in less end-to-end time than the stage with the longest delay.
The principal bottleneck may be:
-
IO volume: there’s a hardware bottleneck to the number of bytes per second that a machine’s disks or network connection can sustain. Event log processing often involves large amounts of data requiring only parsing or other trivial transformations before storage; the throughput of such dataflows is IO bound.
-
CPU: a CPU-bound flow spends more time in calculations to process a record than it spends transporting it.
-
concurrency: network requests to an external resource often require almost no CPU and minimal volume. If your principal goal is throughput, the flow is only bound by how many network requests you can make in parallel.
-
remote rate bound: alternatively, you may be calling an external resource that imposes a maximum throughput out of your control. A legacy datastore might only be able to serve a certain volume of requests before its performance degrades, or a third-party web API (such as Google’s Geolocation API) might impose terms-of-service restrictions.
-
memory: large windowed joins or memory-intensive analytics algorithms may require so much RAM it defines the machine characteristics
If you’re memory-bound, use machines with lots of RAM. Otherwise, start tuning on a machine with lots of cores and over-provision the RAM; we’ll optimize the hardware later.
For a CPU-bound flow:
-
Construct a topology with parallelism one
-
Set max-pending to one, use one acker per worker, and ensure that storm’s nofiles ulimit is large (65000 is a decent number).
-
Set the trident-batch-delay to be comfortably larger than the end-to-end latency: there should be a short additional delay after each batch completes.
-
Time the flow through each stage.
-
Increase the parallelism of CPU-bound stages to nearly saturate the CPU, and at the same time adjust the batch size so that state operations (aggregates, bulk database reads/writes, kafka spout fetches) don’t slow down the total batch processing time.
-
Keep an eye on the GC activity. You should see no old-gen or STW GCs, and efficient new-gen gcs. (Your production goal is no more than one new-gen gc every 10 seconds, and no more than 10ms pause time per new-gen gc; for right now just overprovision: set the new-gen size to give infrequent collections and don’t worry about pause times.)
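The conservative starting point in the list above might be written down as a settings map. This is a minimal sketch: the keys are standard Storm configuration names, but the helper `baseline_settings` and the 1.5x headroom factor (standing in for "comfortably larger") are assumptions made for illustration, not values from this text.

```python
def baseline_settings(num_workers, end_to_end_latency_ms, headroom=1.5):
    """First-pass settings for timing a CPU-bound flow.

    headroom is an assumed multiplier for the batch delay; pick your own.
    """
    return {
        "topology.workers": num_workers,
        "topology.max.spout.pending": 1,          # one batch in flight
        "topology.acker.executors": num_workers,  # one acker per worker
        "topology.trident.batch.emit.interval.millis":
            int(end_to_end_latency_ms * headroom),
    }


# Hypothetical measurement: 400 ms end-to-end latency on a single worker.
settings = baseline_settings(num_workers=1, end_to_end_latency_ms=400)
```

With these hypothetical inputs the batch delay comes out at 600 ms, leaving a short idle gap after each batch completes.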
Once you have roughly dialed in the batch size and parallelism, check in with the First Rule. The stages upstream of your principal bottleneck should always have records ready to process. The stages downstream should always have capacity to accept and promptly deliver processed records.
Use one worker per topology per machine: storm passes tuples directly from sending executor to receiving executor if they’re within the same worker. Also set number of ackers equal to number of workers — the default of one per topology never makes sense (future versions of Storm will fix this).
Match your spout parallelism to its downstream flow. Use the same number of kafka partitions as kafka spouts (or a small multiple). If there are more spouts than kafka machines*kpartitions, the extra spouts will sit idle.
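As a quick check on the spout count, the sketch below computes how many spouts would sit idle for a given Kafka layout. The function name `idle_spouts` is hypothetical, and it assumes each kafka-partition is read by exactly one spout.

```python
def idle_spouts(num_spouts, num_brokers, partitions_per_broker):
    """Spouts left without a kafka-partition to read from."""
    total_partitions = num_brokers * partitions_per_broker
    return max(0, num_spouts - total_partitions)


# Four brokers with three partitions each gives twelve partitions.
well_matched = idle_spouts(6, 4, 3)       # two partitions per spout
over_provisioned = idle_spouts(16, 4, 3)  # four spouts have nothing to do
```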
For CPU-bound stages, set one executor per core for the bounding stage (or one less than cores at large core count). Don’t adjust the parallelism without reason — even a shuffle implies network transfer. Shuffles don’t impart any load-balancing.
For map states or persistentAggregates — things where results are accumulated into memory structures — allocate one stage per worker. Cache efficiency and batch request overhead typically improve with large record set sizes.
In a concurrency-bound problem, use very high parallelism. If possible, use a QueryFunction to combine multiple queries into a batch request.
-
Throughput (recs/s) = Capacity / Latency
-
you can’t have better throughput than the collective rate of your slowest stage;
-
you can’t have better latency than the sum of the individual latencies.
If all records must pass through a stage that handles 10 records per second, then the flow cannot possibly proceed faster than 10 records per second, and it cannot have latency smaller than 100ms (1/10 of a second).
-
with 20 parallel stages, the 95th percentile latency of your slowest stage becomes the median latency of the full set. (TODO: nail down numbers)
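The relations above can be checked numerically. This is a rough sketch, not a settled result: the function names are hypothetical, and the percentile calculation assumes the parallel stages have independent, identically distributed latencies, which the text does not state.

```python
def throughput(capacity, latency_s):
    """Little's Law: records/s = records in flight / seconds per record."""
    return capacity / latency_s


def chance_any_stage_is_slow(n_stages, percentile=0.95):
    """Probability that at least one of n stages exceeds its own percentile."""
    return 1.0 - percentile ** n_stages


def percentile_that_becomes_median(n_stages):
    """Per-stage percentile that the slowest of n stages exceeds half the time."""
    return 0.5 ** (1.0 / n_stages)


single_stage_rate = throughput(capacity=1, latency_s=0.1)
slow_chance_20 = chance_any_stage_is_slow(20)
median_percentile_20 = percentile_that_becomes_median(20)
```

Under those assumptions, a batch fanned out over 20 stages has roughly a 64% chance of waiting on at least one stage running slower than its own 95th percentile, and the median wait sits near each stage's 96.6th percentile, which is in the neighborhood of the rule of thumb above.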
Set the batch size to optimize the throughput of your most expensive batch operation: a bulk database operation, network request, or intensive aggregation. (There might instead be a natural batch size: for example the twitter users/lookup API call returns information on up to 100 distinct user IDs.)
The batch count for the Kafka spout is controlled indirectly by the max fetch bytes. The resulting total batch size is at most (kafka partitions) * (max fetch bytes).
For example, given a topology with six kafka spouts and four brokers with three kafka-partitions per broker, you have twelve kafka-partitions total, two per spout. When the MBCoordinator calls for a new batch, each spout produces two sub-batches (one for each kafka-partition), each into its own trident-partition. Now also say you have records of 1000 +/- 100 bytes, and that you set max-fetch-bytes to 100_000. The spout fetches the largest discrete number of records that sit within max-fetch-bytes — so in this case, each sub-batch will have between 90 and 111 records. That means the full batch will have between 1080 and 1332 records, and 1_186_920 to 1_200_000 bytes.
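The arithmetic in that example can be reproduced with a short sketch. It assumes every record in a sub-batch has the same whole-number size somewhere between 900 and 1100 bytes; the helper `sub_batch` is hypothetical.

```python
def sub_batch(record_bytes, max_fetch_bytes=100_000):
    """Records and bytes fetched from one kafka-partition per batch."""
    count = max_fetch_bytes // record_bytes  # whole records only
    return count, count * record_bytes


partitions = 4 * 3  # four brokers, three kafka-partitions each
sizes = range(900, 1101)
counts = [sub_batch(s)[0] for s in sizes]
volumes = [sub_batch(s)[1] for s in sizes]

record_range = (partitions * min(counts), partitions * max(counts))
byte_range = (partitions * min(volumes), partitions * max(volumes))
```

The smallest byte volume comes from 1099-byte records, where 90 records fit and 1090 bytes of each fetch go unused.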
-
each() functions should not care about batch size.
-
partitionAggregate, partitionPersist, partitionQuery do.
Typically, you’ll find that there are three regimes:
-
when it’s too small, response time is flat: it’s dominated by bookkeeping.
-
it then grows slowly with batch size. For example, a bulk put to elasticsearch will take about 200ms for 100 records, about 250ms for 1000 records, and about 300ms for 2000 records (TODO: nail down these numbers).
-
at some point, you start overwhelming some resource on the other side, and execution time increases sharply.
Since the execution time increases slowly in case (2), you get better and better records-per-second throughput. Choose a value that is near the top range of (2) but comfortably less than regime (3).
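To see why the upper end of the second regime is attractive, convert timings to records per second. The sketch below uses the placeholder elasticsearch figures quoted above (which the text itself marks as unconfirmed), so treat the results as illustrative only.

```python
def records_per_second(batch_size, elapsed_ms):
    """Throughput of one bulk operation."""
    return batch_size * 1000 / elapsed_ms


# Placeholder timings from the text: batch size -> milliseconds per bulk put.
timings_ms = {100: 200, 1000: 250, 2000: 300}
rates = {n: records_per_second(n, ms) for n, ms in timings_ms.items()}
best_batch_size = max(rates, key=rates.get)
```

With those figures, throughput climbs from 500 records/s at a batch size of 100 to roughly 6,667 records/s at 2000, so the largest batch that stays clear of the third regime wins.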
Don’t worry about this setting until most other things stabilize — it’s mostly important for ensuring that a burst of records doesn’t clog the send queue.
Set the executor send buffer to be larger than the batch record count of the spout or first couple stages. Since it applies universally, don’t go crazy with this value. It has to be an even power of two (1024, 2048, 4096, 8192, 16384).
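One way to pick the value is to round the batch record count up to the next power of two. This is a small sketch with a hypothetical helper name; the result would go into Storm's `topology.executor.send.buffer.size` setting.

```python
def send_buffer_size(batch_record_count):
    """Smallest power of two strictly larger than the batch record count."""
    return 1 << batch_record_count.bit_length()


# A batch of up to 1332 records needs a 2048-slot buffer.
buffer_for_kafka_example = send_buffer_size(1332)
```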
Our worker JVM options:
worker.childopts: >-
    -Xmx2600m -Xms2600m -Xss256k
    -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m
    -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops
    -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps
    -XX:+PrintClassHistogram -XX:+PrintTenuringDistribution
    -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal
This sets:
-
New-gen size to 1000 MB (-XX:MaxNewSize=1000m). Almost all the objects running through storm are short-lived (that’s what the First Rule of data stream tuning says), so almost all your activity is here.
-
Apportions that new-gen space: with -XX:SurvivorRatio=6 you get 750 MB for newly-allocated objects and two 125 MB survivor spaces for objects that survive the first garbage collection pass.
-
Initial perm-gen size of 96m (a bit generous, but Clojure uses a bit more perm-gen than normal Java code would), and a hard cap of 128m (this should not change much after startup, so I want it to die hard if it does).
-
Implicit old-gen size of 1500 MB (total heap minus new- and perm-gens). The biggest demand on old-gen space comes from long-lived state objects: for example an LRU counting cache or dedupe’r. A good initial estimate for the old-gen size is the larger of a) twice the old-gen occupancy you observe in a steady-state flow, or b) 1.5 times the new-gen size. The settings above are governed by case (b).
-
Total heap of 2600 MB (-Xmx2600m): a 1000 MB new-gen, a 100 MB perm-gen, and the implicit 1500 MB old-gen. Don’t use gratuitously more heap than you need: long gc times can cause timeouts and jitter. Heap size larger than 12GB is trouble on AWS, and heap size larger than 32GB is trouble everywhere.
-
Tells it to use the "concurrent-mark-and-sweep" collector for long-lived objects, and to only do so when the old-gen becomes crowded.
-
Enables a few mysterious performance options
-
Logs GC activity at max verbosity, with log rotation
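The old-gen rule of thumb in the list above is easy to encode. This is only a sketch: the function name and the observed occupancy figures are hypothetical, chosen to show both cases.

```python
def old_gen_estimate_mb(observed_old_gen_mb, new_gen_mb):
    """Larger of (a) twice steady-state occupancy, (b) 1.5x the new-gen size."""
    return max(2 * observed_old_gen_mb, 1.5 * new_gen_mb)


# Hypothetical steady-state occupancies against the 1000 MB new-gen above.
light_state = old_gen_estimate_mb(observed_old_gen_mb=400, new_gen_mb=1000)
heavy_state = old_gen_estimate_mb(observed_old_gen_mb=900, new_gen_mb=1000)
```

With 400 MB of observed occupancy, case (b) governs and the estimate is 1500 MB, matching the settings above; at 900 MB, case (a) takes over and the estimate rises to 1800 MB.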
If you watch your GC logs, in steady-state you should see
-
No stop-the-world (STW) gc’s — nothing in the logs about aborting parts of CMS
-
old-gen GCs should not last longer than 1 second or happen more often than every 10 minutes
-
new-gen GCs should not last longer than 50 ms or happen more often than every 10 seconds
-
new-gen GCs should not fill the survivor space
-
perm-gen occupancy is constant
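The pause and frequency limits in this checklist could be tested mechanically once the GC log has been parsed. The sketch below assumes the log is already reduced to `(kind, start_time_s, pause_ms)` tuples; that tuple layout, the function name, and the sample events are all hypothetical, and parsing the actual log format is left out.

```python
def gc_violations(events, kind, max_pause_ms, min_interval_s):
    """List the ways collections of one kind break the pause/frequency limits."""
    selected = sorted((t, pause) for k, t, pause in events if k == kind)
    problems = []
    for i, (t, pause) in enumerate(selected):
        if pause > max_pause_ms:
            problems.append(f"{kind} GC at {t}s paused for {pause}ms")
        if i > 0 and t - selected[i - 1][0] < min_interval_s:
            gap = t - selected[i - 1][0]
            problems.append(f"{kind} GC at {t}s came {gap}s after the last one")
    return problems


# Made-up events: three new-gen collections and one old-gen collection.
events = [("new", 0, 12), ("new", 14, 9), ("new", 19, 61), ("old", 5, 400)]
new_gen_problems = gc_violations(events, "new", max_pause_ms=50, min_interval_s=10)
old_gen_problems = gc_violations(events, "old", max_pause_ms=1000, min_interval_s=600)
```

In this made-up trace the third new-gen collection is flagged twice, once for its 61 ms pause and once for arriving 5 seconds after the previous one, while the old-gen collection passes.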
Side note: regardless of whether you’re tuning your overall flow for latency or throughput, you want to tune the GC for latency (low pause times). Since things like committing a batch can’t proceed until the last element is received, local jitter induces global drag.
Max-pending (TOPOLOGY_MAX_SPOUT_PENDING) sets the number of tuple trees live in the system at any one time.
Trident-batch-delay (topology.trident.batch.emit.interval.millis) sets the maximum pace at which the trident Master Batch Coordinator will issue new seed tuples. It’s a cap, not an add-on: if t-b-d is 500ms and the most recent batch was released 486ms ago, the spout coordinator will wait 14ms before dispensing a new seed tuple. If the next pending entry isn’t cleared for 523ms, it will be dispensed immediately. If it took 1400ms, it will also be released immediately, but no make-up tuples are issued.
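The "cap, not an add-on" behavior boils down to one line of arithmetic. This is a sketch of the timing rule as described here, with a hypothetical function name, not the coordinator's actual code.

```python
def wait_before_next_batch_ms(batch_delay_ms, elapsed_since_last_ms):
    """Remaining wait before the next seed tuple; never negative."""
    return max(0, batch_delay_ms - elapsed_since_last_ms)


# The three cases from the text, with a 500 ms trident-batch-delay.
waits = [wait_before_next_batch_ms(500, elapsed) for elapsed in (486, 523, 1400)]
```

The three cases give waits of 14 ms, 0 ms and 0 ms: a late batch is released at once, and no extra batches are issued to make up for lost time.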
Trident-batch-delay is principally useful to prevent congestion, especially around startup. As opposed to a traditional Storm spout, a Trident spout will likely dispatch hundreds of records with each batch. If max-pending is 20, and the spout releases 500 records per batch, the spout will try to cram 10,000 records into its send queue.
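A back-of-the-envelope check for that kind of congestion might look like the following. The function names are hypothetical, and the comparison treats the send buffer size as a count of records, which is a simplification.

```python
def records_in_flight(max_pending, records_per_batch):
    """Worst-case number of records released before any batch completes."""
    return max_pending * records_per_batch


def overflows_send_queue(max_pending, records_per_batch, send_buffer_size):
    """True if a startup burst could exceed the executor send buffer."""
    return records_in_flight(max_pending, records_per_batch) > send_buffer_size


burst = records_in_flight(max_pending=20, records_per_batch=500)
clogs_small_buffer = overflows_send_queue(20, 500, send_buffer_size=1024)
fits_large_buffer = not overflows_send_queue(20, 500, send_buffer_size=16384)
```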
-
System: machines; workers/machine, machine sizing; (zookeeper, kafka sizing)
-
Throttling: batch size; kafka-partitions; max pending; trident batch delay; spout delay; timeout
-
Congestion: number of ackers; queue sizing (exec send, exec recv, transfer); zmq.threads
-
Memory: Max heap (Xmx), new gen/survivor size; (queue sizes)
-
Ulimit and other network sysctls for concurrency and networking; Netty vs ZMQ transport; drpc.worker.threads
-
Other important settings: preferIPv4; transactional.zookeeper.root (parent name for transactional state ledger in Zookeeper); the java options passed to your worker function; topology.worker.shared.thread.pool.size
-
Don’t touch: zmq.hwm (unless you are seeing unreliable network transport under bursty load), disruptor wait strategy, worker receive buffer size