Created
May 22, 2014 23:52
-
-
Save codyaray/ac2eceb3ff92fa0eaf6b to your computer and use it in GitHub Desktop.
The Big-4 Rules of Storm Tuning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import static com.google.common.base.Preconditions.checkArgument; | |
/** | |
* Computes the parallelism for a particular topology and machine configuration. | |
* | |
* @author codyaray | |
* @since 4/21/2014 | |
*/ | |
public class StormParallelism { | |
private final int numberHosts; | |
private final int numberWorkersPerHost; | |
private final int numberWorkers; | |
private final int numberKafkaPartitions; | |
private final int numberKafkaPartitionsPerSpout; | |
private final int spoutParallelism; | |
private final int numberExecutorsPerCore; | |
private final int numberCoresPerHost; | |
private final int numberExecutorsPerHost; | |
private final int numberExecutorsPerWorker; | |
private final int transformParallelism; | |
private final int persistenceParallelism; | |
private StormParallelism(Builder builder) { | |
// Number of workers should be a multiple of number of machines | |
this.numberHosts = builder.numberHosts; | |
this.numberWorkersPerHost = builder.numberWorkersPerHost; | |
this.numberWorkers = numberHosts * numberWorkersPerHost; | |
// Number of partitions should be a multiple of spout parallelism | |
this.numberKafkaPartitions = builder.numberKafkaPartitions; | |
this.numberKafkaPartitionsPerSpout = builder.numberKafkaPartitionsPerSpout; | |
this.spoutParallelism = numberKafkaPartitions / numberKafkaPartitionsPerSpout; | |
// Parallelism should be a multiple of number of workers | |
this.numberExecutorsPerCore = builder.numberExecutorsPerCore; | |
this.numberCoresPerHost = builder.numberCoresPerHost; | |
this.numberExecutorsPerHost = numberExecutorsPerCore * numberCoresPerHost; | |
this.numberExecutorsPerWorker = numberExecutorsPerHost / numberWorkersPerHost; | |
this.transformParallelism = numberExecutorsPerWorker * numberWorkers; | |
// Reduce parallelism in persistence for best cache efficiency and lowest bulk-request overhead | |
this.persistenceParallelism = numberWorkers; | |
} | |
public int getNumberWorkers() { | |
return numberWorkers; | |
} | |
public int forSpoutLayer() { | |
return spoutParallelism; | |
} | |
public int forTransformLayer() { | |
return transformParallelism; | |
} | |
public int forPersistenceLayer() { | |
return persistenceParallelism; | |
} | |
public static Builder builder() { | |
return new Builder(); | |
} | |
/** | |
* Builder of {@link StormParallelism} configurations. | |
*/ | |
public static class Builder { | |
private int numberHosts; | |
private int numberWorkersPerHost = 1; | |
private int numberKafkaPartitions; | |
private int numberKafkaPartitionsPerSpout = 1; | |
private int numberCoresPerHost; | |
private int numberExecutorsPerCore = 1; | |
public Builder numberHosts(int numberHosts) { | |
this.numberHosts = numberHosts; | |
return this; | |
} | |
public Builder numberWorkersPerHost(int numberWorkersPerHost) { | |
this.numberWorkersPerHost = numberWorkersPerHost; | |
return this; | |
} | |
public Builder numberKafkaPartitions(int numberKafkaPartitions) { | |
this.numberKafkaPartitions = numberKafkaPartitions; | |
return this; | |
} | |
public Builder numberKafkaPartitionsPerSpout(int numberKafkaPartitionsPerSpout) { | |
this.numberKafkaPartitionsPerSpout = numberKafkaPartitionsPerSpout; | |
return this; | |
} | |
public Builder numberCoresPerHost(int numberCoresPerHost) { | |
this.numberCoresPerHost = numberCoresPerHost; | |
return this; | |
} | |
public Builder numberExecutorsPerCore(int numberExecutorsPerCore) { | |
this.numberExecutorsPerCore = numberExecutorsPerCore; | |
return this; | |
} | |
public StormParallelism build() { | |
checkArgument(numberHosts > 0); | |
checkArgument(numberKafkaPartitions > 0); | |
checkArgument(numberCoresPerHost > 0); | |
return new StormParallelism(this); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment