Skip to content

Instantly share code, notes, and snippets.

@evelyne24
Last active July 12, 2016 08:22
Show Gist options
  • Save evelyne24/0e0053f2c3a2c45c1336 to your computer and use it in GitHub Desktop.
Save evelyne24/0e0053f2c3a2c45c1336 to your computer and use it in GitHub Desktop.
`test` *bold*
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
/**
* Created by evelina on 26/06/2015.
*/
@RunWith(JUnit4.class)
public class TestSubjects {
@Test
public void testBehaviorSubjectWithCompletion() {
TestScheduler scheduler = new TestScheduler();
BehaviorSubject<Long> subject = BehaviorSubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
.take(3); // take(x) calls onComplete()
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives all 3 items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ONLY onComplete()
ts2.assertTerminalEvent();
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives ONLY onComplete()
ts3.assertTerminalEvent();
}
@Test
public void testBehaviorSubjectWithoutCompletion() {
TestScheduler scheduler = new TestScheduler();
BehaviorSubject<Long> subject = BehaviorSubject.create();
// this never calls onComplete() on the subscribers
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives all 3 items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ONLY last item
ts2.assertReceivedOnNext(Collections.singletonList(2L));
///////// NEW ITEM
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
/////////
// Subscriber receives the new item
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L));
// Subscriber receives the the new item
ts2.assertReceivedOnNext(Arrays.asList(2L, 3L));
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives the ONLY the LAST item transformed
ts3.assertReceivedOnNext(Collections.singletonList("str3"));
}
@Test
public void testReplaySubjectWithCompletion() {
TestScheduler scheduler = new TestScheduler();
ReplaySubject<Long> subject = ReplaySubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
.take(3); // take(x) calls onComplete()
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives ALL items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ALL items
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
///////// NEW ITEM
subject.onNext(3L);
/////////
// Subscriber DOES NOT receive new items after onComplete() has been called
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives ALL items transformed
ts3.assertReceivedOnNext(Arrays.asList("str0", "str1", "str2"));
}
@Test
public void testReplaySubjectWithoutCompletion() {
TestScheduler scheduler = new TestScheduler();
ReplaySubject<Long> subject = ReplaySubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives ALL items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ALL items
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
///////// NEW ITEM
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
/////////
// Subscriber receive the new value
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L));
// Subscriber receive the new value
ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L));
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives ALL values from the beginning -- this is different from BehaviorSubject
ts3.assertReceivedOnNext(Arrays.asList("str0", "str1", "str2", "str3"));
}
@Test
public void testAsyncSubjectWithCompletion() {
TestScheduler scheduler = new TestScheduler();
AsyncSubject<Long> subject = AsyncSubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
.take(3); // take(x) calls onComplete()
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives ONLY last item
ts1.assertReceivedOnNext(Collections.singletonList(2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ONLY last item
ts2.assertReceivedOnNext(Collections.singletonList(2L));
///////// NEW ITEM
subject.onNext(3L);
/////////
// Subscribers DON'T receive new items after completion
ts1.assertReceivedOnNext(Collections.singletonList(2L));
ts2.assertReceivedOnNext(Collections.singletonList(2L));
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives ONLY last item with transformation applied
ts3.assertReceivedOnNext(Collections.singletonList("str2"));
}
@Test
public void testAsyncSubjectWithoutCompletion() {
TestScheduler scheduler = new TestScheduler();
AsyncSubject<Long> subject = AsyncSubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
/////////
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
////////
// Subscriber receives NOTHING because onComplete has not been called
// ts1.assertTerminalEvent(); --> this fails
ts1.assertReceivedOnNext(Collections.<Long>emptyList());
subject.onCompleted();
///////// NEW ITEM
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
/////////
// Subscriber receives the LAST item BEFORE onComplete()
ts1.assertReceivedOnNext(Collections.singletonList(2L));
}
@Test
public void testPublishSubjectWithoutCompletion() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Long> subject = PublishSubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
// Subscriber receives all 3 items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives NOTHING
//ts2.assertTerminalEvent(); --> this fails
//ts2.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); --> this fails
ts2.assertReceivedOnNext(Collections.<Long>emptyList());
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives NOTHING
//ts3.assertTerminalEvent(); --> this fails
//ts3.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L)); --> this fails
ts3.assertReceivedOnNext(Collections.<String>emptyList());
}
@Test
public void testPublishSubjectWithCompletion() {
AtomicLong gen = new AtomicLong();
TestScheduler scheduler = new TestScheduler();
PublishSubject<Long> subject = PublishSubject.create();
Observable<Long> longObservable = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
.take(3); // take(x) calls onComplete()
longObservable.subscribe(subject);
TestSubscriber<Long> ts1 = new TestSubscriber<>();
subject.subscribe(ts1);
scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS);
// Subscriber receives all 3 items
ts1.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L));
TestSubscriber<Long> ts2 = new TestSubscriber<>();
subject.subscribe(ts2);
// Subscriber receives ONLY onComplete()
ts2.assertTerminalEvent();
Observable<String> stringObservable = subject.map(i -> "str" + i);
TestSubscriber<String> ts3 = new TestSubscriber<>();
stringObservable.subscribe(ts3);
// Subscriber receives ONLY onComplete()
ts3.assertTerminalEvent();
}
}
@evelyne24
Copy link
Author

Simple tests I've written to understand how different subjects (BehaviorSubject, AsyncSubject, PublishSubject and ReplySubject) work when the observable under test calls or doesn't call onComplete() on the subscribers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment