Skip to content

Instantly share code, notes, and snippets.

@rsds143
Created April 29, 2016 21:12
Show Gist options
  • Save rsds143/26271f351bdd679553d55368171407be to your computer and use it in GitHub Desktop.
Save rsds143/26271f351bdd679553d55368171407be to your computer and use it in GitHub Desktop.
/**
* I HAVE NOT BEEN FULLY TESTED DO NOT USE ME IN PRODUCTION CODE THERE ARE NO WARRANTIES
* This is with using the approach in the previous blog post
*/
public class BulkLoader {
private final int threads;
private final String[] contactHosts;
public BulkLoader(int threads, String...contactHosts){
this.threads = threads;
this.contactHosts = contactHosts;
}
public void ingest(Iterator<Object[]> boundItemsIterator, String insertCQL) throws InterruptedException {
Cluster cluster = Cluster.builder()
.addContactPoints(contactHosts)
.build();
Session session = cluster.newSession();
//there are better ways to do this using executor pools
List<ResultSetFuture> futures = new ArrayList<>();
final PreparedStatement statement = session.prepare(insertCQL);
int count = 0;
while (boundItemsIterator.hasNext()) {
BoundStatement boundStatement = statement.bind(boundItemsIterator.next());
ResultSetFuture future = session.executeAsync(boundStatement);
futures.add(future);
count++;
if(count % threads==0){
futures.forEach(ResultSetFuture::getUninterruptibly);
futures = new ArrayList<>();
}
}
session.close();
cluster.close();
}
public static void main(String[] args) throws InterruptedException {
Iterator<Object[]> rows = new Iterator<Object[]>() {
int i = 0;
Random random = new Random();
@Override
public boolean hasNext() {
return i!=100000;
}
@Override
public Object[] next() {
i++;
return new Object[]{i, String.valueOf(random.nextLong())};
}
};
System.out.println("Starting benchmark");
Stopwatch watch = Stopwatch.createStarted();
new BulkLoader(8, "127.0.0.1").ingest(rows,
"INSERT INTO my_test.rows (id, value) VALUES (?,?)");
System.out.println("total time seconds = " + watch.elapsed(TimeUnit.SECONDS));
watch.stop();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment