Created
September 15, 2017 11:35
-
-
Save aballano/c9e69d8e94dec3f28620068394ed5ddf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.CompletableObserver | |
import io.reactivex.MaybeObserver | |
import io.reactivex.Observer | |
import io.reactivex.SingleObserver | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.exceptions.CompositeException | |
import io.reactivex.exceptions.Exceptions | |
import io.reactivex.functions.Action | |
import io.reactivex.functions.Consumer | |
import io.reactivex.internal.functions.Functions | |
import io.reactivex.plugins.RxJavaPlugins | |
class NonDisposableObserver<T> private constructor(private val onSuccess: Consumer<in T>, | |
private val onComplete: Action, | |
private val onError: Consumer<in Throwable>) | |
: CompletableObserver, ResultObserver<T> { | |
override fun onSubscribe(disposable: Disposable) { | |
// No-op | |
} | |
// Observable | |
override fun onNext(value: T) { | |
try { | |
onSuccess.accept(value) | |
} catch (exception: Exception) { | |
Exceptions.throwIfFatal(exception) | |
onError(exception) | |
} | |
} | |
// Single, Maybe | |
override fun onSuccess(value: T) { | |
try { | |
onSuccess.accept(value) | |
} catch (exception: Exception) { | |
Exceptions.throwIfFatal(exception) | |
RxJavaPlugins.onError(exception) | |
} | |
} | |
// Observable, Completable | |
override fun onComplete() { | |
try { | |
onComplete.run() | |
} catch (exception: Exception) { | |
Exceptions.throwIfFatal(exception) | |
RxJavaPlugins.onError(exception) | |
} | |
} | |
// All | |
override fun onError(throwable: Throwable) { | |
try { | |
onError.accept(throwable) | |
} catch (exception: Exception) { | |
Exceptions.throwIfFatal(exception) | |
RxJavaPlugins.onError(CompositeException(throwable, exception)) | |
} | |
} | |
companion object { | |
// Single, Maybe, Observable | |
@JvmStatic fun <T> create( | |
onSuccess: Consumer<in T>, | |
onError: Consumer<in Throwable>): ResultObserver<T> = | |
NonDisposableObserver(onSuccess, Functions.EMPTY_ACTION, onError) | |
// Observable | |
@JvmStatic fun <T> create( | |
onSuccess: Consumer<in T>, | |
onComplete: Action, | |
onError: Consumer<in Throwable>): ResultObserver<T> = | |
NonDisposableObserver(onSuccess, onComplete, onError) | |
// Completable | |
@JvmStatic fun create( | |
onComplete: Action, | |
onError: Consumer<in Throwable>): CompletableObserver = | |
NonDisposableObserver(Functions.emptyConsumer(), onComplete, onError) | |
} | |
} | |
interface ResultObserver<T> : SingleObserver<T>, Observer<T>, MaybeObserver<T> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment