-
-
Save alexwen/3a6f446ca4bd557d3a09 to your computer and use it in GitHub Desktop.
example of using distinctUntilChanged and window to segment an ordered observable into discrete buckets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.loshodges; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.observers.TestSubscriber; | |
import rx.schedulers.Schedulers; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ThreadLocalRandom; | |
import static org.hamcrest.Matchers.hasSize; | |
import static org.hamcrest.Matchers.lessThanOrEqualTo; | |
import static org.junit.Assert.assertThat; | |
public class TestPredicateWindow { | |
private static final long PERCENT_KEEP = 75L; | |
private static final int MAX_ITEMS = 100000; | |
@Test | |
public void testPredicateWindow() throws InterruptedException { | |
/** | |
* With an ordered input observable, we want to segment the stream into | |
* discrete buckets based on a value of the objects themselves (time in real life) | |
* we know that the stream is ordered, but we cannot guarantee to have a datapoint | |
* for every possible value in the stream, thus we cannot use the {@link Observable#window(int)} | |
* methods. The time based window methods will not work either, as we may not be processing these | |
* in real time. | |
*/ | |
final Observable<Long> source = Observable.range(0, MAX_ITEMS, Schedulers.computation()) | |
.map(Long::valueOf) | |
.filter(this::random); | |
// publish is nice short-hand for the 'shared' pattern with window | |
final Observable<List<Long>> ranges = source | |
.publish(s -> { | |
// skip the first as it is empty | |
final Observable<Long> boundary = s.distinctUntilChanged(i -> i / 10).skip(1); | |
return s.window(boundary).flatMap(Observable::toList); | |
}); | |
final CountDownLatch done = new CountDownLatch(1); | |
final TestSubscriber<List<Long>> sub = new TestSubscriber<>(); | |
ranges.doOnNext(System.out::println).doOnCompleted(done::countDown).subscribe(sub); | |
done.await(); | |
sub.assertNoErrors(); | |
sub.getOnNextEvents().forEach(l -> assertThat(l, hasSize(lessThanOrEqualTo(10)))); | |
System.out.println(sub.getOnNextEvents().size() + " windows from " + MAX_ITEMS); | |
} | |
private Boolean random(Long aLong) { | |
return ThreadLocalRandom.current().nextLong(100) < PERCENT_KEEP; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
So I realized the problem with this (and my other implementation) is that it relies on the next onNext item to close the previous window, introducing a delay that's no bueno for lower-rate observables. I think I should be able to modify the example I had (https://gist.github.com/gjesse/72a478e5b8248861dd2c) to do this without too much trouble, but I don't see a way to do it here unless a delay is introduced, which in turn breaks for continuous observables.