Skip to content

Instantly share code, notes, and snippets.

@mgp
Created June 3, 2015 15:33
Show Gist options
  • Save mgp/76ed5c16e53c30500189 to your computer and use it in GitHub Desktop.
Save mgp/76ed5c16e53c30500189 to your computer and use it in GitHub Desktop.
ObservableUtilsTest.java
package org.khanacademy.core.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.khanacademy.core.util.test_util.Observables.collectAll;
import static org.khanacademy.core.util.test_util.Observables.collectFirst;
import org.khanacademy.core.base.BaseTestCase;
import org.khanacademy.core.exceptions.TestException;
import org.khanacademy.core.util.ObservableUtilsTest.NestedObservableEventAccumulator.Events;
import org.khanacademy.core.util.test_util.Observables;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.Timeout;
import rx.Notification;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import java.util.List;
public class ObservableUtilsTest extends BaseTestCase {
@ClassRule
public static final Timeout globalTimeout = Timeout.millis(500);
@Test
public void testObserveTrueValues() throws Exception {
Observable<Boolean> observable = Observable.from(
ImmutableList.of(false, true, false, true, false));
Observable<Boolean> filteredObservable = ObservableUtils.observeTrueValues(observable);
List<Boolean> actualValues = collectAll(filteredObservable);
List<Boolean> expectedValues = ImmutableList.of(true, true);
assertEquals(expectedValues, actualValues);
}
@Test
public void testObserveFalseValues() throws Exception {
Observable<Boolean> observable = Observable.from(
ImmutableList.of(true, false, true, false, true));
Observable<Boolean> filteredObservable = ObservableUtils.observeFalseValues(observable);
List<Boolean> actualValues = collectAll(filteredObservable);
List<Boolean> expectedValues = ImmutableList.of(false, false);
assertEquals(expectedValues, actualValues);
}
@Test
public void testMakeObservableSendsValueAndCompletes() throws Exception {
final int number = 100;
final Observable<Integer> observable = ObservableUtils.makeObservable(() -> number);
final List<Integer> result = collectAll(observable);
final List<Integer> expectedResult = ImmutableList.of(number);
assertEquals(expectedResult, result);
}
@Test
public void testThrownExceptionsBecomeErrorsInMakeObservable() throws Exception {
final Observable<Integer> observable = ObservableUtils.makeObservable(() -> {
throw new TestException();
});
final Notification<Integer> result = collectFirst(observable.materialize());
assertTrue(result.isOnError());
//noinspection ThrowableResultOfMethodCallIgnored
assertEquals(result.getThrowable().getClass(), TestException.class);
}
@Test
public void testMakeObservableWithActionCompletesWithNoValues() throws Exception {
final boolean[] invoked = {false};
final Observable<Object> observable = ObservableUtils.makeObservable(() -> invoked[0] = true);
assertFalse(invoked[0]);
observable.subscribe();
assertTrue(invoked[0]);
}
@Test
public void testHoldingItems() throws Exception {
BehaviorSubject<Integer> sourceStream = BehaviorSubject.create();
BehaviorSubject<Boolean> predicateStream = BehaviorSubject.create();
final List<Integer> emitted = Lists.newArrayList();
ObservableUtils.holdUnless(sourceStream.asObservable(), predicateStream.asObservable())
.subscribe(emitted::add);
// We get an item...
sourceStream.onNext(1);
// But the toggle is off, so the item is held...
assertTrue(emitted.isEmpty());
// Release the valve! The held item should be emitted now...
predicateStream.onNext(true);
assertEquals(1, emitted.size());
assertEquals(1, emitted.get(0).intValue());
// It should keep streaming since things are still open.
sourceStream.onNext(2);
sourceStream.onNext(3);
sourceStream.onNext(4);
assertEquals(4, emitted.size());
// Close the valve...
predicateStream.onNext(false);
// ...but re-opening should re-publish the last item from the source.
predicateStream.onNext(true);
assertEquals(5, emitted.size());
sourceStream.onCompleted();
predicateStream.onCompleted();
}
@Test
public void testCacheObservableWithCapacity1() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 1);
TestObserver<Integer> observer;
// Subscribe for the first time, without prior events
observer = new TestObserver<>();
cachedObservable.subscribe(observer);
observer.assertReceivedOnNext(ImmutableList.<Integer>of());
subject.onNext(1);
observer.assertReceivedOnNext(ImmutableList.of(1));
subject.onNext(2);
observer.assertReceivedOnNext(ImmutableList.of(1, 2));
// Resubscribe
observer = new TestObserver<>();
cachedObservable.subscribe(observer);
// Should receive the last value because capacity is 1.
observer.assertReceivedOnNext(ImmutableList.of(2));
}
@Test
public void testShouldNotSubscribeToSourceObservableUntilFirstSubscription() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 2);
final TestObserver<Integer> observer = new TestObserver<>();
subject.onNext(1);
// Subscribe
cachedObservable.subscribe(observer);
// Should not have subscribed to source observable before.
observer.assertReceivedOnNext(ImmutableList.<Integer>of());
}
@Test
public void testCacheObservableWithCapacity2() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Observable<Integer> cachedObservable = ObservableUtils.cache(subject, 2);
final TestObserver<Integer> observer = new TestObserver<>();
cachedObservable.subscribe();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
// Subscribe
cachedObservable.subscribe(observer);
// Should receive the last 2 values because capacity is 2.
observer.assertReceivedOnNext(ImmutableList.of(2, 3));
}
@Test
public void testCastObservable() throws Exception {
final List<Number> expectedValues = ImmutableList.of(1, 2, 3);
final Observable<Number> observable = Observable.from(expectedValues);
final Observable<Integer> downcast = ObservableUtils.downcast(observable, Integer.class);
assertEquals(
expectedValues,
Observables.collectAll(downcast)
);
}
@Test
public void testObservePresentOptionalValues() throws Exception {
final PublishSubject<Optional<String>> subject = PublishSubject.create();
final TestObserver<String> testObserver = new TestObserver<>();
ObservableUtils.observePresentOptionalValues(subject).subscribe(testObserver);
final String value1 = "value1";
final String value2 = "value2";
// Assert that the first present value is observed.
subject.onNext(Optional.of(value1));
testObserver.assertReceivedOnNext(ImmutableList.of(value1));
// Assert that the absent value is not observed.
subject.onNext(Optional.absent());
testObserver.assertReceivedOnNext(ImmutableList.of(value1));
// Assert that the second present value is observed.
subject.onNext(Optional.of(value2));
testObserver.assertReceivedOnNext(ImmutableList.of(value1, value2));
subject.onCompleted();
}
@Test
public void testPerformOperationForwardsOnCompleted() throws Exception {
final Observable<Void> observable = Observable.create(
subscriber -> subscriber.onCompleted()
);
final TestObserver<Object> testObserver = new TestObserver<>();
ObservableUtils.performOperation(
observable,
testObserver::onCompleted,
testObserver::onError
);
// Assert that observed an onComplete event.
final List<Notification<Object>> onCompletedEvents = testObserver.getOnCompletedEvents();
assertEquals(1, onCompletedEvents.size());
// Assert that no onNext or onError events were observed.
assertEquals(ImmutableList.of(), testObserver.getOnNextEvents());
assertEquals(ImmutableList.of(), testObserver.getOnErrorEvents());
}
@Test
public void testPerformOperationForwardsOnError() throws Exception {
final Throwable throwable = new Throwable();
final Observable<Void> observable = Observable.create(
subscriber -> subscriber.onError(throwable)
);
final TestObserver<Object> testObserver = new TestObserver<>();
ObservableUtils.performOperation(
observable,
testObserver::onCompleted,
testObserver::onError
);
// Assert that observed an onError event.
final List<Throwable> onErrorEvents = testObserver.getOnErrorEvents();
assertEquals(ImmutableList.of(throwable), onErrorEvents);
// Assert that no onNext or onCompleted events were observed.
assertEquals(ImmutableList.of(), testObserver.getOnNextEvents());
assertEquals(ImmutableList.of(), testObserver.getOnCompletedEvents());
}
private static final class NestedTestSubscriber<T> extends TestSubscriber<Observable<T>> {
private final List<List<Object>> values = Lists.newArrayList();
@Override
public void onNext(final Observable<T> observable) {
final TestObserver<T> testObserver = new TestObserver<>();
final TestSubscriber<T> testSubscriber = new TestSubscriber<T>(testObserver) {
@Override
public void onNext(final T t) {
super.onNext(t);
}
};
observable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
values.add(testObserver.getEvents());
}
}
static final class NestedObservableEventAccumulator<T> {
private final List<Events<T>> allEvents;
public static final class Events<T> {
public final List<T> values;
public final List<Throwable> throwables;
public final List<Notification<T>> completedNotifications;
public Events(final List<T> values,
final List<Throwable> throwables,
final List<Notification<T>> completedNotifications) {
this.values = ImmutableList.copyOf(values);
this.throwables = ImmutableList.copyOf(throwables);
this.completedNotifications = ImmutableList.copyOf(completedNotifications);
}
public static <T> Events<T> from(TestObserver<T> testObserver) {
final List<T> values = testObserver.getOnNextEvents();
final List<Throwable> throwables = testObserver.getOnErrorEvents();
final List<Notification<T>> completedNotifications =
testObserver.getOnCompletedEvents();
return new Events<T>(values, throwables, completedNotifications);
}
public static <T> Events<T> error(Throwable throwable, T... values) {
return new Events<>(
ImmutableList.copyOf(values),
ImmutableList.of(throwable),
ImmutableList.of()
);
}
public static <T> Events<T> completed(T... values) {
return new Events<>(
ImmutableList.copyOf(values),
ImmutableList.of(),
ImmutableList.of(Notification.createOnCompleted())
);
}
public static final class Builder<T> {
private final List<T> values;
private final List<Throwable> throwables;
private final List<Notification<T>> completedNotifications;
public Builder() {
values = Lists.newArrayList();
throwables = Lists.newArrayList();
completedNotifications = Lists.newArrayList();
}
public Builder<T> next(T value) {
values.add(value);
return this;
}
public Builder<T> error(Throwable throwable) {
throwables.add(throwable);
return this;
}
public Builder<T> completed() {
completedNotifications.add(Notification.createOnCompleted());
return this;
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Events<T> that = (Events<T>) o;
return (values.equals(that.values) &&
throwables.equals(that.throwables) &&
completedNotifications.equals(that.completedNotifications));
}
@Override
public int hashCode() {
return Objects.hashCode(values, throwables, completedNotifications);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("values", values)
.add("throwables", throwables)
.add("completedNotifications", completedNotifications)
.toString();
}
}
private NestedObservableEventAccumulator() {
allEvents = Lists.newArrayList();
}
static <T> NestedObservableEventAccumulator subscribe(
final Observable<Observable<T>> nestedObservable) {
final NestedObservableEventAccumulator accumulator =
new NestedObservableEventAccumulator();
nestedObservable.subscribe(new Action1<Observable<T>>() {
@Override
public void call(final Observable<T> observable) {
final TestObserver<T> testObserver = new TestObserver<T>() {
@Override
public void onNext(final T t) {
super.onNext(t);
}
@Override
public void onError(final Throwable e) {
super.onError(e);
accumulator.allEvents.add(Events.from(this));
}
@Override
public void onCompleted() {
super.onCompleted();
accumulator.allEvents.add(Events.from(this));
}
};
observable.subscribe(testObserver);
}
});
return accumulator;
}
}
@Test
public void testPartitionEmptyObservable() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> alwaysFalse = integer -> false;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), alwaysFalse);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(1, allEvents.size());
assertEquals(Events.completed(), allEvents.get(0));
}
@Test
public void testPartitionObservableWithNoTerminatingElement() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> alwaysFalse = integer -> false;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), alwaysFalse);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(1, allEvents.size());
assertEquals(Events.completed(1, 2, 3), allEvents.get(0));
}
@Test
public void testPartitionObservableWithOnlyTerminatingElements() throws Exception {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), isEven);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onNext(2);
subject.onNext(4);
subject.onNext(6);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(4, allEvents.size());
assertEquals(Events.completed(2), allEvents.get(0));
assertEquals(Events.completed(4), allEvents.get(1));
assertEquals(Events.completed(6), allEvents.get(2));
assertEquals(Events.completed(), allEvents.get(3));
}
@Test
public void testPartitionObservableStartingWithTerminatingElement() {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), isEven);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onNext(4);
subject.onNext(5);
subject.onNext(7);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(2, allEvents.size());
assertEquals(Events.completed(4), allEvents.get(0));
assertEquals(Events.completed(5, 7), allEvents.get(1));
}
@Test
public void testPartitionObservableNotStartingWithTerminatingElement() {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), isEven);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onNext(3);
subject.onNext(4);
subject.onNext(5);
subject.onNext(7);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(2, allEvents.size());
assertEquals(Events.completed(3, 4), allEvents.get(0));
assertEquals(Events.completed(5, 7), allEvents.get(1));
}
@Test
public void testPartitionObservableEndingWithTerminatingElement() {
final PublishSubject<Integer> subject = PublishSubject.create();
final Func1<Integer, Boolean> isEven = integer -> (integer % 2) == 0;
final Observable<Observable<Integer>> partitioningObservable =
ObservableUtils.partition(subject.asObservable(), isEven);
final NestedObservableEventAccumulator accumulator =
NestedObservableEventAccumulator.subscribe(partitioningObservable);
subject.onNext(5);
subject.onNext(6);
subject.onCompleted();
final List<Events<Integer>> allEvents = accumulator.allEvents;
assertEquals(2, allEvents.size());
assertEquals(Events.completed(5, 6), allEvents.get(0));
assertEquals(Events.completed(), allEvents.get(1));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment