Skip to content

Instantly share code, notes, and snippets.

@adutra
Last active May 6, 2019 13:30
Show Gist options
  • Save adutra/389fdcd69e00ea83322fca30d04e7dd3 to your computer and use it in GitHub Desktop.
Save adutra/389fdcd69e00ea83322fca30d04e7dd3 to your computer and use it in GitHub Desktop.
Examples of reactive style programming with OSS DataStax Java driver 3.x and Reactor
package com.datastax.test;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * Examples of reactive-style consumption of a paged {@link ResultSet} using the DataStax Java
 * driver 3.x asynchronous API bridged to Reactor.
 *
 * <p>Running {@link #main(String[])} requires a Cassandra node reachable at {@code 127.0.0.1}. The
 * program creates keyspace {@code test} and table {@code t1}, fills it with 10000 rows, and then
 * counts the rows three times, once per retrieval strategy, printing each count to standard out.
 */
public class SimpleReactiveClient {

  /**
   * Number of rows still available in the current page at which {@link
   * #retrieveDataRowByRow(Session)} asks the driver for the next page in the background. The
   * comparison is an exact match, so this only triggers when the page size is greater than this
   * value; otherwise no pre-fetch is requested and the flow may block at page boundaries.
   */
  private static final int PREFETCH_THRESHOLD = 100;

  /**
   * Connects to the local node, loads the sample data and runs the three retrieval examples in
   * sequence. The cluster and session are closed when the examples finish or fail.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect()) {
      SimpleReactiveClient client = new SimpleReactiveClient();
      client.prepareData(session);
      client.retrieveAllDataAtOnce(session);
      client.retrieveDataPageByPage(session);
      client.retrieveDataRowByRow(session);
    }
  }

  /**
   * Simplest method: turns the whole result set into a {@link Flux} by iterating it, counts the
   * rows and prints the count. Iterating the result set blocks between page boundaries.
   *
   * @param session the session used to run the query
   */
  private void retrieveAllDataAtOnce(Session session) {
    Mono.fromCompletionStage(asCompletionStage(session.executeAsync("SELECT * FROM t1")))
        // The future above is completed on a driver I/O thread and iterating the result set
        // blocks whenever the next page must be fetched; the driver rejects blocking calls made
        // on its I/O threads, so continue on a different thread (same approach as
        // retrieveDataRowByRow).
        .publishOn(Schedulers.single())
        .flatMapMany(Flux::fromIterable)
        .count()
        .doOnNext(System.out::println)
        .block();
  }

  /**
   * More elaborate method: fetches pages asynchronously through {@link
   * #fetchNextPage(ListenableFuture)}, counts the rows and prints the count.
   *
   * @param session the session used to run the query
   */
  private void retrieveDataPageByPage(Session session) {
    fetchNextPage(session.executeAsync("SELECT * FROM t1"))
        .count()
        .doOnNext(System.out::println)
        .block();
  }

  /**
   * Builds a {@link Flux} that emits the rows of the page delivered by {@code future}, followed by
   * the rows of every subsequent page.
   *
   * <p>The request for the following page ({@code rs.fetchMoreResults()}) is issued as soon as the
   * current page has been received, because it is evaluated while the flux for the current page
   * is being assembled.
   *
   * @param future a future that completes with the result set once a page is available
   * @return a flux of the rows of that page and of all remaining pages
   */
  private Flux<Row> fetchNextPage(ListenableFuture<ResultSet> future) {
    return Mono.fromCompletionStage(asCompletionStage(future))
        .flatMapMany(
            rs -> {
              Flux<Row> rows = Flux.fromIterable(rs);
              if (rs.isFullyFetched()) {
                // this was the last page
                return rows;
              }
              // only emit the current page's rows (so that iteration never runs past the rows
              // already in memory), then concat this flux with another flux containing the
              // next page
              return rows.take(rs.getAvailableWithoutFetching())
                  .concatWith(fetchNextPage(rs.fetchMoreResults()));
            });
  }

  /**
   * Different method using {@code Flux.generate}: pulls rows one at a time, counts them and prints
   * the count. It may block between page boundaries but mitigates blocking calls by pre-fetching
   * the next page when the current one is almost exhausted.
   *
   * @param session the session used to run the query
   */
  private void retrieveDataRowByRow(Session session) {
    Mono.fromCompletionStage(asCompletionStage(session.executeAsync("SELECT * FROM t1")))
        // publish on a different thread since the rest of the flow has blocking calls
        // and the future above is completed on a driver I/O thread.
        .publishOn(Schedulers.single())
        .flatMapMany(
            rs ->
                Flux.generate(
                    sink -> {
                      try {
                        // isExhausted() may block if a row is not readily available
                        if (rs.isExhausted()) {
                          sink.complete();
                        } else {
                          Row row = Objects.requireNonNull(rs.one());
                          sink.next(row);
                          // if the current page is almost exhausted, preemptively fetch the
                          // next one (note: this assumes that the page size is greater than
                          // PREFETCH_THRESHOLD)
                          if (rs.getAvailableWithoutFetching() == PREFETCH_THRESHOLD
                              && !rs.isFullyFetched()) {
                            rs.fetchMoreResults();
                          }
                        }
                      } catch (Exception e) {
                        // surface any failure through the reactive error channel
                        sink.error(e);
                      }
                    }))
        .count()
        .doOnNext(System.out::println)
        .block();
  }

  /**
   * Creates keyspace {@code test} if needed, switches the session to it, recreates table {@code
   * t1} from scratch and inserts 10000 rows with {@code pk == v}, one synchronous statement at a
   * time.
   *
   * @param session the session used to run the statements
   */
  private void prepareData(Session session) {
    session.execute(
        "CREATE KEYSPACE IF NOT EXISTS test WITH "
            + "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '1' }"
            + " AND DURABLE_WRITES = true;");
    session.execute("USE test");
    session.execute("DROP TABLE IF EXISTS t1");
    session.execute("CREATE TABLE t1 (pk int PRIMARY KEY, v int)");
    for (int i = 0; i < 10000; i++) {
      session.execute("INSERT INTO t1 (pk, v) VALUES (?, ?)", i, i);
    }
  }

  /**
   * Utility method to convert a Guava {@link ListenableFuture} into a Java 8 {@link
   * CompletionStage}.
   *
   * <p>The returned stage completes with the same value or failure as {@code listenableFuture}.
   * Cancelling the returned future also cancels {@code listenableFuture}, and the value returned
   * by {@code cancel} is the outcome of cancelling {@code listenableFuture}.
   *
   * @param listenableFuture the future to adapt
   * @param <T> the type of the future's result
   * @return a completion stage mirroring {@code listenableFuture}
   */
  private static <T> CompletionStage<T> asCompletionStage(ListenableFuture<T> listenableFuture) {
    CompletableFuture<T> future =
        new CompletableFuture<T>() {
          @Override
          public boolean cancel(boolean mayInterruptIfRunning) {
            // propagate cancellation to the source future first, then mark this one cancelled
            boolean result = listenableFuture.cancel(mayInterruptIfRunning);
            super.cancel(mayInterruptIfRunning);
            return result;
          }
        };
    Futures.addCallback(
        listenableFuture,
        new FutureCallback<T>() {
          @Override
          public void onSuccess(T result) {
            future.complete(result);
          }

          @Override
          public void onFailure(Throwable t) {
            future.completeExceptionally(t);
          }
        },
        // Note: this will complete the future on a driver I/O thread; if that future is coupled to
        // downstream stages that perform blocking calls, this will result in an error as the
        // driver will detect the blocking call and reject it. Consider using a different executor
        // here, or use asynchronous stages only (e.g. use thenApplyAsync instead of thenApply).
        MoreExecutors.directExecutor());
    return future;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment