Skip to content

Instantly share code, notes, and snippets.

@ksauzz
Created March 21, 2013 02:48
Show Gist options
  • Select an option

  • Save ksauzz/5210331 to your computer and use it in GitHub Desktop.

Select an option

Save ksauzz/5210331 to your computer and use it in GitHub Desktop.
riak java client sample
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.cap.DefaultRetrier;
import com.basho.riak.client.raw.pbc.PBClientConfig;
import com.basho.riak.client.raw.pbc.PBClusterConfig;
public class ClusterConfSample {
private static final int DATA_COUNT = 4 * 10000;
private static final int THREAD_POOL_SIZE = 50;
private static final int MAX_JOB_QUEUE_CAPACITY = 50;
private static final int TOTAL_MAX_CONNECTIONS = 0; // unlimited
private static final int MAX_CONNECTION_SIZE = 50;
private static final int INIT_CONNECTION_SIZE = 50;
private static final int BUFFER_KB = 1;
private static final int IDLE_CONN_TIMEOUT_MIL = 2000;
private static final int CONNECTION_TIMEOUT_MIL = 2000;
private static final int REQUEST_TIMEOUT_MIL = 2000;
public static void main(String[] args) throws Exception {
// PBClientConfig node1 = PBClientConfig.defaults();
PBClientConfig node1 = new PBClientConfig.Builder()
.withHost("127.0.0.1")
.withPort(8087)
.withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL)
.withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL)
.withSocketBufferSizeKb(BUFFER_KB)
.withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL)
.withInitialPoolSize(INIT_CONNECTION_SIZE)
.withPoolSize(MAX_CONNECTION_SIZE)
.build();
PBClientConfig node2 = PBClientConfig.Builder.from(node1)
.withHost("127.0.0.1")
.withPort(8087)
.build();
PBClusterConfig clusterConf = new PBClusterConfig(TOTAL_MAX_CONNECTIONS);
clusterConf.addClient(node1);
clusterConf.addClient(node2);
IRiakClient client = RiakFactory.newClient(clusterConf);
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(THREAD_POOL_SIZE);
long startTime = System.currentTimeMillis();
Bucket bucket = client.createBucket("demo_bucket").execute();
for (int i = 0; i < DATA_COUNT; i++) {
waitForQueueSizeLessThan(executor, MAX_JOB_QUEUE_CAPACITY);
executor.execute(newStoringTask(bucket, i));
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
long duration = System.currentTimeMillis() - startTime;
client.shutdown();
log("ops per sec: " + DATA_COUNT / (duration / 1000.0));
}
private static void waitForQueueSizeLessThan(ThreadPoolExecutor executor, int size)
throws InterruptedException {
while (executor.getQueue().size() > size ) { Thread.sleep(10L); }
}
private static Runnable newStoringTask(final Bucket bucket, final int counter) {
return new Runnable() {
@Override
public void run() {
try {
if (counter % 1000 == 0) { log("storing key #" + counter); };
String key = "demo_key_" + counter;
String value = "{'name':'bob','age':18}";
bucket.store(key, value)
.returnBody(false)
.withRetrier(DefaultRetrier.attempts(0))
.execute();
} catch (Exception e) {
log(e.toString());
}
}
};
}
private static void log(String log) {
System.out.println(log);
}
}
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.builders.RiakObjectBuilder;
import com.basho.riak.client.raw.RawClient;
import com.basho.riak.client.raw.config.ClusterConfig;
import com.basho.riak.client.raw.pbc.PBClientConfig;
import com.basho.riak.client.raw.pbc.PBClusterClientFactory;
import com.basho.riak.client.raw.pbc.PBClusterConfig;
public class ClusterConfSampleWithRawClient {
private static final int DATA_COUNT = 4 * 10000;
private static final int THREAD_POOL_SIZE = 50;
private static final int MAX_JOB_QUEUE_CAPACITY = 50;
private static final int TOTAL_MAX_CONNECTIONS = 0; // unlimited.
private static final int MAX_CONNECTION_SIZE = 50;
private static final int INIT_CONNECTION_SIZE = 50;
private static final int BUFFER_KB = 1;
private static final int IDLE_CONN_TIMEOUT_MIL = 2000;
private static final int CONNECTION_TIMEOUT_MIL = 2000;
private static final int REQUEST_TIMEOUT_MIL = 2000;
public static void main(String[] args) throws Exception {
// PBClientConfig node1 = PBClientConfig.defaults();
PBClientConfig node1 = new PBClientConfig.Builder()
.withHost("127.0.0.1")
.withPort(8087)
.withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL)
.withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL)
.withSocketBufferSizeKb(BUFFER_KB)
.withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL)
.withInitialPoolSize(INIT_CONNECTION_SIZE)
.withPoolSize(MAX_CONNECTION_SIZE)
.build();
PBClientConfig node2 = PBClientConfig.Builder.from(node1)
.withHost("127.0.0.1")
.withPort(8087)
.build();
ClusterConfig<PBClientConfig> clusterConf = new PBClusterConfig(TOTAL_MAX_CONNECTIONS)
.addClient(node1)
.addClient(node2);
RawClient client = PBClusterClientFactory.getInstance().newClient(clusterConf);
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(THREAD_POOL_SIZE);
long startTime = System.currentTimeMillis();
for (int i = 0; i < DATA_COUNT; i++) {
waitForQueueSizeLessThan(executor, MAX_JOB_QUEUE_CAPACITY);
executor.execute(newStoringTask(client, i));
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
long duration = System.currentTimeMillis() - startTime;
client.shutdown();
log("ops per sec: " + DATA_COUNT / (duration / 1000.0));
}
private static void waitForQueueSizeLessThan(ThreadPoolExecutor executor, int size)
throws InterruptedException {
while (executor.getQueue().size() > size ) { Thread.sleep(10L); }
}
private static Runnable newStoringTask(final RawClient client, final int counter) {
return new Runnable() {
@Override
public void run() {
try {
if (counter % 1000 == 0) { log("storing key #" + counter); };
String key = "demo_key_" + counter;
String value = "{'name':'bob','age':18}";
IRiakObject object = RiakObjectBuilder.newBuilder("demo_bucket", key)
.withContentType("application/json")
.withValue(value)
.build();
client.store(object);
} catch (Exception e) {
log(e.toString());
}
}
};
}
private static void log(String log) {
System.out.println(log);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment