Last active
July 15, 2019 06:02
-
-
Save SergejIsbrecht/49bc79649958c98e54829bfb3110569a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import io.reactivex.Completable; | |
| import io.reactivex.Flowable; | |
| import io.reactivex.Maybe; | |
| import io.reactivex.Observable; | |
| import io.reactivex.Scheduler; | |
| import io.reactivex.Single; | |
| import io.reactivex.disposables.Disposable; | |
| import io.reactivex.observers.TestObserver; | |
| import io.reactivex.schedulers.TestScheduler; | |
| import io.reactivex.subjects.SingleSubject; | |
| import org.junit.jupiter.api.BeforeEach; | |
| import org.junit.jupiter.api.Test; | |
| import org.reactivestreams.Publisher; | |
| import java.lang.reflect.InvocationHandler; | |
| import java.lang.reflect.Method; | |
| import java.lang.reflect.Proxy; | |
| import java.util.Objects; | |
| import java.util.concurrent.CancellationException; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.atomic.AtomicBoolean; | |
| class ObservableChain { | |
| private Dummy dummy; | |
| private TestScheduler testScheduler; | |
| private ProviderProxy providerProxy; | |
| @BeforeEach | |
| void setUp() { | |
| testScheduler = new TestScheduler(); | |
| TestClazz service = new TestClazz(testScheduler); | |
| providerProxy = new ProviderProxy(service); | |
| dummy = | |
| (Dummy) | |
| Proxy.newProxyInstance( | |
| ObservableChain.class.getClassLoader(), new Class[] {Dummy.class}, providerProxy); | |
| } | |
| @Test | |
| void nameX() { | |
| TestObserver<Long> test = dummy.value().test(); | |
| testScheduler.advanceTimeBy(2001, TimeUnit.MILLISECONDS); | |
| providerProxy.dispose(); | |
| testScheduler.advanceTimeBy(2001, TimeUnit.MILLISECONDS); | |
| test.assertComplete().assertNoErrors().assertValues(0L, 1L, 2L); | |
| dummy.value().test().assertComplete(); | |
| } | |
| @Test | |
| void nameY() { | |
| providerProxy.dispose(); | |
| dummy.value().test().assertComplete().assertNoErrors().assertNoValues(); | |
| } | |
| @Test | |
| void nameZ() { | |
| TestObserver<Long> single = dummy.single().test(); | |
| providerProxy.dispose(); | |
| testScheduler.advanceTimeBy(1, TimeUnit.DAYS); | |
| single.assertNoValues().assertError(CancellationException.class); | |
| } | |
| private interface Dummy { | |
| Observable<Long> value(); | |
| Single<Long> single(); | |
| } | |
| private static class ProviderProxy implements InvocationHandler, Disposable { | |
| private static final Object NULL_ELEMENT = new Object(); | |
| private final AtomicBoolean isDisposed; | |
| private final Object service; | |
| private Publisher<Object> publishDisposer; | |
| private SingleSubject<Object> disposer; | |
| private Observable<Object> observableDisposer; | |
| private Completable completableDisposer; | |
| ProviderProxy(Object service) { | |
| this.service = Objects.requireNonNull(service); | |
| this.isDisposed = new AtomicBoolean(); | |
| this.disposer = SingleSubject.create(); | |
| this.publishDisposer = disposer.toFlowable(); | |
| this.observableDisposer = disposer.toObservable(); | |
| this.completableDisposer = disposer.ignoreElement(); | |
| } | |
| @Override | |
| public Object invoke(Object o, Method method, Object[] objects) throws Throwable { | |
| Class<?> returnType = method.getReturnType(); | |
| boolean isFlowable = Flowable.class.isAssignableFrom(returnType); | |
| boolean isObservable = Observable.class.isAssignableFrom(returnType); | |
| boolean isMaybe = Maybe.class.isAssignableFrom(returnType); | |
| boolean isSingle = Single.class.isAssignableFrom(returnType); | |
| boolean isCompletable = Completable.class.isAssignableFrom(returnType); | |
| if (isDisposed.get()) { | |
| // log error, that service call has come in, but service is already shut down | |
| if (isObservable) { | |
| return Observable.empty(); | |
| } else if (isFlowable) { | |
| return Flowable.empty(); | |
| } else if (isMaybe) { | |
| return Maybe.empty(); | |
| } else if (isSingle) { | |
| return Single.never(); | |
| } else if (isCompletable) { | |
| return Completable.never(); | |
| } | |
| } | |
| if (isObservable) { | |
| return ((Observable) method.invoke(service, objects)) | |
| .onTerminateDetach() | |
| .takeUntil(observableDisposer); | |
| } else if (isFlowable) { | |
| return ((Flowable) method.invoke(service, objects)) | |
| .onTerminateDetach() | |
| .takeUntil(publishDisposer); | |
| } else if (isMaybe) { | |
| return ((Maybe) method.invoke(service, objects)) | |
| .onTerminateDetach() | |
| .takeUntil(publishDisposer); | |
| } else if (isSingle) { | |
| return ((Single) method.invoke(service, objects)) | |
| .onTerminateDetach() | |
| .takeUntil(publishDisposer); | |
| } else if (isCompletable) { | |
| return ((Completable) method.invoke(service, objects)) | |
| .onTerminateDetach() | |
| .takeUntil(completableDisposer); | |
| } | |
| return null; | |
| } | |
| @Override | |
| public void dispose() { | |
| if (isDisposed.compareAndSet(false, true)) { | |
| disposer.onSuccess(NULL_ELEMENT); | |
| } | |
| } | |
| @Override | |
| public boolean isDisposed() { | |
| return isDisposed.get(); | |
| } | |
| } | |
| private final class TestClazz implements Dummy { | |
| private final Scheduler scheduler; | |
| private TestClazz(Scheduler scheduler) { | |
| this.scheduler = scheduler; | |
| } | |
| @Override | |
| public Observable<Long> value() { | |
| return Observable.interval(0, 1000, TimeUnit.MILLISECONDS, scheduler); | |
| } | |
| @Override | |
| public Single<Long> single() { | |
| return Single.timer(1000, TimeUnit.MILLISECONDS, scheduler); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment