Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Last active July 15, 2019 06:02
Show Gist options
  • Select an option

  • Save SergejIsbrecht/49bc79649958c98e54829bfb3110569a to your computer and use it in GitHub Desktop.

Select an option

Save SergejIsbrecht/49bc79649958c98e54829bfb3110569a to your computer and use it in GitHub Desktop.
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.SingleSubject;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
class ObservableChain {
private Dummy dummy;
private TestScheduler testScheduler;
private ProviderProxy providerProxy;
/**
 * Builds a fresh fixture for every test: a virtual-time scheduler, the real service driven by
 * that scheduler, and a dynamic proxy for {@link Dummy} whose calls are routed through the
 * {@link ProviderProxy} invocation handler.
 */
@BeforeEach
void setUp() {
  testScheduler = new TestScheduler();
  providerProxy = new ProviderProxy(new TestClazz(testScheduler));

  ClassLoader loader = ObservableChain.class.getClassLoader();
  Class<?>[] proxiedInterfaces = {Dummy.class};
  dummy = (Dummy) Proxy.newProxyInstance(loader, proxiedInterfaces, providerProxy);
}
/**
 * Disposing the proxy while an {@code Observable} is running completes it without an error.
 *
 * <p>The service emits at 0 ms, 1000 ms and 2000 ms of virtual time, so three values are seen
 * before disposal; advancing the clock afterwards must not deliver anything further. A stream
 * requested after disposal is expected to be complete right away.
 */
@Test
void nameX() {
  Observable<Long> ticks = dummy.value();
  TestObserver<Long> observer = ticks.test();

  testScheduler.advanceTimeBy(2001, TimeUnit.MILLISECONDS);
  providerProxy.dispose();
  // Time keeps moving after disposal; no additional items may arrive.
  testScheduler.advanceTimeBy(2001, TimeUnit.MILLISECONDS);

  observer.assertComplete();
  observer.assertNoErrors();
  observer.assertValues(0L, 1L, 2L);

  TestObserver<Long> lateObserver = dummy.value().test();
  lateObserver.assertComplete();
}
/**
 * An {@code Observable} requested after the proxy was disposed completes immediately, without
 * values and without an error.
 */
@Test
void nameY() {
  providerProxy.dispose();

  TestObserver<Long> observer = dummy.value().test();

  observer.assertComplete();
  observer.assertNoErrors();
  observer.assertNoValues();
}
/**
 * Disposing the proxy before a pending {@code Single} has produced its value terminates that
 * {@code Single} with a {@link CancellationException}; advancing the clock past the timer
 * afterwards must not deliver a value.
 */
@Test
void nameZ() {
  Single<Long> pending = dummy.single();
  TestObserver<Long> observer = pending.test();

  providerProxy.dispose();
  testScheduler.advanceTimeBy(1, TimeUnit.DAYS);

  observer.assertNoValues();
  observer.assertError(CancellationException.class);
}
/**
 * Service contract used by the tests. It is the interface the dynamic proxy implements, so
 * every call on it is routed through the invocation handler.
 */
private interface Dummy {
/** Returns a stream of {@code Long} values. */
Observable<Long> value();
/** Returns a single {@code Long} result. */
Single<Long> single();
}
private static class ProviderProxy implements InvocationHandler, Disposable {
private static final Object NULL_ELEMENT = new Object();
private final AtomicBoolean isDisposed;
private final Object service;
private Publisher<Object> publishDisposer;
private SingleSubject<Object> disposer;
private Observable<Object> observableDisposer;
private Completable completableDisposer;
ProviderProxy(Object service) {
this.service = Objects.requireNonNull(service);
this.isDisposed = new AtomicBoolean();
this.disposer = SingleSubject.create();
this.publishDisposer = disposer.toFlowable();
this.observableDisposer = disposer.toObservable();
this.completableDisposer = disposer.ignoreElement();
}
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
Class<?> returnType = method.getReturnType();
boolean isFlowable = Flowable.class.isAssignableFrom(returnType);
boolean isObservable = Observable.class.isAssignableFrom(returnType);
boolean isMaybe = Maybe.class.isAssignableFrom(returnType);
boolean isSingle = Single.class.isAssignableFrom(returnType);
boolean isCompletable = Completable.class.isAssignableFrom(returnType);
if (isDisposed.get()) {
// log error, that service call has come in, but service is already shut down
if (isObservable) {
return Observable.empty();
} else if (isFlowable) {
return Flowable.empty();
} else if (isMaybe) {
return Maybe.empty();
} else if (isSingle) {
return Single.never();
} else if (isCompletable) {
return Completable.never();
}
}
if (isObservable) {
return ((Observable) method.invoke(service, objects))
.onTerminateDetach()
.takeUntil(observableDisposer);
} else if (isFlowable) {
return ((Flowable) method.invoke(service, objects))
.onTerminateDetach()
.takeUntil(publishDisposer);
} else if (isMaybe) {
return ((Maybe) method.invoke(service, objects))
.onTerminateDetach()
.takeUntil(publishDisposer);
} else if (isSingle) {
return ((Single) method.invoke(service, objects))
.onTerminateDetach()
.takeUntil(publishDisposer);
} else if (isCompletable) {
return ((Completable) method.invoke(service, objects))
.onTerminateDetach()
.takeUntil(completableDisposer);
}
return null;
}
@Override
public void dispose() {
if (isDisposed.compareAndSet(false, true)) {
disposer.onSuccess(NULL_ELEMENT);
}
}
@Override
public boolean isDisposed() {
return isDisposed.get();
}
}
private final class TestClazz implements Dummy {
private final Scheduler scheduler;
private TestClazz(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Observable<Long> value() {
return Observable.interval(0, 1000, TimeUnit.MILLISECONDS, scheduler);
}
@Override
public Single<Long> single() {
return Single.timer(1000, TimeUnit.MILLISECONDS, scheduler);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment