Created
March 21, 2013 02:48
-
-
Save ksauzz/5210331 to your computer and use it in GitHub Desktop.
Riak Java client sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.ThreadPoolExecutor; | |
| import java.util.concurrent.TimeUnit; | |
| import com.basho.riak.client.IRiakClient; | |
| import com.basho.riak.client.RiakFactory; | |
| import com.basho.riak.client.bucket.Bucket; | |
| import com.basho.riak.client.cap.DefaultRetrier; | |
| import com.basho.riak.client.raw.pbc.PBClientConfig; | |
| import com.basho.riak.client.raw.pbc.PBClusterConfig; | |
| public class ClusterConfSample { | |
| private static final int DATA_COUNT = 4 * 10000; | |
| private static final int THREAD_POOL_SIZE = 50; | |
| private static final int MAX_JOB_QUEUE_CAPACITY = 50; | |
| private static final int TOTAL_MAX_CONNECTIONS = 0; // unlimited | |
| private static final int MAX_CONNECTION_SIZE = 50; | |
| private static final int INIT_CONNECTION_SIZE = 50; | |
| private static final int BUFFER_KB = 1; | |
| private static final int IDLE_CONN_TIMEOUT_MIL = 2000; | |
| private static final int CONNECTION_TIMEOUT_MIL = 2000; | |
| private static final int REQUEST_TIMEOUT_MIL = 2000; | |
| public static void main(String[] args) throws Exception { | |
| // PBClientConfig node1 = PBClientConfig.defaults(); | |
| PBClientConfig node1 = new PBClientConfig.Builder() | |
| .withHost("127.0.0.1") | |
| .withPort(8087) | |
| .withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL) | |
| .withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL) | |
| .withSocketBufferSizeKb(BUFFER_KB) | |
| .withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL) | |
| .withInitialPoolSize(INIT_CONNECTION_SIZE) | |
| .withPoolSize(MAX_CONNECTION_SIZE) | |
| .build(); | |
| PBClientConfig node2 = PBClientConfig.Builder.from(node1) | |
| .withHost("127.0.0.1") | |
| .withPort(8087) | |
| .build(); | |
| PBClusterConfig clusterConf = new PBClusterConfig(TOTAL_MAX_CONNECTIONS); | |
| clusterConf.addClient(node1); | |
| clusterConf.addClient(node2); | |
| IRiakClient client = RiakFactory.newClient(clusterConf); | |
| ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(THREAD_POOL_SIZE); | |
| long startTime = System.currentTimeMillis(); | |
| Bucket bucket = client.createBucket("demo_bucket").execute(); | |
| for (int i = 0; i < DATA_COUNT; i++) { | |
| waitForQueueSizeLessThan(executor, MAX_JOB_QUEUE_CAPACITY); | |
| executor.execute(newStoringTask(bucket, i)); | |
| } | |
| executor.shutdown(); | |
| executor.awaitTermination(5, TimeUnit.SECONDS); | |
| long duration = System.currentTimeMillis() - startTime; | |
| client.shutdown(); | |
| log("ops per sec: " + DATA_COUNT / (duration / 1000.0)); | |
| } | |
| private static void waitForQueueSizeLessThan(ThreadPoolExecutor executor, int size) | |
| throws InterruptedException { | |
| while (executor.getQueue().size() > size ) { Thread.sleep(10L); } | |
| } | |
| private static Runnable newStoringTask(final Bucket bucket, final int counter) { | |
| return new Runnable() { | |
| @Override | |
| public void run() { | |
| try { | |
| if (counter % 1000 == 0) { log("storing key #" + counter); }; | |
| String key = "demo_key_" + counter; | |
| String value = "{'name':'bob','age':18}"; | |
| bucket.store(key, value) | |
| .returnBody(false) | |
| .withRetrier(DefaultRetrier.attempts(0)) | |
| .execute(); | |
| } catch (Exception e) { | |
| log(e.toString()); | |
| } | |
| } | |
| }; | |
| } | |
| private static void log(String log) { | |
| System.out.println(log); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.ThreadPoolExecutor; | |
| import java.util.concurrent.TimeUnit; | |
| import com.basho.riak.client.IRiakObject; | |
| import com.basho.riak.client.builders.RiakObjectBuilder; | |
| import com.basho.riak.client.raw.RawClient; | |
| import com.basho.riak.client.raw.config.ClusterConfig; | |
| import com.basho.riak.client.raw.pbc.PBClientConfig; | |
| import com.basho.riak.client.raw.pbc.PBClusterClientFactory; | |
| import com.basho.riak.client.raw.pbc.PBClusterConfig; | |
| public class ClusterConfSampleWithRawClient { | |
| private static final int DATA_COUNT = 4 * 10000; | |
| private static final int THREAD_POOL_SIZE = 50; | |
| private static final int MAX_JOB_QUEUE_CAPACITY = 50; | |
| private static final int TOTAL_MAX_CONNECTIONS = 0; // unlimited. | |
| private static final int MAX_CONNECTION_SIZE = 50; | |
| private static final int INIT_CONNECTION_SIZE = 50; | |
| private static final int BUFFER_KB = 1; | |
| private static final int IDLE_CONN_TIMEOUT_MIL = 2000; | |
| private static final int CONNECTION_TIMEOUT_MIL = 2000; | |
| private static final int REQUEST_TIMEOUT_MIL = 2000; | |
| public static void main(String[] args) throws Exception { | |
| // PBClientConfig node1 = PBClientConfig.defaults(); | |
| PBClientConfig node1 = new PBClientConfig.Builder() | |
| .withHost("127.0.0.1") | |
| .withPort(8087) | |
| .withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL) | |
| .withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL) | |
| .withSocketBufferSizeKb(BUFFER_KB) | |
| .withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL) | |
| .withInitialPoolSize(INIT_CONNECTION_SIZE) | |
| .withPoolSize(MAX_CONNECTION_SIZE) | |
| .build(); | |
| PBClientConfig node2 = PBClientConfig.Builder.from(node1) | |
| .withHost("127.0.0.1") | |
| .withPort(8087) | |
| .build(); | |
| ClusterConfig<PBClientConfig> clusterConf = new PBClusterConfig(TOTAL_MAX_CONNECTIONS) | |
| .addClient(node1) | |
| .addClient(node2); | |
| RawClient client = PBClusterClientFactory.getInstance().newClient(clusterConf); | |
| ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(THREAD_POOL_SIZE); | |
| long startTime = System.currentTimeMillis(); | |
| for (int i = 0; i < DATA_COUNT; i++) { | |
| waitForQueueSizeLessThan(executor, MAX_JOB_QUEUE_CAPACITY); | |
| executor.execute(newStoringTask(client, i)); | |
| } | |
| executor.shutdown(); | |
| executor.awaitTermination(5, TimeUnit.SECONDS); | |
| long duration = System.currentTimeMillis() - startTime; | |
| client.shutdown(); | |
| log("ops per sec: " + DATA_COUNT / (duration / 1000.0)); | |
| } | |
| private static void waitForQueueSizeLessThan(ThreadPoolExecutor executor, int size) | |
| throws InterruptedException { | |
| while (executor.getQueue().size() > size ) { Thread.sleep(10L); } | |
| } | |
| private static Runnable newStoringTask(final RawClient client, final int counter) { | |
| return new Runnable() { | |
| @Override | |
| public void run() { | |
| try { | |
| if (counter % 1000 == 0) { log("storing key #" + counter); }; | |
| String key = "demo_key_" + counter; | |
| String value = "{'name':'bob','age':18}"; | |
| IRiakObject object = RiakObjectBuilder.newBuilder("demo_bucket", key) | |
| .withContentType("application/json") | |
| .withValue(value) | |
| .build(); | |
| client.store(object); | |
| } catch (Exception e) { | |
| log(e.toString()); | |
| } | |
| } | |
| }; | |
| } | |
| private static void log(String log) { | |
| System.out.println(log); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment