Created
November 21, 2015 15:46
-
-
Save gjesse/8d1d06921733cdf87a6e to your computer and use it in GitHub Desktop.
Example of using `distinctUntilChanged` and `window` to segment an ordered Observable into discrete buckets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.loshodges; | |
import com.google.common.collect.Lists; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.observers.TestSubscriber; | |
import rx.schedulers.Schedulers; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.concurrent.TimeUnit; | |
import static org.assertj.core.api.Assertions.assertThat; | |
public class TestPredicateWindow { | |
private static final long PERCENT_KEEP = 75L; | |
private static final int MAX_ITEMS = 100000; | |
@Test | |
public void testPredicateWindow() throws InterruptedException { | |
/** | |
* With an ordered input observable, we want to segment the stream into | |
* discrete buckets based on a value of the objects themselves (time in real life) | |
* we know that the stream is ordered, but we cannot guarantee to have a datapoint | |
* for every possible value in the stream, thus we cannot use the {@link Observable#window(int)} | |
* methods. The time based window methods will not work either, as we may not be processing these | |
* in real time. | |
*/ | |
Observable<Long> source = Observable.range(0, MAX_ITEMS, Schedulers.computation()) | |
.map(Long::valueOf) | |
.filter(this::random); | |
Observable<Long> shared = source.share(); | |
CountDownLatch done = new CountDownLatch(1); | |
List<TestSubscriber<List<Long>>> subs = Lists.newCopyOnWriteArrayList(); | |
Observable<Long> boundary = shared.distinctUntilChanged(i -> i / 10); | |
shared.window(boundary).subscribe(window -> { | |
TestSubscriber<List<Long>> sub = new TestSubscriber<>(); | |
window.toList().subscribe(sub); | |
subs.add(sub); | |
}, System.out::println, done::countDown); | |
done.await(); | |
subs.forEach(sub -> { | |
sub.awaitTerminalEvent(); | |
sub.assertValueCount(1); | |
assertThat(sub.getOnNextEvents().get(0).size()).isLessThanOrEqualTo(10); | |
System.out.println(sub.getOnNextEvents()); | |
}); | |
System.out.println(subs.size() + " windows from " + MAX_ITEMS); | |
} | |
private Boolean random(Long aLong) { | |
return ThreadLocalRandom.current().nextLong(100) < PERCENT_KEEP; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment