Skip to content

Instantly share code, notes, and snippets.

@jbrisbin
Last active December 23, 2015 18:39
Show Gist options
  • Save jbrisbin/6677409 to your computer and use it in GitHub Desktop.
Save jbrisbin/6677409 to your computer and use it in GitHub Desktop.
Example of aggregating data using Streams.
import reactor.core.Environment;
import reactor.core.composable.Deferred;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Streams;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.tuple.Tuple2;
/**
* @author Jon Brisbin
*/
public class AggregateOrders {
public static void main(String... args) {
Environment env = new Environment();
Deferred<NewOrder, Stream<NewOrder>> d = Streams.<NewOrder>defer()
.env(env)
.dispatcher("ringBuffer")
.get();
d.compose()
.reduce(new Function<Tuple2<NewOrder, AggregateOrder>, AggregateOrder>() {
@Override
public AggregateOrder apply(Tuple2<NewOrder, AggregateOrder> tup) {
NewOrder newOrder = tup.getT1();
AggregateOrder aggOrder = tup.getT2();
if (validate(newOrder)) {
aggOrder.add(newOrder);
}
return aggOrder;
}
private boolean validate(NewOrder order) {
if (order instanceof NewOrderSingle) {
return true;
} else {
return false;
}
}
}, new AggregateOrder())
.consume(new Consumer<AggregateOrder>() {
@Override
public void accept(AggregateOrder aggOrder) {
if (aggOrder.isComplete()) {
// consume AggregateOrder
}
}
});
// publish events
d.accept(new NewOrderSingle());
d.accept(new NewOrderList());
}
static class AggregateOrder {
public boolean isComplete() { return false; }
public AggregateOrder add(NewOrder order) { return this; }
}
static class NewOrder {}
static class NewOrderSingle extends NewOrder {}
static class NewOrderList extends NewOrder {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment