Last active
August 29, 2015 14:04
-
-
Save mpenick/eb59939b5020e081fb78 to your computer and use it in GitHub Desktop.
Prepared Inserts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Copyright (c) 2014 DataStax | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
*/ | |
#include <assert.h> | |
#include <string.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <uv.h> | |
#include "cassandra.h" | |
#define NUM_THREADS 1 | |
#define NUM_CONCURRENT_REQUESTS 10000 | |
void print_error(CassFuture* future) { | |
CassString message = cass_future_error_message(future); | |
fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data); | |
} | |
CassCluster* create_cluster() { | |
CassCluster* cluster = cass_cluster_new(); | |
cass_cluster_set_contact_points(cluster, "127.0.0.1"); | |
cass_cluster_set_credentials(cluster, "cassandra", "cassandra"); | |
cass_cluster_set_log_level(cluster, CASS_LOG_WARN); | |
cass_cluster_set_queue_size_io(cluster, 8*16384); | |
cass_cluster_set_num_threads_io(cluster, 2); | |
cass_cluster_set_max_pending_requests(cluster, 100000); | |
cass_cluster_set_core_connections_per_host(cluster, 4); | |
cass_cluster_set_max_connections_per_host(cluster, 10); | |
return cluster; | |
} | |
CassError connect_session(CassCluster* cluster, CassSession** output) { | |
CassError rc = 0; | |
CassFuture* future = cass_cluster_connect_keyspace(cluster, "examples"); | |
*output = NULL; | |
cass_future_wait(future); | |
rc = cass_future_error_code(future); | |
if(rc != CASS_OK) { | |
print_error(future); | |
} else { | |
*output = cass_future_get_session(future); | |
} | |
cass_future_free(future); | |
return rc; | |
} | |
CassError prepare_query(CassSession* session, CassString query, const CassPrepared** prepared) { | |
CassError rc = 0; | |
CassFuture* future = NULL; | |
future = cass_session_prepare(session, query); | |
cass_future_wait(future); | |
rc = cass_future_error_code(future); | |
if(rc != CASS_OK) { | |
print_error(future); | |
} else { | |
*prepared = cass_future_get_prepared(future); | |
} | |
cass_future_free(future); | |
return rc; | |
} | |
void insert_into_perf(CassSession* session, CassString query, const CassPrepared* prepared) { | |
size_t i; | |
uint64_t start, elapsed; | |
CassFuture* futures[NUM_CONCURRENT_REQUESTS]; | |
static double total = 0.0; | |
static long count = 0; | |
CassCollection* collection = cass_collection_new(CASS_COLLECTION_TYPE_SET, 2); | |
cass_collection_append_string(collection, cass_string_init("jazz")); | |
cass_collection_append_string(collection, cass_string_init("2013")); | |
start = uv_hrtime(); | |
for(i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) { | |
CassUuid id; | |
CassStatement* statement; | |
if (prepared != NULL) { | |
statement = cass_prepared_bind(prepared); | |
} else { | |
statement = cass_statement_new(query, 5); | |
} | |
cass_uuid_generate_time(id); | |
cass_statement_bind_uuid(statement, 0, id); | |
cass_statement_bind_string(statement, 1, cass_string_init("La Petite Tonkinoise")); | |
cass_statement_bind_string(statement, 2, cass_string_init("Bye Bye Blackbird")); | |
cass_statement_bind_string(statement, 3, cass_string_init("Joséphine Baker")); | |
cass_statement_bind_collection(statement, 4, collection); | |
futures[i] = cass_session_execute(session, statement); | |
cass_statement_free(statement); | |
} | |
for(i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) { | |
CassFuture* future = futures[i]; | |
CassError rc = cass_future_error_code(future); | |
if(rc != CASS_OK) { | |
print_error(future); | |
} | |
cass_future_free(future); | |
} | |
elapsed = uv_hrtime() - start; | |
total += (double)NUM_CONCURRENT_REQUESTS / ((double)elapsed / 1000000000.0); | |
count++; | |
printf("average %lf inserts/sec\n", total / count); | |
} | |
void run_insert_queries(void* data) { | |
int i; | |
CassSession* session = (CassSession*)data; | |
const CassPrepared* insert_prepared = NULL; | |
CassString insert_query = cass_string_init("INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);"); | |
#define USE_PREPARED | |
#ifdef USE_PREPARED | |
if (prepare_query(session, insert_query, &insert_prepared) == CASS_OK) { | |
for (i = 0; i < 100; ++i) { | |
insert_into_perf(session, insert_query, insert_prepared); | |
} | |
cass_prepared_free(insert_prepared); | |
} | |
#else | |
insert_into_perf(session, insert_query, insert_prepared); | |
#endif | |
} | |
int main() { | |
int i; | |
uv_thread_t threads[NUM_THREADS]; | |
CassError rc = 0; | |
CassCluster* cluster = create_cluster(); | |
CassSession* session = NULL; | |
CassFuture* close_future = NULL; | |
rc = connect_session(cluster, &session); | |
if(rc != CASS_OK) { | |
return -1; | |
} | |
for (i = 0; i < NUM_THREADS; ++i) { | |
uv_thread_create(&threads[i], run_insert_queries, (void*)session); | |
} | |
for (i = 0; i < NUM_THREADS; ++i) { | |
uv_thread_join(&threads[i]); | |
} | |
close_future = cass_session_close(session); | |
cass_future_wait(close_future); | |
cass_future_free(close_future); | |
cass_cluster_free(cluster); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mikep.perf; | |
import static com.codahale.metrics.MetricRegistry.name; | |
import java.util.ArrayList; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Set; | |
import java.util.concurrent.ExecutionException; | |
import com.codahale.metrics.MetricRegistry; | |
import com.codahale.metrics.Timer; | |
import com.datastax.driver.core.BoundStatement; | |
import com.datastax.driver.core.Cluster; | |
import com.datastax.driver.core.PreparedStatement; | |
import com.datastax.driver.core.ResultSetFuture; | |
import com.datastax.driver.core.Session; | |
import com.datastax.driver.core.utils.UUIDs; | |
import com.google.common.util.concurrent.Futures; | |
/** | |
* Hello world! | |
* | |
*/ | |
public class App | |
{ | |
final static MetricRegistry metrics = new MetricRegistry(); | |
final static Timer queries = metrics.timer(name(App.class, "queries")); | |
public static void main( String[] args ) throws ExecutionException, InterruptedException | |
{ | |
Cluster cluster = Cluster.builder() | |
.addContactPoint("127.0.0.1") | |
.withCredentials("cassandra", "cassandra") | |
.build(); | |
Session session = cluster.connect("examples"); | |
PreparedStatement prepared = session.prepare("INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);"); | |
List<ResultSetFuture> futures = new ArrayList<ResultSetFuture>(10000); | |
Set<String> collection = new HashSet<String>(); | |
collection.add("jazz"); | |
collection.add("2013"); | |
double total = 0.0; | |
long count = 0; | |
for (int j = 0; j < 1000; ++j) { | |
long start = System.currentTimeMillis(); | |
try { | |
for (int i = 0; i < 10000; ++i) { | |
BoundStatement statement = prepared.bind(); | |
statement.setUUID(0, UUIDs.timeBased()); | |
statement.setString(1, "La Petite Tonkinoise"); | |
statement.setString(2, "Bye Bye Blackbird"); | |
statement.setString(3, "Joséphine Baker"); | |
statement.setSet(4, collection); | |
futures.add(session.executeAsync(statement)); | |
} | |
Futures.allAsList(futures).get(); | |
} catch(Exception e) { | |
System.out.printf("Error: %s\n", e.getMessage()); | |
} | |
long elapsed = System.currentTimeMillis() - start; | |
total += 10000.0 / (elapsed / 1000.0); | |
count++; | |
System.out.printf("average %f inserts/sec\n", total / count); | |
//System.out.printf("%f inserts/sec\n", 10000.0 / (elapsed / 1000.0)); | |
futures.clear(); | |
} | |
session.close(); | |
cluster.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment