Created
June 3, 2015 15:33
-
-
Save mgp/76ed5c16e53c30500189 to your computer and use it in GitHub Desktop.
ObservableUtilsTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.khanacademy.core.util; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertFalse; | |
import static org.junit.Assert.assertTrue; | |
import static org.khanacademy.core.util.test_util.Observables.collectAll; | |
import static org.khanacademy.core.util.test_util.Observables.collectFirst; | |
import org.khanacademy.core.base.BaseTestCase; | |
import org.khanacademy.core.exceptions.TestException; | |
import org.khanacademy.core.util.ObservableUtilsTest.NestedObservableEventAccumulator.Events; | |
import org.khanacademy.core.util.test_util.Observables; | |
import com.google.common.base.MoreObjects; | |
import com.google.common.base.Objects; | |
import com.google.common.base.Optional; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.Lists; | |
import org.junit.ClassRule; | |
import org.junit.Test; | |
import org.junit.rules.Timeout; | |
import rx.Notification; | |
import rx.Observable; | |
import rx.functions.Action1; | |
import rx.functions.Func1; | |
import rx.observers.TestObserver; | |
import rx.observers.TestSubscriber; | |
import rx.subjects.BehaviorSubject; | |
import rx.subjects.PublishSubject; | |
import java.util.List; | |
public class ObservableUtilsTest extends BaseTestCase { | |
@ClassRule | |
public static final Timeout globalTimeout = Timeout.millis(500); | |
@Test | |
public void testObserveTrueValues() throws Exception { | |
Observable<Boolean> observable = Observable.from( | |
ImmutableList.of(false, true, false, true, false)); | |
Observable<Boolean> filteredObservable = ObservableUtils.observeTrueValues(observable); | |
List<Boolean> actualValues = collectAll(filteredObservable); | |
List<Boolean> expectedValues = ImmutableList.of(true, true); | |
assertEquals(expectedValues, actualValues); | |
} | |
@Test | |
public void testObserveFalseValues() throws Exception { | |
Observable<Boolean> observable = Observable.from( | |
ImmutableList.of(true, false, true, false, true)); | |
Observable<Boolean> filteredObservable = ObservableUtils.observeFalseValues(observable); | |
List<Boolean> actualValues = collectAll(filteredObservable); | |
List<Boolean> expectedValues = ImmutableList.of(false, false); | |
assertEquals(expectedValues, actualValues); | |
} | |
@Test | |
public void testMakeObservableSendsValueAndCompletes() throws Exception { | |
final int number = 100; | |
final Observable<Integer> observable = ObservableUtils.makeObservable(() -> number); | |
final List<Integer> result = collectAll(observable); | |
final List<Integer> expectedResult = ImmutableList.of(number); | |
assertEquals(expectedResult, result); | |
} | |
@Test | |
public void testThrownExceptionsBecomeErrorsInMakeObservable() throws Exception { | |
final Observable<Integer> observable = ObservableUtils.makeObservable(() -> { | |
throw new TestException(); | |
}); | |
final Notification<Integer> result = collectFirst(observable.materialize()); | |
assertTrue(result.isOnError()); | |
//noinspection ThrowableResultOfMethodCallIgnored | |
assertEquals(result.getThrowable().getClass(), TestException.class); | |
} | |
@Test | |
public void testMakeObservableWithActionCompletesWithNoValues() throws Exception { | |
final boolean[] invoked = {false}; | |
final Observable<Object> observable = ObservableUtils.makeObservable(() -> invoked[0] = true); | |
assertFalse(invoked[0]); | |
observable.subscribe(); | |
assertTrue(invoked[0]); | |
} | |
@Test | |
public void testHoldingItems() throws Exception { | |
BehaviorSubject<Integer> sourceStream = BehaviorSubject.create(); | |
BehaviorSubject<Boolean> predicateStream = BehaviorSubject.create(); | |
final List<Integer> emitted = Lists.newArrayList(); | |
ObservableUtils.holdUnless(sourceStream.asObservable(), predicateStream.asObservable()) | |
.subscribe(emitted::add); | |
// We get an item... | |
sourceStream.onNext(1); | |
// But the toggle is off, so the item is held... | |
assertTrue(emitted.isEmpty()); | |
// Release the valve! The held item should be emitted now... | |
predicateStream.onNext(true); | |
assertEquals(1, emitted.size()); | |
assertEquals(1, emitted.get(0).intValue()); | |
// It should keep streaming since things are still open. | |
sourceStream.onNext(2); | |
sourceStream.onNext(3); | |
sourceStream.onNext(4); | |
assertEquals(4, emitted.size()); | |
// Close the valve... | |
predicateStream.onNext(false); | |
// ...but re-opening should re-publish the last item from the source. | |
predicateStream.onNext(true); | |
assertEquals(5, emitted.size()); | |
sourceStream.onCompleted(); | |
predicateStream.onCompleted(); | |
} | |
@Test | |
public void testCacheObservableWithCapacity1() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 1); | |
TestObserver<Integer> observer; | |
// Subscribe for the first time, without prior events | |
observer = new TestObserver<>(); | |
cachedObservable.subscribe(observer); | |
observer.assertReceivedOnNext(ImmutableList.<Integer>of()); | |
subject.onNext(1); | |
observer.assertReceivedOnNext(ImmutableList.of(1)); | |
subject.onNext(2); | |
observer.assertReceivedOnNext(ImmutableList.of(1, 2)); | |
// Resubscribe | |
observer = new TestObserver<>(); | |
cachedObservable.subscribe(observer); | |
// Should receive the last value because capacity is 1. | |
observer.assertReceivedOnNext(ImmutableList.of(2)); | |
} | |
@Test | |
public void testShouldNotSubscribeToSourceObservableUntilFirstSubscription() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 2); | |
final TestObserver<Integer> observer = new TestObserver<>(); | |
subject.onNext(1); | |
// Subscribe | |
cachedObservable.subscribe(observer); | |
// Should not have subscribed to source observable before. | |
observer.assertReceivedOnNext(ImmutableList.<Integer>of()); | |
} | |
@Test | |
public void testCacheObservableWithCapacity2() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 2); | |
final TestObserver<Integer> observer = new TestObserver<>(); | |
cachedObservable.subscribe(); | |
subject.onNext(1); | |
subject.onNext(2); | |
subject.onNext(3); | |
// Subscribe | |
cachedObservable.subscribe(observer); | |
// Should receive the last 2 values because capacity is 2. | |
observer.assertReceivedOnNext(ImmutableList.of(2, 3)); | |
} | |
@Test | |
public void testCastObservable() throws Exception { | |
final List<Number> expectedValues = ImmutableList.of(1, 2, 3); | |
final Observable<Number> observable = Observable.from(expectedValues); | |
final Observable<Integer> downcast = ObservableUtils.downcast(observable, Integer.class); | |
assertEquals( | |
expectedValues, | |
Observables.collectAll(downcast) | |
); | |
} | |
@Test | |
public void testObservePresentOptionalValues() throws Exception { | |
final PublishSubject<Optional<String>> subject = PublishSubject.create(); | |
final TestObserver<String> testObserver = new TestObserver<>(); | |
ObservableUtils.observePresentOptionalValues(subject).subscribe(testObserver); | |
final String value1 = "value1"; | |
final String value2 = "value2"; | |
// Assert that the first present value is observed. | |
subject.onNext(Optional.of(value1)); | |
testObserver.assertReceivedOnNext(ImmutableList.of(value1)); | |
// Assert that the absent value is not observed. | |
subject.onNext(Optional.absent()); | |
testObserver.assertReceivedOnNext(ImmutableList.of(value1)); | |
// Assert that the second present value is observed. | |
subject.onNext(Optional.of(value2)); | |
testObserver.assertReceivedOnNext(ImmutableList.of(value1, value2)); | |
subject.onCompleted(); | |
} | |
@Test | |
public void testPerformOperationForwardsOnCompleted() throws Exception { | |
final Observable<Void> observable = Observable.create( | |
subscriber -> subscriber.onCompleted() | |
); | |
final TestObserver<Object> testObserver = new TestObserver<>(); | |
ObservableUtils.performOperation( | |
observable, | |
testObserver::onCompleted, | |
testObserver::onError | |
); | |
// Assert that observed an onComplete event. | |
final List<Notification<Object>> onCompletedEvents = testObserver.getOnCompletedEvents(); | |
assertEquals(1, onCompletedEvents.size()); | |
// Assert that no onNext or onError events were observed. | |
assertEquals(ImmutableList.of(), testObserver.getOnNextEvents()); | |
assertEquals(ImmutableList.of(), testObserver.getOnErrorEvents()); | |
} | |
@Test | |
public void testPerformOperationForwardsOnError() throws Exception { | |
final Throwable throwable = new Throwable(); | |
final Observable<Void> observable = Observable.create( | |
subscriber -> subscriber.onError(throwable) | |
); | |
final TestObserver<Object> testObserver = new TestObserver<>(); | |
ObservableUtils.performOperation( | |
observable, | |
testObserver::onCompleted, | |
testObserver::onError | |
); | |
// Assert that observed an onError event. | |
final List<Throwable> onErrorEvents = testObserver.getOnErrorEvents(); | |
assertEquals(ImmutableList.of(throwable), onErrorEvents); | |
// Assert that no onNext or onCompleted events were observed. | |
assertEquals(ImmutableList.of(), testObserver.getOnNextEvents()); | |
assertEquals(ImmutableList.of(), testObserver.getOnCompletedEvents()); | |
} | |
private static final class NestedTestSubscriber<T> extends TestSubscriber<Observable<T>> { | |
private final List<List<Object>> values = Lists.newArrayList(); | |
@Override | |
public void onNext(final Observable<T> observable) { | |
final TestObserver<T> testObserver = new TestObserver<>(); | |
final TestSubscriber<T> testSubscriber = new TestSubscriber<T>(testObserver) { | |
@Override | |
public void onNext(final T t) { | |
super.onNext(t); | |
} | |
}; | |
observable.subscribe(testSubscriber); | |
testSubscriber.awaitTerminalEvent(); | |
values.add(testObserver.getEvents()); | |
} | |
} | |
static final class NestedObservableEventAccumulator<T> { | |
private final List<Events<T>> allEvents; | |
public static final class Events<T> { | |
public final List<T> values; | |
public final List<Throwable> throwables; | |
public final List<Notification<T>> completedNotifications; | |
public Events(final List<T> values, | |
final List<Throwable> throwables, | |
final List<Notification<T>> completedNotifications) { | |
this.values = ImmutableList.copyOf(values); | |
this.throwables = ImmutableList.copyOf(throwables); | |
this.completedNotifications = ImmutableList.copyOf(completedNotifications); | |
} | |
public static <T> Events<T> from(TestObserver<T> testObserver) { | |
final List<T> values = testObserver.getOnNextEvents(); | |
final List<Throwable> throwables = testObserver.getOnErrorEvents(); | |
final List<Notification<T>> completedNotifications = | |
testObserver.getOnCompletedEvents(); | |
return new Events<T>(values, throwables, completedNotifications); | |
} | |
public static <T> Events<T> error(Throwable throwable, T... values) { | |
return new Events<>( | |
ImmutableList.copyOf(values), | |
ImmutableList.of(throwable), | |
ImmutableList.of() | |
); | |
} | |
public static <T> Events<T> completed(T... values) { | |
return new Events<>( | |
ImmutableList.copyOf(values), | |
ImmutableList.of(), | |
ImmutableList.of(Notification.createOnCompleted()) | |
); | |
} | |
public static final class Builder<T> { | |
private final List<T> values; | |
private final List<Throwable> throwables; | |
private final List<Notification<T>> completedNotifications; | |
public Builder() { | |
values = Lists.newArrayList(); | |
throwables = Lists.newArrayList(); | |
completedNotifications = Lists.newArrayList(); | |
} | |
public Builder<T> next(T value) { | |
values.add(value); | |
return this; | |
} | |
public Builder<T> error(Throwable throwable) { | |
throwables.add(throwable); | |
return this; | |
} | |
public Builder<T> completed() { | |
completedNotifications.add(Notification.createOnCompleted()); | |
return this; | |
} | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
Events<T> that = (Events<T>) o; | |
return (values.equals(that.values) && | |
throwables.equals(that.throwables) && | |
completedNotifications.equals(that.completedNotifications)); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hashCode(values, throwables, completedNotifications); | |
} | |
@Override | |
public String toString() { | |
return MoreObjects.toStringHelper(this) | |
.add("values", values) | |
.add("throwables", throwables) | |
.add("completedNotifications", completedNotifications) | |
.toString(); | |
} | |
} | |
private NestedObservableEventAccumulator() { | |
allEvents = Lists.newArrayList(); | |
} | |
static <T> NestedObservableEventAccumulator subscribe( | |
final Observable<Observable<T>> nestedObservable) { | |
final NestedObservableEventAccumulator accumulator = | |
new NestedObservableEventAccumulator(); | |
nestedObservable.subscribe(new Action1<Observable<T>>() { | |
@Override | |
public void call(final Observable<T> observable) { | |
final TestObserver<T> testObserver = new TestObserver<T>() { | |
@Override | |
public void onNext(final T t) { | |
super.onNext(t); | |
} | |
@Override | |
public void onError(final Throwable e) { | |
super.onError(e); | |
accumulator.allEvents.add(Events.from(this)); | |
} | |
@Override | |
public void onCompleted() { | |
super.onCompleted(); | |
accumulator.allEvents.add(Events.from(this)); | |
} | |
}; | |
observable.subscribe(testObserver); | |
} | |
}); | |
return accumulator; | |
} | |
} | |
@Test | |
public void testPartitionEmptyObservable() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> alwaysFalse = integer -> false; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), alwaysFalse); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(1, allEvents.size()); | |
assertEquals(Events.completed(), allEvents.get(0)); | |
} | |
@Test | |
public void testPartitionObservableWithNoTerminatingElement() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> alwaysFalse = integer -> false; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), alwaysFalse); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onNext(1); | |
subject.onNext(2); | |
subject.onNext(3); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(1, allEvents.size()); | |
assertEquals(Events.completed(1, 2, 3), allEvents.get(0)); | |
} | |
@Test | |
public void testPartitionObservableWithOnlyTerminatingElements() throws Exception { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), isEven); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onNext(2); | |
subject.onNext(4); | |
subject.onNext(6); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(4, allEvents.size()); | |
assertEquals(Events.completed(2), allEvents.get(0)); | |
assertEquals(Events.completed(4), allEvents.get(1)); | |
assertEquals(Events.completed(6), allEvents.get(2)); | |
assertEquals(Events.completed(), allEvents.get(3)); | |
} | |
@Test | |
public void testPartitionObservableStartingWithTerminatingElement() { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), isEven); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onNext(4); | |
subject.onNext(5); | |
subject.onNext(7); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(2, allEvents.size()); | |
assertEquals(Events.completed(4), allEvents.get(0)); | |
assertEquals(Events.completed(5, 7), allEvents.get(1)); | |
} | |
@Test | |
public void testPartitionObservableNotStartingWithTerminatingElement() { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), isEven); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onNext(3); | |
subject.onNext(4); | |
subject.onNext(5); | |
subject.onNext(7); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(2, allEvents.size()); | |
assertEquals(Events.completed(3, 4), allEvents.get(0)); | |
assertEquals(Events.completed(5, 7), allEvents.get(1)); | |
} | |
@Test | |
public void testPartitionObservableEndingWithTerminatingElement() { | |
final PublishSubject<Integer> subject = PublishSubject.create(); | |
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0; | |
final Observable<Observable<Integer>> partitioningObservable = | |
ObservableUtils.partition(subject.asObservable(), isEven); | |
final NestedObservableEventAccumulator accumulator = | |
NestedObservableEventAccumulator.subscribe(partitioningObservable); | |
subject.onNext(5); | |
subject.onNext(6); | |
subject.onCompleted(); | |
final List<Events<Integer>> allEvents = accumulator.allEvents; | |
assertEquals(2, allEvents.size()); | |
assertEquals(Events.completed(5, 6), allEvents.get(0)); | |
assertEquals(Events.completed(), allEvents.get(1)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment