Created
June 3, 2015 15:33
-
-
Save mgp/8a51f83d107534455697 to your computer and use it in GitHub Desktop.
ObservableUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.khanacademy.core.util; | |
import static com.google.common.base.Preconditions.checkNotNull; | |
import com.google.common.base.Optional; | |
import rx.Observable; | |
import rx.Observable.OnSubscribe; | |
import rx.Subscriber; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.functions.Action1; | |
import rx.functions.Func0; | |
import rx.functions.Func1; | |
import rx.subjects.ReplaySubject; | |
import rx.subjects.Subject; | |
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; | |
/** | |
* Additions to {@link rx}. | |
*/ | |
public final class ObservableUtils { | |
private ObservableUtils() { | |
// Static methods only. Do not allow instantiation. | |
} | |
/** | |
* @return an observable that only emits {@code true} values from the given observable | |
*/ | |
public static Observable<Boolean> observeTrueValues(final Observable<Boolean> observable) { | |
checkNotNull(observable); | |
return observable.filter(value -> value); | |
} | |
/** | |
* @return an observable that only emits {@code false} values from the given observable | |
*/ | |
public static Observable<Boolean> observeFalseValues(final Observable<Boolean> observable) { | |
checkNotNull(observable); | |
return observable.filter(value -> !value); | |
} | |
/** | |
* Converts {@code function} into an Observable that emits the value returned or the error thrown. | |
*/ | |
public static <O> Observable<O> makeObservable(final Func0<O> function) { | |
checkNotNull(function); | |
return Observable.defer(() -> Observable.just(function.call())); | |
} | |
/** | |
* Converts {@code function} into an Observable that emits the value returned or the error thrown. | |
*/ | |
public static Observable<Void> makeObservable(final Action0 action) { | |
return makeObservable(() -> { | |
action.call(); | |
return null; | |
}); | |
} | |
/** | |
* Returns an {@link Observable} that partitions the given observable. The returned observable | |
* emits observables, each of which emits values emitted by the observable parameter until a | |
* value satisfies the given predicate. Then that observable completes, and the returned | |
* observable emits a new observable. | |
* | |
* For example, partition([1, 3, 5, 6, 8, 9, 10, 11], isEven) yields | |
* [[1, 3, 5, 6], [8], [9, 10], [11]]. | |
*/ | |
public static <T> Observable<Observable<T>> partition( | |
final Observable<T> observable, final Func1<T, Boolean> isTerminatingElement) { | |
final Observable<T> startingObservable = Observable | |
.<T>just(null) | |
.concatWith(observable.filter(isTerminatingElement)); | |
return startingObservable | |
.map(t -> observable.takeUntil(isTerminatingElement)); | |
} | |
/** | |
* Caches the emissions from the source {@code Observable} and replays them | |
* in order to subsequent {@code Subscriber}. | |
* | |
* It auto-subscribes to the source {@link Observable} only upon the first subscription. | |
* | |
* This method has the semantics that you would expect from {@link Observable#cache(int)}, | |
* but the unlike that operator, this enforces {@code capacity}, it not just a hint. | |
* | |
* More details in this issue: https://github.com/ReactiveX/RxJava/issues/2913. | |
*/ | |
public static <T> Observable<T> cache(final Observable<T> observable, final int capacity) { | |
return Observable.create(new OnSubscribeBoundedCache<>(observable, capacity)); | |
} | |
/** | |
* Returns an observable whose elements will match {@code observable}'s, | |
* casted to {@code outputClass}. | |
*/ | |
public static <I, O> Observable<O> downcast(final Observable<I> observable, | |
final Class<O> outputClass) { | |
return observable.map(outputClass::cast); | |
} | |
/** | |
* Sentinel indicating that items in the underlying source Observable shouldn't be passed on. | |
* | |
* Note we can't just use {@code null} since that may be a valid value emitted by the source. | |
*/ | |
private static final Object DO_NOT_PASS = new Object(); | |
/** | |
* Combines a source Observable with a predicate stream, holding items until a truthy predicate. | |
* | |
* If items are emitted from the source while the predicate stream is "closed", items will | |
* be not be passed through to the resulting Observable. Once the predicate stream emits true, | |
* it will release the last emitted item from the source Observable. | |
* Note that if the predicate Observable toggles between false and true and false and true, | |
* the last item emitted by the {@code source} will be delivered | |
* multiple times (once for each time the predicate goes from false to true). Consecutive | |
* truthy values in the predicate stream, however, will not cause multiple re-emissions. | |
* | |
* Does not run on any particular scheduler. | |
*/ | |
public static <T> Observable<T> holdUnless( | |
Observable<T> source, Observable<Boolean> predicate) { | |
return Observable | |
.combineLatest( | |
source, | |
predicate.distinctUntilChanged(), | |
(item, predicateValue) -> (predicateValue) ? item : DO_NOT_PASS | |
) | |
.filter(o -> o != DO_NOT_PASS) | |
.map(object -> { | |
@SuppressWarnings("unchecked") // object can never be DO_NOT_PASS. | |
final T result = (T) object; | |
return result; | |
}); | |
} | |
/** | |
* @return an {@link Observable} that returns only present values in the given observable of | |
* optional values | |
*/ | |
public static <T> Observable<T> observePresentOptionalValues( | |
Observable<? extends Optional<? extends T>> observable) { | |
return observable.filter(Optional::isPresent).map(Optional::get); | |
} | |
/** | |
* Subscribes to the given observable such that it will invoke the given actions upon | |
* completion or an error. A client should use this method if subscribing to the | |
* {@link Observable} performs an underlying operation with side-effects, and completing the | |
* observable signifies that the underlying operation has completed. | |
*/ | |
public static Subscription performOperation(final Observable<Void> observable, | |
final Action0 onCompleted, | |
final Action1<Throwable> onError) { | |
return observable.subscribe( | |
o -> {}, | |
onError, | |
onCompleted | |
); | |
} | |
} | |
final class OnSubscribeBoundedCache<T> implements OnSubscribe<T> { | |
protected final Observable<? extends T> source; | |
protected final Subject<? super T, ? extends T> cache; | |
private volatile int sourceSubscribed; | |
static final AtomicIntegerFieldUpdater<OnSubscribeBoundedCache> SRC_SUBSCRIBED_UPDATER | |
= AtomicIntegerFieldUpdater.newUpdater(OnSubscribeBoundedCache.class, "sourceSubscribed"); | |
public OnSubscribeBoundedCache(Observable<? extends T> source, int bufferSize) { | |
this.source = checkNotNull(source); | |
this.cache = ReplaySubject.<T>createWithSize(bufferSize); | |
} | |
@Override | |
public void call(final Subscriber<? super T> s) { | |
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { | |
source.subscribe(cache); | |
} | |
cache.unsafeSubscribe(s); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment