Created
March 30, 2015 13:06
-
-
Save YannRobert/a07ed294dbaccd4c3705 to your computer and use it in GitHub Desktop.
Could the defensive code in the RxJava "merge" operator be improved?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
/** | |
* either there is a issue in the defensive code in the <code>merge</code> operator or RxJava, either the support for mock Subject could be improved? | |
*/ | |
@Slf4j | |
public class IssueInRxJavaDefensiveCodeTest { | |
@Test | |
public void shouldAcceptSubscriber() throws InterruptedException { | |
Subject<Integer, Integer> subject1 = PublishSubject.create(); | |
CountDownLatch latch = new CountDownLatch(1); | |
subject1.subscribe(new SimpleLoggingObserver<>(latch)); | |
subject1.onNext(1); | |
subject1.onCompleted(); | |
assertTrue(latch.await(3, TimeUnit.SECONDS)); | |
} | |
/** | |
* Here we get an IllegalStateException when subscribing to a mock Subject | |
*/ | |
@Test | |
public void shouldThrowIllegalStateExceptionOnMockSubjectSubscribe() throws InterruptedException { | |
Subject<Integer, Integer> subject2 = mock(Subject.class, new Answer<Object>() { | |
@Override | |
public Object answer(InvocationOnMock invocationOnMock) throws Throwable { | |
log.info("Calling Method {} ...", invocationOnMock.getMethod().getName()); | |
throw new UnsupportedOperationException(invocationOnMock.getMethod().getName()); | |
} | |
}); | |
doNothing().when(subject2).onNext(anyInt()); | |
doNothing().when(subject2).onCompleted(); | |
CountDownLatch latch = new CountDownLatch(1); | |
try { | |
subject2.subscribe(new SimpleLoggingObserver<>(latch)); | |
fail(); | |
} catch (IllegalStateException e) { | |
assertEquals("onSubscribe function can not be null.", e.getMessage()); | |
} | |
subject2.onNext(1); | |
subject2.onCompleted(); | |
assertFalse(latch.await(3, TimeUnit.SECONDS)); | |
} | |
/** | |
* when subscribing to Subject merging a mock Subject, a NullPointerException occurs under the hood, and no error is reported to the Observer. | |
*/ | |
@Test | |
public void shouldThrowIllegalStateExceptionOnMockSubjectSubscribeByMerge() throws InterruptedException { | |
Subject<Integer, Integer> subject1 = PublishSubject.create(); | |
Subject<Integer, Integer> subject2 = mock(Subject.class, new Answer<Object>() { | |
@Override | |
public Object answer(InvocationOnMock invocationOnMock) throws Throwable { | |
log.info("Calling Method {} ...", invocationOnMock.getMethod().getName()); | |
throw new UnsupportedOperationException(invocationOnMock.getMethod().getName()); | |
} | |
}); | |
doNothing().when(subject2).onNext(anyInt()); | |
doNothing().when(subject2).onCompleted(); | |
CountDownLatch latch = new CountDownLatch(1); | |
Observable.merge(subject1, subject2).subscribe(new SimpleLoggingObserver<>(latch)); | |
subject1.onNext(1); | |
subject1.onCompleted(); | |
subject2.onNext(1); | |
subject2.onCompleted(); | |
assertFalse(latch.await(3, TimeUnit.SECONDS)); | |
} | |
private static class SimpleLoggingObserver<T> implements Observer<T> { | |
private final CountDownLatch latch; | |
public SimpleLoggingObserver(CountDownLatch latch) { | |
this.latch = latch; | |
} | |
@Override | |
public void onCompleted() { | |
log.info("Completed"); | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
log.info("Error", e); | |
latch.countDown(); | |
} | |
@Override | |
public void onNext(T value) { | |
log.info("next = {}", value); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
see issue ReactiveX/RxJava#2848