Skip to content

Instantly share code, notes, and snippets.

@mgp
Created June 3, 2015 15:33
Show Gist options
  • Save mgp/8a51f83d107534455697 to your computer and use it in GitHub Desktop.
Save mgp/8a51f83d107534455697 to your computer and use it in GitHub Desktop.
ObservableUtils.java
package org.khanacademy.core.util;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Optional;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* Additions to {@link rx}.
*/
public final class ObservableUtils {
private ObservableUtils() {
// Static methods only. Do not allow instantiation.
}
/**
* @return an observable that only emits {@code true} values from the given observable
*/
public static Observable<Boolean> observeTrueValues(final Observable<Boolean> observable) {
checkNotNull(observable);
return observable.filter(value -> value);
}
/**
* @return an observable that only emits {@code false} values from the given observable
*/
public static Observable<Boolean> observeFalseValues(final Observable<Boolean> observable) {
checkNotNull(observable);
return observable.filter(value -> !value);
}
/**
* Converts {@code function} into an Observable that emits the value returned or the error thrown.
*/
public static <O> Observable<O> makeObservable(final Func0<O> function) {
checkNotNull(function);
return Observable.defer(() -> Observable.just(function.call()));
}
/**
* Converts {@code function} into an Observable that emits the value returned or the error thrown.
*/
public static Observable<Void> makeObservable(final Action0 action) {
return makeObservable(() -> {
action.call();
return null;
});
}
/**
* Returns an {@link Observable} that partitions the given observable. The returned observable
* emits observables, each of which emits values emitted by the observable parameter until a
* value satisfies the given predicate. Then that observable completes, and the returned
* observable emits a new observable.
*
* For example, partition([1, 3, 5, 6, 8, 9, 10, 11], isEven) yields
* [[1, 3, 5, 6], [8], [9, 10], [11]].
*/
public static <T> Observable<Observable<T>> partition(
final Observable<T> observable, final Func1<T, Boolean> isTerminatingElement) {
final Observable<T> startingObservable = Observable
.<T>just(null)
.concatWith(observable.filter(isTerminatingElement));
return startingObservable
.map(t -> observable.takeUntil(isTerminatingElement));
}
/**
* Caches the emissions from the source {@code Observable} and replays them
* in order to subsequent {@code Subscriber}.
*
* It auto-subscribes to the source {@link Observable} only upon the first subscription.
*
* This method has the semantics that you would expect from {@link Observable#cache(int)},
* but the unlike that operator, this enforces {@code capacity}, it not just a hint.
*
* More details in this issue: https://github.com/ReactiveX/RxJava/issues/2913.
*/
public static <T> Observable<T> cache(final Observable<T> observable, final int capacity) {
return Observable.create(new OnSubscribeBoundedCache<>(observable, capacity));
}
/**
* Returns an observable whose elements will match {@code observable}'s,
* casted to {@code outputClass}.
*/
public static <I, O> Observable<O> downcast(final Observable<I> observable,
final Class<O> outputClass) {
return observable.map(outputClass::cast);
}
/**
* Sentinel indicating that items in the underlying source Observable shouldn't be passed on.
*
* Note we can't just use {@code null} since that may be a valid value emitted by the source.
*/
private static final Object DO_NOT_PASS = new Object();
/**
* Combines a source Observable with a predicate stream, holding items until a truthy predicate.
*
* If items are emitted from the source while the predicate stream is "closed", items will
* be not be passed through to the resulting Observable. Once the predicate stream emits true,
* it will release the last emitted item from the source Observable.
* Note that if the predicate Observable toggles between false and true and false and true,
* the last item emitted by the {@code source} will be delivered
* multiple times (once for each time the predicate goes from false to true). Consecutive
* truthy values in the predicate stream, however, will not cause multiple re-emissions.
*
* Does not run on any particular scheduler.
*/
public static <T> Observable<T> holdUnless(
Observable<T> source, Observable<Boolean> predicate) {
return Observable
.combineLatest(
source,
predicate.distinctUntilChanged(),
(item, predicateValue) -> (predicateValue) ? item : DO_NOT_PASS
)
.filter(o -> o != DO_NOT_PASS)
.map(object -> {
@SuppressWarnings("unchecked") // object can never be DO_NOT_PASS.
final T result = (T) object;
return result;
});
}
/**
* @return an {@link Observable} that returns only present values in the given observable of
* optional values
*/
public static <T> Observable<T> observePresentOptionalValues(
Observable<? extends Optional<? extends T>> observable) {
return observable.filter(Optional::isPresent).map(Optional::get);
}
/**
* Subscribes to the given observable such that it will invoke the given actions upon
* completion or an error. A client should use this method if subscribing to the
* {@link Observable} performs an underlying operation with side-effects, and completing the
* observable signifies that the underlying operation has completed.
*/
public static Subscription performOperation(final Observable<Void> observable,
final Action0 onCompleted,
final Action1<Throwable> onError) {
return observable.subscribe(
o -> {},
onError,
onCompleted
);
}
}
/**
 * {@link OnSubscribe} that funnels a source observable through a size-bounded
 * {@link ReplaySubject} and hands every subscriber that subject.
 *
 * The source is subscribed to at most once, by whichever {@link #call} first flips
 * {@code sourceSubscribed} from 0 to 1.
 */
final class OnSubscribeBoundedCache<T> implements OnSubscribe<T> {
    /** Atomic accessor for {@code sourceSubscribed}; raw because the class literal is raw. */
    @SuppressWarnings("rawtypes")
    static final AtomicIntegerFieldUpdater<OnSubscribeBoundedCache> SRC_SUBSCRIBED_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(
                    OnSubscribeBoundedCache.class, "sourceSubscribed");

    /** The observable whose emissions are cached. */
    protected final Observable<? extends T> source;

    /** Receives the source's emissions and replays them to subscribers. */
    protected final Subject<? super T, ? extends T> cache;

    /** 0 until the source has been subscribed to, 1 afterwards. Updated only via the updater. */
    private volatile int sourceSubscribed;

    /**
     * @param source the observable to cache; must not be {@code null}
     * @param bufferSize the size handed to {@link ReplaySubject#createWithSize(int)}
     */
    public OnSubscribeBoundedCache(Observable<? extends T> source, int bufferSize) {
        this.source = checkNotNull(source);
        this.cache = ReplaySubject.<T>createWithSize(bufferSize);
    }

    /**
     * Connects the cache to the source if this is the first subscription, then subscribes
     * {@code s} to the cache.
     */
    @Override
    public void call(final Subscriber<? super T> s) {
        // Only the caller that wins the 0 -> 1 transition wires the source into the cache.
        final boolean isFirstSubscription = SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1);
        if (isFirstSubscription) {
            source.subscribe(cache);
        }
        cache.unsafeSubscribe(s);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment