Last active
May 6, 2019 13:30
-
-
Save adutra/389fdcd69e00ea83322fca30d04e7dd3 to your computer and use it in GitHub Desktop.
Examples of reactive style programming with OSS DataStax Java driver 3.x and Reactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.datastax.test; | |
import com.datastax.driver.core.Cluster; | |
import com.datastax.driver.core.ResultSet; | |
import com.datastax.driver.core.Row; | |
import com.datastax.driver.core.Session; | |
import com.google.common.util.concurrent.FutureCallback; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import java.util.Objects; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CompletionStage; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.scheduler.Schedulers; | |
public class SimpleReactiveClient { | |
public static void main(String[] args) { | |
try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); | |
Session session = cluster.connect()) { | |
SimpleReactiveClient client = new SimpleReactiveClient(); | |
client.prepareData(session); | |
client.retrieveAllDataAtOnce(session); | |
client.retrieveDataPageByPage(session); | |
client.retrieveDataRowByRow(session); | |
} | |
} | |
private void retrieveAllDataAtOnce(Session session) { | |
// Simplest method: will block between page boundaries | |
Mono.fromCompletionStage(asCompletionStage(session.executeAsync("SELECT * FROM t1"))) | |
.flatMapMany(Flux::fromIterable) | |
.count() | |
.doOnNext(System.out::println) | |
.block(); | |
} | |
private void retrieveDataPageByPage(Session session) { | |
// More elaborate method: will fetch pages asynchronously | |
fetchNextPage(session.executeAsync("SELECT * FROM t1")) | |
.count() | |
.doOnNext(System.out::println) | |
.block(); | |
} | |
private Flux<Row> fetchNextPage(ListenableFuture<ResultSet> future) { | |
return Mono.fromCompletionStage(asCompletionStage(future)) | |
.flatMapMany( | |
rs -> { | |
Flux<Row> rows = Flux.fromIterable(rs); | |
if (rs.isFullyFetched()) { | |
// this was the last page | |
return rows; | |
} | |
// only emit the current page's rows, then concat this flux with | |
// another flux containing the next page | |
return rows.take(rs.getAvailableWithoutFetching()) | |
.concatWith(fetchNextPage(rs.fetchMoreResults())); | |
}); | |
} | |
private void retrieveDataRowByRow(Session session) { | |
// Different method using Flux.generate: may block between page boundaries but has the ability | |
// to mitigate blocking calls by pre-fetching pages | |
Mono.fromCompletionStage(asCompletionStage(session.executeAsync("SELECT * FROM t1"))) | |
// publish on a different thread since the rest of the flow has blocking calls | |
// and the future above is completed on a driver I/O thread. | |
.publishOn(Schedulers.single()) | |
.flatMapMany( | |
rs -> | |
Flux.generate( | |
sink -> { | |
try { | |
// isExhausted() may block if a row is not readily available | |
if (rs.isExhausted()) { | |
sink.complete(); | |
} else { | |
Row row = Objects.requireNonNull(rs.one()); | |
sink.next(row); | |
// if the current page is almost exhausted, preemptively fetch the next | |
// one (note: this assumes that the page size is greater than 100) | |
if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched()) { | |
rs.fetchMoreResults(); | |
} | |
} | |
} catch (Exception e) { | |
sink.error(e); | |
} | |
})) | |
.count() | |
.doOnNext(System.out::println) | |
.block(); | |
} | |
private void prepareData(Session session) { | |
session.execute( | |
"CREATE KEYSPACE IF NOT EXISTS test WITH " | |
+ "REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor': '1' }" | |
+ " AND DURABLE_WRITES = true;"); | |
session.execute("USE test"); | |
session.execute("DROP TABLE IF EXISTS t1"); | |
session.execute("CREATE TABLE t1 (pk int PRIMARY KEY, v int)"); | |
for (int i = 0; i < 10000; i++) { | |
session.execute("INSERT INTO t1 (pk, v) VALUES (?, ?)", i, i); | |
} | |
} | |
private static <T> CompletionStage<T> asCompletionStage(ListenableFuture<T> listenableFuture) { | |
// utility method to convert a Guava ListenableFuture into a Java 8 CompletionStage. | |
CompletableFuture<T> future = | |
new CompletableFuture<T>() { | |
@Override | |
public boolean cancel(boolean mayInterruptIfRunning) { | |
boolean result = listenableFuture.cancel(mayInterruptIfRunning); | |
super.cancel(mayInterruptIfRunning); | |
return result; | |
} | |
}; | |
Futures.addCallback( | |
listenableFuture, | |
new FutureCallback<T>() { | |
@Override | |
public void onSuccess(T result) { | |
future.complete(result); | |
} | |
@Override | |
public void onFailure(Throwable t) { | |
future.completeExceptionally(t); | |
} | |
}, | |
// Note: this will complete the future on a driver I/O thread; if that future is coupled to | |
// downstream stages that perform blocking calls, this will result in an error as the | |
// driver will detect the blocking call and reject it. Consider using a different executor | |
// here, or use asynchronous stages only (e.g. use thenApplyAsync instead of thenApply). | |
MoreExecutors.directExecutor()); | |
return future; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment