$ ./jmh-benchmarks/jmh.sh -prof "async:libPath=/path/to/libasyncProfiler.so;output=flamegraph" ControllerBrokerRequestBatchBenchmark
Created July 11, 2024 14:45
KAFKA-17061 benchmark
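The command at the top runs the benchmark through JMH's async-profiler integration with flame-graph output; adjust libPath to wherever libasyncProfiler.so lives on your machine. A plain run without profiling should work by simply omitting the -prof option, e.g. ./jmh-benchmarks/jmh.sh ControllerBrokerRequestBatchBenchmark.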
// Place this file under jmh-benchmarks/src/main/java/org/apache/kafka/jmh/controller
package org.apache.kafka.jmh.controller;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.AbstractControlRequest.Builder;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.UpdateMetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.server.common.MetadataVersion;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import kafka.api.LeaderAndIsr;
import kafka.cluster.Broker;
import kafka.controller.AbstractControllerBrokerRequestBatch;
import kafka.controller.ControllerContext;
import kafka.controller.LeaderIsrAndControllerEpoch;
import kafka.controller.ReplicaAssignment;
import kafka.controller.StateChangeLogger;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;

import scala.Function1;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
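// Benchmark for KAFKA-17061: measures how long the controller's request batch takes to build
// UpdateMetadata requests for a large cluster (NUM_BROKERS brokers, NUM_PARTITIONS partitions).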
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ControllerBrokerRequestBatchBenchmark {
    private static final int RANDOM_PORT = 0;
    private static final int NUM_BROKERS = 200;
    private static final int NUM_PARTITIONS = 40000;
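    // Minimal concrete subclass of AbstractControllerBrokerRequestBatch: request sending and
    // response handling are no-ops, so the benchmark measures only batch construction itself.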
    private static class Batch extends AbstractControllerBrokerRequestBatch {
        private Batch(ControllerContext controllerContext) {
            super(KafkaConfig.fromProps(TestUtils.createBrokerConfig(
                    1, "zkConnect", true, true, RANDOM_PORT,
                    scala.Option.empty(),
                    scala.Option.empty(),
                    scala.Option.empty(),
                    true, false, RANDOM_PORT, false, RANDOM_PORT, false, RANDOM_PORT,
                    scala.Option.empty(),
                    1, false, 1, (short) 1, false)),
                () -> controllerContext,
                () -> MetadataVersion.LATEST_PRODUCTION,
                new StateChangeLogger(1, true, scala.Option.empty()),
                false);
        }

        @Override
        public void sendRequest(int brokerId, Builder<? extends AbstractControlRequest> request,
                                Function1<AbstractResponse, BoxedUnit> callback) {
        }

        @Override
        public void handleLeaderAndIsrResponse(LeaderAndIsrResponse response, int broker) {
        }

        @Override
        public void handleUpdateMetadataResponse(UpdateMetadataResponse response, int broker) {
        }

        @Override
        public void handleStopReplicaResponse(StopReplicaResponse stopReplicaResponse, int brokerId,
                                              Map<TopicPartition, Errors> partitionErrorsForDeletingTopics) {
        }
    }

    private Batch batch;
    private Seq<Object> brokerIds;
    private Set<TopicPartition> partitions;
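    // Rebuilds the controller state before every invocation: NUM_BROKERS live brokers and
    // NUM_PARTITIONS partitions of "topic" with replication factor 3, leaders assigned round-robin.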
    @Setup(Level.Invocation)
    public void setup() throws IOException {
        ControllerContext controllerContext = new ControllerContext();
        for (int i = 0; i < NUM_BROKERS; i++) {
            int brokerId = i;
            controllerContext.addLiveBrokers(JavaConverters.asScala(new HashMap<Broker, Object>() {{
                put(new Broker(brokerId, "localhost", 0, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT), (Object) 1L);
            }}));
        }
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            int leader = i % NUM_BROKERS;
            int f1 = (leader + 1) % NUM_BROKERS;
            int f2 = (leader + 2) % NUM_BROKERS;
            controllerContext.updatePartitionFullReplicaAssignment(
                new TopicPartition("topic", i),
                ReplicaAssignment.apply(JavaConverters.asScala(
                    Arrays.asList(leader, f1, (Object) f2)
                ).toSeq())
            );
            controllerContext.putPartitionLeadershipInfo(
                new TopicPartition("topic", i),
                new LeaderIsrAndControllerEpoch(new LeaderAndIsr(
                    leader, 1, LeaderRecoveryState.RECOVERED,
                    JavaConverters.asScala(Arrays.asList(
                        new BrokerState().setBrokerId(leader),
                        new BrokerState().setBrokerId(f1),
                        new BrokerState().setBrokerId(f2)
                    )).toList(), 1), 1));
        }
        batch = new Batch(controllerContext);
        batch.newBatch();

        List<Object> brokerIds = new ArrayList<>();
        for (int i = 0; i < NUM_BROKERS; i++) {
            brokerIds.add(i);
        }
        this.brokerIds = JavaConverters.asScala(brokerIds).toSeq();

        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            partitions.add(new TopicPartition("topic", i));
        }
        this.partitions = JavaConverters.asScala(partitions).toSet();
    }

    @TearDown(Level.Invocation)
    public void tearDown() throws IOException, InterruptedException {
    }
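    // Measures addUpdateMetadataRequestForBrokers() with all brokers as targets and all partitions included.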
    @Benchmark
    public void testAddUpdateMetadataRequestForBrokers() {
        batch.addUpdateMetadataRequestForBrokers(brokerIds, partitions);
    }
}