Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ocadaruma/e80be044227d6235126310e9058f546d to your computer and use it in GitHub Desktop.
Save ocadaruma/e80be044227d6235126310e9058f546d to your computer and use it in GitHub Desktop.
KAFKA-17061 benchmark
$ ./jmh-benchmarks/jmh.sh -prof "async:libPath=/path/to/libasyncProfiler.so;output=flamegraph" ControllerBrokerRequestBatchBenchmark
// Place this file under jmh-benchmarks/src/main/java/org/apache/kafka/jmh/controller
package org.apache.kafka.jmh.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.AbstractControlRequest.Builder;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.UpdateMetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.server.common.MetadataVersion;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import kafka.api.LeaderAndIsr;
import kafka.cluster.Broker;
import kafka.controller.AbstractControllerBrokerRequestBatch;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderIsrAndControllerEpoch;
import kafka.controller.ReplicaAssignment;
import kafka.controller.StateChangeLogger;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import scala.Function1;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ControllerBrokerRequestBatchBenchmark {
private static final int RANDOM_PORT = 0;
private static final int NUM_BROKERS = 200;
private static final int NUM_PARTITIONS = 40000;
private static class Batch extends AbstractControllerBrokerRequestBatch {
private Batch(ControllerContext controllerContext) {
super(KafkaConfig.fromProps(TestUtils.createBrokerConfig(
1, "zkConnect", true, true, RANDOM_PORT,
scala.Option.empty(),
scala.Option.empty(),
scala.Option.empty(),
true, false, RANDOM_PORT, false, RANDOM_PORT, false, RANDOM_PORT,
scala.Option.empty(),
1, false, 1, (short) 1, false)),
() -> controllerContext,
() -> MetadataVersion.LATEST_PRODUCTION,
new StateChangeLogger(1, true, scala.Option.empty()),
false);
}
@Override
public void sendRequest(int brokerId, Builder<? extends AbstractControlRequest> request,
Function1<AbstractResponse, BoxedUnit> callback) {
}
@Override
public void handleLeaderAndIsrResponse(LeaderAndIsrResponse response, int broker) {
}
@Override
public void handleUpdateMetadataResponse(UpdateMetadataResponse response, int broker) {
}
@Override
public void handleStopReplicaResponse(StopReplicaResponse stopReplicaResponse, int brokerId,
Map<TopicPartition, Errors> partitionErrorsForDeletingTopics) {
}
}
private Batch batch;
private Seq<Object> brokerIds;
private Set<TopicPartition> partitions;
@Setup(Level.Invocation)
public void setup() throws IOException {
ControllerContext controllerContext = new ControllerContext();
for (int i = 0; i < NUM_BROKERS; i++) {
int brokerId = i;
controllerContext.addLiveBrokers(JavaConverters.asScala(new HashMap<Broker, Object>() {{
put(new Broker(brokerId, "localhost", 0, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT), (Object) 1L);
}}));
}
for (int i = 0; i < NUM_PARTITIONS; i++) {
int leader = i % NUM_BROKERS;
int f1 = (leader + 1) % NUM_BROKERS;
int f2 = (leader + 2) % NUM_BROKERS;
controllerContext.updatePartitionFullReplicaAssignment(
new TopicPartition("topic", i),
ReplicaAssignment.apply(JavaConverters.asScala(
Arrays.asList(leader, f1, (Object) f2)
).toSeq())
);
controllerContext.putPartitionLeadershipInfo(
new TopicPartition("topic", i),
new LeaderIsrAndControllerEpoch(new LeaderAndIsr(
leader, 1, LeaderRecoveryState.RECOVERED,
JavaConverters.asScala(Arrays.asList(
new BrokerState().setBrokerId(leader),
new BrokerState().setBrokerId(f1),
new BrokerState().setBrokerId(f2)
)).toList(), 1), 1));
}
batch = new Batch(controllerContext);
batch.newBatch();
List<Object> brokerIds = new ArrayList<>();
for (int i = 0; i < NUM_BROKERS; i++) {
brokerIds.add(i);
}
this.brokerIds = JavaConverters.asScala(brokerIds).toSeq();
List<TopicPartition> partitions = new ArrayList<>();
for (int i = 0; i < NUM_PARTITIONS; i++) {
partitions.add(new TopicPartition("topic", i));
}
this.partitions = JavaConverters.asScala(partitions).toSet();
}
@TearDown(Level.Invocation)
public void tearDown() throws IOException, InterruptedException {
}
@Benchmark
public void testAddUpdateMetadataRequestForBrokers() {
batch.addUpdateMetadataRequestForBrokers(brokerIds, partitions);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment