Skip to content

Instantly share code, notes, and snippets.

@adutra
Created June 22, 2019 16:12
Show Gist options
  • Save adutra/853609fead6938ff87db2b7902ecc55c to your computer and use it in GitHub Desktop.
Save adutra/853609fead6938ff87db2b7902ecc55c to your computer and use it in GitHub Desktop.
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.shaded.guava.common.base.Strings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
/**
 * JMH benchmark that reads one partition of a Cassandra table through a {@code
 * PagingIterableSpliterator}-backed parallel stream, for several spliterator chunk sizes.
 *
 * <p>The benchmark expects a node reachable at {@code 127.0.1.1:9042}, a datacenter named {@code
 * Cassandra} and an existing keyspace named {@code test}; the table itself is created and
 * populated by {@link PagingIterableState#setUp()}.
 */
public class PagingIterableSpliteratorBenchmark {

  /**
   * Benchmark-scoped state: owns the driver session, populates the table once per trial and
   * exposes the stream under test.
   */
  @State(Scope.Benchmark)
  public static class PagingIterableState {

    /** Total number of rows written into partition {@code k0 = 0}. */
    private static final int TOTAL_ROWS = 20_000;

    /** Number of INSERT statements grouped into each unlogged batch. */
    private static final int ROWS_PER_BATCH = 100;

    /** How many times the nanoTime string is repeated to build each {@code v} value. */
    private static final int TEXT_REPETITIONS = 15;

    /** Chunk size handed to the spliterator builder; JMH runs one trial per listed value. */
    @Param({"32", "128", "256", "512", "1024"})
    public int batchSize;

    private CqlSession session;

    /**
     * Opens the session, creates the {@code test} table if it is missing and writes {@value
     * #TOTAL_ROWS} rows into partition {@code k0 = 0}.
     *
     * @throws IOException declared for signature compatibility; nothing in the body throws it
     */
    @Setup
    public void setUp() throws IOException {
      session =
          CqlSession.builder()
              .addContactPoint(InetSocketAddress.createUnresolved("127.0.1.1", 9042))
              .withLocalDatacenter("Cassandra")
              .withKeyspace("test")
              .build();

      // Schema changes and 100-statement batches get a profile derived from the default one
      // with a 30s request timeout.
      DriverExecutionProfile slowProfile =
          session
              .getContext()
              .getConfig()
              .getDefaultProfile()
              .withString(DefaultDriverOption.REQUEST_TIMEOUT, "30s");

      session.execute(
          SimpleStatement.builder(
                  "CREATE TABLE IF NOT EXISTS test (k0 int, k1 int, v text, PRIMARY KEY(k0, k1))")
              .setExecutionProfile(slowProfile)
              .build());

      PreparedStatement prepared =
          session.prepare("INSERT INTO test (k0, k1, v) VALUES (?, ?, ?)");

      for (int first = 0; first < TOTAL_ROWS; first += ROWS_PER_BATCH) {
        BatchStatementBuilder batch = BatchStatement.builder(DefaultBatchType.UNLOGGED);
        for (int offset = 0; offset < ROWS_PER_BATCH; offset++) {
          int k1 = first + offset;
          String text = String.valueOf(System.nanoTime());
          batch.addStatement(prepared.bind(0, k1, Strings.repeat(text, TEXT_REPETITIONS)));
        }
        session.execute(batch.setExecutionProfile(slowProfile).build());
      }
    }

    /**
     * Closes the session opened by {@link #setUp()}.
     *
     * <p>Without this, every parameterized trial would leave its session (and whatever
     * connections and threads it holds) open until the forked JVM exits.
     */
    @TearDown
    public void tearDown() {
      if (session != null) {
        session.close();
        session = null;
      }
    }

    /**
     * Runs {@code SELECT v FROM test WHERE k0 = 0} and wraps the result set in a parallel stream
     * of the {@code v} column values.
     *
     * @param batchSize chunk size passed to {@code PagingIterableSpliterator.Builder#withChunkSize}
     * @return a parallel stream yielding column 0 of every row as a string
     */
    Stream<String> pagingIterable(int batchSize) {
      ResultSet rs = session.execute("SELECT v FROM test WHERE k0 = 0");
      return StreamSupport.stream(
              PagingIterableSpliterator.builder(rs).withChunkSize(batchSize).build(), true)
          .map(row -> row.getString(0));
    }
  }

  /**
   * Measures the average time to drain the paging stream in parallel while doing a quadratic
   * number of {@link Math#pow(double, double)} calls per row.
   *
   * @param state benchmark state providing the stream and the chunk size
   * @param sink JMH blackhole that consumes both the per-row result and the final row count
   * @throws IOException declared for signature compatibility; nothing in the body throws it
   */
  @Benchmark
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.MILLISECONDS)
  @Measurement(iterations = 3)
  @Warmup(iterations = 1)
  @Fork(1)
  public void benchmarkPagingIterable(PagingIterableState state, Blackhole sink)
      throws IOException {
    try (Stream<String> lines = state.pagingIterable(state.batchSize)) {
      sink.consume(
          lines
              // The stream is already created with parallel = true; restated here so the
              // benchmark's intent is visible at the call site.
              .parallel()
              .mapToLong(
                  line -> {
                    double d = 0;
                    for (int i = 0; i < line.length(); i++) {
                      for (int j = 0; j < line.length(); j++) {
                        d += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
                      }
                    }
                    // Consume the per-row result so the loop above cannot be optimized away.
                    sink.consume(d);
                    return 1L;
                  })
              .sum());
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment