Last active
October 30, 2018 21:54
-
-
Save Bill/e7af3016cceaad63205f7bc733ab5d72 to your computer and use it in GitHub Desktop.
This benchmark works against the latest Apache Geode develop branch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license | |
* agreements. See the NOTICE file distributed with this work for additional information regarding | |
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance with the License. You may obtain a | |
* copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software distributed under the License | |
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | |
* or implied. See the License for the specific language governing permissions and limitations under | |
* the License. | |
*/ | |
package org.apache.geode.cache.query.internal; | |
import static org.mockito.Mockito.mock; | |
import static org.mockito.Mockito.when; | |
import java.util.Random; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import org.openjdk.jmh.annotations.Benchmark; | |
import org.openjdk.jmh.annotations.BenchmarkMode; | |
import org.openjdk.jmh.annotations.Fork; | |
import org.openjdk.jmh.annotations.Level; | |
import org.openjdk.jmh.annotations.Measurement; | |
import org.openjdk.jmh.annotations.Mode; | |
import org.openjdk.jmh.annotations.OutputTimeUnit; | |
import org.openjdk.jmh.annotations.Scope; | |
import org.openjdk.jmh.annotations.Setup; | |
import org.openjdk.jmh.annotations.State; | |
import org.openjdk.jmh.annotations.TearDown; | |
import org.apache.geode.internal.cache.InternalCache; | |
@State(Scope.Thread) | |
@Fork(1) | |
public class MonitorQueryUnderContentionBenchmark { | |
/* | |
* all times in milliseconds | |
*/ | |
private static final long QueryMaxExecutionTime = 6; | |
/* | |
* Delay, from time startOneSimulatedQuery() is called, until monitorQueryThread() is called. | |
*/ | |
public static final int QueryInitialDelay = 0; | |
/* | |
* The mode is the center of the "hump" of the Gaussian distribution. | |
* | |
* We usually want to arrange the two humps equidistant from QueryMaxExecutionTime. | |
*/ | |
private static final int FastQueryCompletionMode = 1; | |
private static final int SlowQueryCompletionMode = 1000000; | |
/* | |
* How often should we start a query of each type? | |
* | |
* Starting them more frequently leads to heavier load. | |
* | |
* They're separated so we can play with different mixes. | |
*/ | |
private static final int StartFastQueryPeriod = 1; | |
private static final int StartSlowQueryPeriod = 1; | |
/* | |
* After load is established, how many measurements shall we take? | |
*/ | |
// private static final double BenchmarkIterations = 1e4; | |
private static final double BenchmarkIterations = 1e2; | |
// public static final int TimeToQuiesceBeforeSampling = 240000; | |
public static final int TimeToQuiesceBeforeSampling = 180000; | |
public static final int ThreadPoolProcessorMultiple = 2; | |
public static final int RandomSeed = 151; | |
private QueryMonitor monitor; | |
private Thread thread; | |
private DefaultQuery query; | |
private InternalCache cache; | |
private Random random; | |
private ScheduledThreadPoolExecutor executorService; | |
public static final boolean[] TrueArray = new boolean[] {true}; | |
private Thread queryMonitorThread; | |
@Setup(Level.Trial) | |
public void trialSetup() throws InterruptedException { | |
cache = mock(InternalCache.class); | |
monitor = | |
// new QueryMonitor(() -> (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1), | |
// cache, QueryMaxExecutionTime); | |
new QueryMonitor(cache, QueryMaxExecutionTime); | |
queryMonitorThread = new Thread(monitor); | |
queryMonitorThread.start(); | |
this.thread = mock(Thread.class); | |
final int numberOfThreads = | |
ThreadPoolProcessorMultiple * Runtime.getRuntime().availableProcessors(); | |
executorService = | |
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( | |
numberOfThreads); | |
System.out.println(String.format("Pool has %d threads", numberOfThreads)); | |
executorService.setRemoveOnCancelPolicy(true); | |
random = new Random(RandomSeed); | |
query = createDefaultQuery(); | |
generateLoad(executorService, () -> startOneFastQuery(executorService), | |
StartFastQueryPeriod); | |
generateLoad(executorService, () -> startOneSlowQuery(executorService), | |
StartSlowQueryPeriod); | |
// allow system to quiesce | |
Thread.sleep(TimeToQuiesceBeforeSampling); | |
System.out.println("Queries in flight prior to test: " + executorService.getQueue().size()); | |
} | |
@TearDown(Level.Trial) | |
public void trialTeardown() { | |
executorService.shutdownNow(); | |
monitor.stopMonitoring(); | |
queryMonitorThread.interrupt(); | |
} | |
@Benchmark | |
@Measurement(iterations = (int) BenchmarkIterations) | |
@BenchmarkMode(Mode.SingleShotTime) | |
@OutputTimeUnit(TimeUnit.MILLISECONDS) | |
// @Warmup we don't warm up because our @Setup warms us up | |
public void monitorQuery() { | |
// monitor.monitorQueryThread(query); | |
// monitor.stopMonitoringQueryThread(query); | |
final Thread queryThread = Thread.currentThread(); | |
monitor.monitorQueryThread(queryThread, query); | |
monitor.stopMonitoringQueryThread(queryThread, query); | |
} | |
private ScheduledFuture<?> generateLoad(final ScheduledExecutorService executorService, | |
final Runnable queryStarter, int startPeriod) { | |
return executorService.scheduleAtFixedRate(() -> { | |
queryStarter.run(); | |
}, | |
QueryInitialDelay, | |
startPeriod, | |
TimeUnit.MILLISECONDS); | |
} | |
private void startOneFastQuery(ScheduledExecutorService executorService) { | |
startOneSimulatedQuery(executorService, 100, FastQueryCompletionMode); | |
} | |
private void startOneSlowQuery(ScheduledExecutorService executorService) { | |
startOneSimulatedQuery(executorService, 100, SlowQueryCompletionMode); | |
} | |
private void startOneSimulatedQuery(ScheduledExecutorService executorService, | |
int startDelayRangeMillis, int completeDelayRangeMillis) { | |
executorService.schedule(() -> { | |
final Thread thread = mock(Thread.class); | |
final DefaultQuery query = createDefaultQuery(); | |
// monitor.monitorQueryThread(query); | |
monitor.monitorQueryThread(thread, query); | |
executorService.schedule(() -> { | |
// monitor.stopMonitoringQueryThread(query); | |
monitor.stopMonitoringQueryThread(thread, query); | |
}, | |
gaussianLong(completeDelayRangeMillis), | |
TimeUnit.MILLISECONDS); | |
}, | |
gaussianLong(startDelayRangeMillis), | |
TimeUnit.MILLISECONDS); | |
} | |
private long gaussianLong(int range) { | |
return (long) (random.nextGaussian() * range); | |
} | |
private DefaultQuery createDefaultQuery() { | |
// we can reuse this because it doesn't affect lookup or equality in the collection(s) | |
final DefaultQuery defaultQuery = mock(DefaultQuery.class); | |
when(defaultQuery.getQueryCompletedForMonitoring()).thenReturn(TrueArray); | |
return defaultQuery; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment