Skip to content

Instantly share code, notes, and snippets.

@rsds143
Last active March 3, 2017 17:52
Show Gist options
  • Save rsds143/4b62b8e5625a805583c1ce39b1260ff4 to your computer and use it in GitHub Desktop.
Save rsds143/4b62b8e5625a805583c1ce39b1260ff4 to your computer and use it in GitHub Desktop.
with thread pool
/**
* I HAVE NOT BEEN FULLY TESTED DO NOT USE ME IN PRODUCTION CODE THERE ARE NO WARRANTIES
*/
public class BulkLoader {
private final int threads;
private final String[] contactHosts;
public BulkLoader(int threads, String...contactHosts){
this.threads = threads;
this.contactHosts = contactHosts;
}
//callback class
public static class IngestCallback implements FutureCallback<ResultSet>{
@Override
public void onSuccess(ResultSet result) {
//placeholder: put any logging or on success logic here.
}
@Override
public void onFailure(Throwable t) {
//go ahead and wrap in a runtime exception for this case, but you can do logging or start counting errors.
throw new RuntimeException(t);
}
}
public void ingest(Iterator<Object[]> boundItemsIterator, String insertCQL) throws InterruptedException {
Cluster cluster = Cluster.builder()
.addContactPoints(contactHosts)
.build();
Session session = cluster.newSession();
//fixed thread pool that closes on app exit
ExecutorService executor = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor)Executors.newFixedThreadPool(threads));
final PreparedStatement statement = session.prepare(insertCQL);
while (boundItemsIterator.hasNext()) {
BoundStatement boundStatement = statement.bind(boundItemsIterator.next());
ResultSetFuture future = session.executeAsync(boundStatement);
Futures.addCallback(future, new IngestCallback(), executor);
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) { //dosomething}
session.close();
cluster.close();
}
public static void main(String[] args) throws InterruptedException {
Iterator<Object[]> rows = new Iterator<Object[]>() {
int i = 0;
Random random = new Random();
@Override
public boolean hasNext() {
return i!=1000000;
}
@Override
public Object[] next() {
i++;
return new Object[]{i, String.valueOf(random.nextLong())};
}
};
System.out.println("Starting benchmark");
Stopwatch watch = Stopwatch.createStarted();
new BulkLoader(8, "127.0.0.1").ingest(rows,
"INSERT INTO my_test.rows (id, value) VALUES (?,?)");
System.out.println("total time seconds = " + watch.elapsed(TimeUnit.SECONDS));
watch.stop();
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment