Created
May 20, 2014 03:28
-
-
Save benjchristensen/f7969657650126b000be to your computer and use it in GitHub Desktop.
ConcatIfEmpty
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Notification; | |
import rx.Observable; | |
import rx.Observable.Operator; | |
import rx.Subscriber; | |
import rx.functions.Func1; | |
public class ConcatIfEmpty { | |
public static void main(String[] args) { | |
Observable<String> empty = Observable.empty(); | |
final Observable<String> o1 = Observable.from("hello", "world"); | |
final Observable<String> o2 = Observable.from("HELLO", "WORLD"); | |
concatIfEmptyViaMaterialize(o1, o2).subscribe(System.out::println); | |
System.out.println("--------------------"); | |
concatIfEmptyViaMaterialize(empty, o1).subscribe(System.out::println); | |
System.out.println("--------------------"); | |
concatIfEmptyViaMaterialize(empty, o2).subscribe(System.out::println); | |
System.out.println("--------------------"); | |
concatIfEmptyViaOperator(o1, o2).subscribe(System.out::println); | |
System.out.println("--------------------"); | |
concatIfEmptyViaOperator(empty, o1).subscribe(System.out::println); | |
System.out.println("--------------------"); | |
concatIfEmptyViaOperator(empty, o2).subscribe(System.out::println); | |
} | |
private static Observable<String> concatIfEmptyViaOperator(final Observable<String> o1, final Observable<String> o2) { | |
return o1.lift(new Operator<String, String>() { | |
@Override | |
public Subscriber<? super String> call(Subscriber<? super String> child) { | |
return new Subscriber<String>(child) { | |
boolean receivedOnNext = false; | |
@Override | |
public void onCompleted() { | |
if (receivedOnNext) { | |
child.onCompleted(); | |
} else { | |
// never received a value so let's use the second Observable | |
o2.unsafeSubscribe(child); | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(String t) { | |
receivedOnNext = true; | |
child.onNext(t); | |
} | |
}; | |
} | |
}); | |
} | |
private static Observable<String> concatIfEmptyViaMaterialize(final Observable<String> o1, final Observable<String> o2) { | |
return o1.materialize().flatMap(new Func1<Notification<String>, Observable<String>>() { | |
private boolean hasValues = false; | |
@Override | |
public Observable<String> call(Notification<String> n) { | |
if (n.isOnNext()) { | |
hasValues = true; | |
return Observable.just(n.getValue()); | |
} else if (n.isOnCompleted()) { | |
if (hasValues) { | |
// complete | |
return Observable.empty(); | |
} else { | |
// continue with the second Observable | |
return o2; | |
} | |
} else { | |
// emit the error and don't bother with the second Observable | |
return Observable.error(n.getThrowable()); | |
} | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This emits: