Last active
July 12, 2016 08:22
-
-
Save evelyne24/0e0053f2c3a2c45c1336 to your computer and use it in GitHub Desktop.
`test` *bold*
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import rx.Observable; | |
import rx.observers.TestSubscriber; | |
import rx.schedulers.TestScheduler; | |
import rx.subjects.AsyncSubject; | |
import rx.subjects.BehaviorSubject; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.ReplaySubject; | |
/** | |
* Created by evelina on 26/06/2015. | |
*/ | |
@RunWith(JUnit4.class) | |
public class TestSubjects { | |
@Test | |
public void testBehaviorSubjectWithCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
BehaviorSubject<Long> subject = BehaviorSubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler) | |
.take(3); // take(x) calls onComplete() | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives all 3 items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ONLY onComplete() | |
ts2.assertTerminalEvent(); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives ONLY onComplete() | |
ts3.assertTerminalEvent(); | |
} | |
@Test | |
public void testBehaviorSubjectWithoutCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
BehaviorSubject<Long> subject = BehaviorSubject.create(); | |
// this never calls onComplete() on the subscribers | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives all 3 items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ONLY last item | |
ts2.assertReceivedOnNext(Collections.singletonList(2L)); | |
///////// NEW ITEM | |
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); | |
///////// | |
// Subscriber receives the new item | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L)); | |
// Subscriber receives the the new item | |
ts2.assertReceivedOnNext(Arrays.asList(2L, 3L)); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives the ONLY the LAST item transformed | |
ts3.assertReceivedOnNext(Collections.singletonList("str3")); | |
} | |
@Test | |
public void testReplaySubjectWithCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
ReplaySubject<Long> subject = ReplaySubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler) | |
.take(3); // take(x) calls onComplete() | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives ALL items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ALL items | |
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
///////// NEW ITEM | |
subject.onNext(3L); | |
///////// | |
// Subscriber DOES NOT receive new items after onComplete() has been called | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives ALL items transformed | |
ts3.assertReceivedOnNext(Arrays.asList("str0", "str1", "str2")); | |
} | |
@Test | |
public void testReplaySubjectWithoutCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
ReplaySubject<Long> subject = ReplaySubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives ALL items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ALL items | |
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
///////// NEW ITEM | |
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); | |
///////// | |
// Subscriber receive the new value | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L)); | |
// Subscriber receive the new value | |
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L)); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives ALL values from the beginning -- this is different from BehaviorSubject | |
ts3.assertReceivedOnNext(Arrays.asList("str0", "str1", "str2", "str3")); | |
} | |
@Test | |
public void testAsyncSubjectWithCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
AsyncSubject<Long> subject = AsyncSubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler) | |
.take(3); // take(x) calls onComplete() | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives ONLY last item | |
ts1.assertReceivedOnNext(Collections.singletonList(2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ONLY last item | |
ts2.assertReceivedOnNext(Collections.singletonList(2L)); | |
///////// NEW ITEM | |
subject.onNext(3L); | |
///////// | |
// Subscribers DON'T receive new items after completion | |
ts1.assertReceivedOnNext(Collections.singletonList(2L)); | |
ts2.assertReceivedOnNext(Collections.singletonList(2L)); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives ONLY last item with transformation applied | |
ts3.assertReceivedOnNext(Collections.singletonList("str2")); | |
} | |
@Test | |
public void testAsyncSubjectWithoutCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
AsyncSubject<Long> subject = AsyncSubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
///////// | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
//////// | |
// Subscriber receives NOTHING because onComplete has not been called | |
// ts1.assertTerminalEvent(); --> this fails | |
ts1.assertReceivedOnNext(Collections.<Long>emptyList()); | |
subject.onCompleted(); | |
///////// NEW ITEM | |
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); | |
///////// | |
// Subscriber receives the LAST item BEFORE onComplete() | |
ts1.assertReceivedOnNext(Collections.singletonList(2L)); | |
} | |
@Test | |
public void testPublishSubjectWithoutCompletion() { | |
TestScheduler scheduler = new TestScheduler(); | |
PublishSubject<Long> subject = PublishSubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
// Subscriber receives all 3 items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives NOTHING | |
//ts2.assertTerminalEvent(); --> this fails | |
//ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); --> this fails | |
ts2.assertReceivedOnNext(Collections.<Long>emptyList()); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives NOTHING | |
//ts3.assertTerminalEvent(); --> this fails | |
//ts3.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); --> this fails | |
ts3.assertReceivedOnNext(Collections.<String>emptyList()); | |
} | |
@Test | |
public void testPublishSubjectWithCompletion() { | |
AtomicLong gen = new AtomicLong(); | |
TestScheduler scheduler = new TestScheduler(); | |
PublishSubject<Long> subject = PublishSubject.create(); | |
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler) | |
.take(3); // take(x) calls onComplete() | |
longObservable.subscribe(subject); | |
TestSubscriber<Long> ts1 = new TestSubscriber<>(); | |
subject.subscribe(ts1); | |
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); | |
// Subscriber receives all 3 items | |
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); | |
TestSubscriber<Long> ts2 = new TestSubscriber<>(); | |
subject.subscribe(ts2); | |
// Subscriber receives ONLY onComplete() | |
ts2.assertTerminalEvent(); | |
Observable<String> stringObservable = subject.map(i -> "str" + i); | |
TestSubscriber<String> ts3 = new TestSubscriber<>(); | |
stringObservable.subscribe(ts3); | |
// Subscriber receives ONLY onComplete() | |
ts3.assertTerminalEvent(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Simple tests I've written to understand how different subjects (BehaviorSubject, AsyncSubject, PublishSubject and ReplaySubject) work when the observable under test calls or doesn't call onCompleted() on the subscribers.