Created
September 20, 2018 10:47
-
-
Save adutra/bf731847e3a2d77c791b3dac8d5da291 to your computer and use it in GitHub Desktop.
Driver examples with Reactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.datastax.driver.dse; | |
import com.datastax.driver.core.Cluster; | |
import com.datastax.driver.core.ResultSet; | |
import com.datastax.driver.core.ResultSetFuture; | |
import com.datastax.driver.core.Row; | |
import com.datastax.driver.core.Session; | |
import com.google.common.util.concurrent.FutureCallback; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import java.util.UUID; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.FluxSink; | |
import reactor.core.publisher.FluxSink.OverflowStrategy; | |
public class FluxExamples { | |
public static void main(String[] args) { | |
try (Cluster cluster = Cluster.builder().addContactPoint("127.0.1.1").build(); | |
Session session = cluster.connect() | |
) { | |
session.execute( | |
"CREATE KEYSPACE IF NOT EXISTS test WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); | |
session.execute("CREATE TABLE IF NOT EXISTS test.users (id uuid PRIMARY KEY, name text)"); | |
session.execute("TRUNCATE test.users"); | |
for (int i = 0; i < 10; i++) { | |
session.execute("INSERT INTO test.users (id, name) VALUES (?, ?)", | |
UUID.randomUUID(), | |
"user" + i); | |
} | |
synchronousExample(session); | |
asynchronousExample(session); | |
} | |
} | |
private static void synchronousExample(Session session) { | |
System.out.println("------------ Synchronous Example -----------------"); | |
ResultSet rs = session.execute("SELECT * FROM test.users"); | |
Long count = Flux.fromIterable(rs) | |
.map(User::new) | |
.doOnNext(System.out::println) | |
.count().block(); | |
System.out.printf("Found %d users total%n", count); | |
System.out.println(); | |
} | |
private static void asynchronousExample(Session session) { | |
System.out.println("------------ Asynchronous Example -----------------"); | |
Long count = Flux.<Row>create(sink -> { | |
ResultSetFuture future = session.executeAsync("SELECT * FROM test.users"); | |
consumeAndFetchNext(sink, future); | |
}, OverflowStrategy.BUFFER) // ATTENTION can OOM if subscriber not fast enough | |
.map(User::new) | |
.doOnNext(System.out::println) | |
.count().block(); | |
System.out.printf("Found %d users total%n", count); | |
System.out.println(); | |
} | |
private static void consumeAndFetchNext(FluxSink<Row> sink, ListenableFuture<ResultSet> future) { | |
Futures.addCallback(future, new FutureCallback<ResultSet>() { | |
@Override | |
public void onSuccess(ResultSet rs) { | |
// How far we can go without triggering the blocking fetch: | |
int remainingInPage = rs.getAvailableWithoutFetching(); | |
for (int i = 0; i < remainingInPage; i++) { | |
Row row = rs.one(); | |
sink.next(row); | |
} | |
boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null; | |
if (wasLastPage) { | |
sink.complete(); | |
} else { | |
ListenableFuture<ResultSet> future = rs.fetchMoreResults(); | |
consumeAndFetchNext(sink, future); | |
} | |
} | |
@Override | |
public void onFailure(Throwable t) { | |
sink.error(t); | |
} | |
}); | |
} | |
public static class User { | |
private final UUID id; | |
private final String name; | |
public User(Row row) { | |
id = row.getUUID("id"); | |
name = row.getString("name"); | |
} | |
UUID getId() { | |
return id; | |
} | |
String getName() { | |
return name; | |
} | |
@Override | |
public String toString() { | |
return "User{" + | |
"id=" + id + | |
", name='" + name + '\'' + | |
'}'; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment