Skip to content

Instantly share code, notes, and snippets.

@demixdn
Last active March 21, 2017 08:19
Show Gist options
  • Save demixdn/d9f345e3c35ccf901227d4def78de6e5 to your computer and use it in GitHub Desktop.
Save demixdn/d9f345e3c35ccf901227d4def78de6e5 to your computer and use it in GitHub Desktop.
RxJava2 helper classes
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
/**
 * {@link ObservableTransformer} that applies both {@code subscribeOn} and
 * {@code observeOn} with {@link AndroidSchedulers#mainThread()} to the upstream
 * {@link Observable}.
 *
 * <p>Instances carry no state and are obtained through {@link #create()}.
 *
 * @param <T> type of the items flowing through the transformed stream
 * @author Aleks Sander
 */
public final class MainThreadSchedule<T> implements ObservableTransformer<T, T> {

    /** Hidden so that instances are only produced by {@link #create()}. */
    private MainThreadSchedule() {
        // intentionally empty
    }

    /**
     * Factory method for a new transformer.
     *
     * @param <T> type of the items flowing through the transformed stream
     * @return a fresh {@code MainThreadSchedule} instance
     */
    public static <T> MainThreadSchedule<T> create() {
        return new MainThreadSchedule<T>();
    }

    /**
     * Schedules the given stream on the Android main-thread scheduler.
     *
     * @param upstream the observable to decorate
     * @return {@code upstream} with {@code subscribeOn} and then {@code observeOn}
     *         applied, both using {@link AndroidSchedulers#mainThread()}
     */
    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        // Same operator order as before: subscribeOn first, observeOn second.
        final Observable<T> subscribedOnMain =
                upstream.subscribeOn(AndroidSchedulers.mainThread());
        return subscribedOnMain.observeOn(AndroidSchedulers.mainThread());
    }
}
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.internal.disposables.EmptyDisposable;
public final class OperatorIfThen<R> extends Observable<R> {
private final BooleanSupplier condition;
private final Observable<? extends R> then;
private final Observable<? extends R> orElse;
public OperatorIfThen(BooleanSupplier condition, Observable<? extends R> then, Observable<? extends R> orElse) {
this.condition = condition;
this.then = then;
this.orElse = orElse;
}
@Override
protected void subscribeActual(Observer<? super R> observer) {
boolean b;
try {
b = condition.getAsBoolean();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
if (b) {
then.subscribe(observer);
} else {
orElse.subscribe(observer);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment