Skip to content

Instantly share code, notes, and snippets.

@njlr
Created June 27, 2017 12:25
Show Gist options
  • Save njlr/596c7d45143b3fbd29885a2c167078cb to your computer and use it in GitHub Desktop.
Save njlr/596c7d45143b3fbd29885a2c167078cb to your computer and use it in GitHub Desktop.
/**
 * Wraps {@code x} so that its items are forwarded one step late and, once
 * {@code x} completes, the stream is continued with the source produced by
 * applying {@code f} to the last item that {@code x} emitted.
 *
 * <p>Behaviour per upstream event:
 * <ul>
 *   <li>item: the previously held item (if any) is emitted downstream and the
 *       new item is held in its place;</li>
 *   <li>error: the held item (if any) is emitted, then the error is forwarded;</li>
 *   <li>completion: the held item (if any) is emitted; if there was one,
 *       {@code f} is applied to it and every event of the resulting source is
 *       forwarded downstream, otherwise downstream is completed directly.</li>
 * </ul>
 *
 * <p>{@code f} is a member declared outside this method; it is only used here
 * through {@code f.apply(item).subscribe(onNext, onError, onComplete)}.
 *
 * @param x the upstream observable; must not be {@code null}
 * @return a cold source; every subscription tracks its own "last item"
 * @throws NullPointerException if {@code x} is {@code null}
 */
@Override
public ObservableSource<C> apply(final Observable<A> x) {
  Objects.requireNonNull(x, "x is null");
  return Observable.create(new ObservableOnSubscribe<C>() {

    /** Emits the item currently held in {@code last}, if there is one. */
    private void sendPrevious(
        @NonNull final ObservableEmitter<C> e,
        final java.util.concurrent.atomic.AtomicReference<Optional<A>> last) {
      final Optional<A> previous = last.get();
      if (previous.isPresent()) {
        e.onNext(previous.get());
      }
    }

    @Override
    public void subscribe(@NonNull final ObservableEmitter<C> e) throws Exception {
      // The holder is created per subscription. Keeping it as a field of this
      // ObservableOnSubscribe would share it between all subscribers of the
      // returned Observable, leaking one subscriber's buffered item into another.
      final java.util.concurrent.atomic.AtomicReference<Optional<A>> last =
          new java.util.concurrent.atomic.AtomicReference<>(Optional.empty());

      x.subscribe(next -> {
        sendPrevious(e, last);
        last.set(Optional.of(next));
      }, error -> {
        sendPrevious(e, last);
        e.onError(error);
      }, () -> {
        sendPrevious(e, last);
        final Optional<A> finalItem = last.get();
        if (finalItem.isPresent()) {
          try {
            // Hand the continuation's Disposable to the emitter so that a
            // downstream dispose() also cancels the continuation. The upstream
            // subscription has already terminated at this point, so replacing
            // its registration is harmless.
            e.setDisposable(
                f.apply(finalItem.get()).subscribe(e::onNext, e::onError, e::onComplete));
          } catch (Exception ex) {
            // An exception escaping this callback would be routed to the global
            // error hook and downstream would never terminate; report it instead.
            e.onError(ex);
          }
        } else {
          e.onComplete();
        }
      }, e::setDisposable); // registered on subscribe, i.e. before any item can arrive
    }
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment