Created
March 27, 2012 02:52
-
-
Save jyemin/2212066 to your computer and use it in GitHub Desktop.
Program for benchmarking upserts using the Java driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mongodb.*; | |
import org.bson.types.ObjectId; | |
import java.lang.management.ManagementFactory; | |
import java.lang.management.ThreadInfo; | |
import java.lang.management.ThreadMXBean; | |
import java.net.UnknownHostException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
/** | |
* Test for reading from a large connection pool | |
*/ | |
public class MongoUpsertContentionTest { | |
public static void testPoolsOfDifferentSizeWithDifferentThreadCounts() throws UnknownHostException, InterruptedException { | |
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); | |
threadMXBean.setThreadContentionMonitoringEnabled(true); | |
threadMXBean.setThreadCpuTimeEnabled(true); | |
int numThreads = 1; | |
int poolSize = 20; | |
int numJobs = numThreads; | |
int numUpsertsPerJob = 200000; | |
MongoOptions options = new MongoOptions(); | |
options.connectionsPerHost = poolSize; | |
Mongo mongo = new Mongo(Arrays.asList(new ServerAddress("127.0.0.1", 27017)), options); | |
mongo.getDB("largeConnectionPoolDB").dropDatabase(); | |
System.out.println("Warming up..."); | |
executeParallelOperation(mongo, poolSize, poolSize, 10, true, WriteConcern.SAFE, false); | |
System.out.println(); | |
System.out.println("num threads: " + numThreads); | |
System.out.println("pool size: " + poolSize); | |
System.out.println("jobs: " + numJobs); | |
System.out.println("num upserts per job: " + numUpsertsPerJob); | |
System.out.println(); | |
executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true); | |
executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true); | |
executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true); | |
executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true); | |
executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true); | |
mongo.getDB("largeConnectionPoolDB").dropDatabase(); | |
mongo.close(); | |
} | |
private static void executeParallelOperation(Mongo mongo, int numThreads, int numJobs, int numUpserts, | |
boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws UnknownHostException, InterruptedException { | |
DBCollection coll = mongo.getDB("largeConnectionPoolDB").getCollection("largeConnectionPoolDB"); | |
coll.drop(); | |
doOne(numThreads, numJobs, numUpserts, coll, reserveConnection, writeConcern, printResults); | |
} | |
private static void doOne(int numThreads, int numJobs, final int numUpserts, final DBCollection coll, final boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws InterruptedException { | |
long start = System.currentTimeMillis(); | |
ExecutorService es = Executors.newFixedThreadPool(numThreads); | |
List<ContentionJob> jobs = new ArrayList<ContentionJob>(); | |
for (int x = 0; x < numJobs; x++) { | |
ContentionJob job = new ContentionJob(coll, reserveConnection, numUpserts, writeConcern); | |
jobs.add(job); | |
es.submit(job); | |
} | |
for (ContentionJob job : jobs) { | |
job.awaitCompletion(); | |
} | |
if (printResults) { | |
long end = System.currentTimeMillis(); | |
System.out.println("reserve: " + reserveConnection + ", " + (reserveConnection ? " " : "") + "time elapsed: " + (end - start) + " ms"); | |
printPoolThreadInfo(); | |
System.out.println(); | |
} | |
es.shutdown(); | |
es.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); | |
} | |
public static void printPoolThreadInfo() throws InterruptedException { | |
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); | |
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); | |
for (ThreadInfo cur : threadInfos) { | |
if (!cur.getThreadName().startsWith("pool")) { | |
continue; | |
} | |
// System.out.print(cur.getThreadName() + ": "); | |
System.out.print("blocked count: " + cur.getBlockedCount()); | |
System.out.print(", blocked time: " + cur.getBlockedTime() + " ms"); | |
System.out.print(", cpu time: " + (threadMXBean.getThreadCpuTime(cur.getThreadId()) / 1000000) + " ms"); | |
System.out.print(", user time: " + (threadMXBean.getThreadUserTime(cur.getThreadId()) / 1000000) + " ms"); | |
System.out.println(); | |
} | |
} | |
public static void main(String args[]) throws UnknownHostException, InterruptedException { | |
testPoolsOfDifferentSizeWithDifferentThreadCounts(); | |
} | |
static class ContentionJob implements Runnable { | |
protected final DBCollection coll; | |
private final boolean reserveConnection; | |
private final int num; | |
private final WriteConcern writeConcern; | |
private volatile boolean done; | |
ContentionJob(DBCollection coll, boolean reserveConnection, int num, WriteConcern writeConcern) { | |
this.coll = coll; | |
this.reserveConnection = reserveConnection; | |
this.num = num; | |
this.writeConcern = writeConcern; | |
} | |
public void run() { | |
if (reserveConnection) | |
coll.getDB().requestStart(); | |
try { | |
DBObject obj = new BasicDBObject(); | |
obj.put("_id", new ObjectId()); | |
DBObject upsert = new BasicDBObject("$inc", new BasicDBObject("x", 1)); | |
for (int i = 0; i < num; i++) { | |
coll.update(obj, upsert, true, false, writeConcern); | |
} | |
} catch (RuntimeException e) { | |
Logger.getAnonymousLogger().log(Level.INFO, "Exception", e); | |
} finally { | |
if (reserveConnection) | |
coll.getDB().requestDone(); | |
} | |
synchronized (this) { | |
done = true; | |
notify(); | |
} | |
} | |
public synchronized void awaitCompletion() throws InterruptedException { | |
while (!done) { | |
wait(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment