Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:01
Show Gist options
  • Save smaldini/48aed51d931f7f7e3155 to your computer and use it in GitHub Desktop.
Save smaldini/48aed51d931f7f7e3155 to your computer and use it in GitHub Desktop.
//A perfectly valid Reactive Streams Processor with Reactor (to attach to a Stream or connect with Actions/Promises) :
//This Processor is an Identity Producer/Subscriber - every request is propagated upside and every value propagated downside
// identityProcessor will not affect its input value and will eventually broadcast it to its subscribers.
deferred = Streams.defer(env);
deferred
.parallel(env.getDispatcher(dispatcher))
.map(stream -> stream
.map(i -> i)
.reduce((Tuple2<Integer, Integer> tup) -> {
int last = (null != tup.getT2() ? tup.getT2() : 1);
return last + tup.getT1();
})
.consume(i -> latch.countDown())
);
Processor<Integer,Integer> identityProcessor =
Action.<Integer>passthrough(env.getDispatcher("ringBuffer"))
.env(env)
.prefetch(bufferSize)
.buffer()
.map(integer -> integer)
.distinctUntilChanged()
.flatMap(integer -> Promises.success(integer))
.scan(Tuple2<Integer, Integer>::getT1)
.filter(integer -> integer >= 0)
.collect(1)
.last()
.<Integer>split()
.window(100)
.<Integer>split()
.flatMap(integer -> Streams.defer(integer))
.combine();
//combine is creating an Action (implements Processor) where its subscribe delegates to its downstream Publisher
// and where its onNext/onError/onSubscribe/onComplete delegates to its upstream Subscriber
//Producing to this combinated Stream flow from multiple upstream
Streams
.defer(Arrays.asList(1,2,3), env)
.merge(Streams.defer(Arrays.asList(4,5), env))
.subscribe(identityProcessor);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment