Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Created April 15, 2018 10:35
Show Gist options
  • Save SergejIsbrecht/fec7ebe80a6c31f56eaae89683e8d057 to your computer and use it in GitHub Desktop.
Save SergejIsbrecht/fec7ebe80a6c31f56eaae89683e8d057 to your computer and use it in GitHub Desktop.
/**
 * Walks a {@code replay(1)} connectable through its connection life cycle.
 *
 * <p>A {@code PublishSubject} is hidden, merged with a never-terminating source, and
 * replayed with a buffer of one element. The test connects once, emits a single value (4)
 * and then checks that observers subscribing before the emission, after the emission,
 * after all earlier observers were disposed, and after the connection itself was disposed
 * each see exactly that one value.
 */
@Test
public void connect() {
    PublishSubject<Integer> source = PublishSubject.create();
    ConnectableObservable<Integer> replayed =
        source.hide()
            .mergeWith(Observable.never())
            .replay(1);
    Disposable connection = replayed.connect();

    // One observer subscribes before the emission, the other one after it.
    TestObserver<Integer> early = replayed.test();
    source.onNext(4);
    TestObserver<Integer> late = replayed.test();

    List<Integer> earlyValues = early.assertValueCount(1).values();
    List<Integer> lateValues = late.assertValueCount(1).values();
    assertThat(earlyValues).contains(4);
    assertThat(lateValues).contains(4);

    early.dispose();
    late.dispose();

    // Both observers are gone, but the connection returned by connect() has not been
    // disposed yet, so a fresh observer is still expected to receive the replayed value.
    TestObserver<Integer> whileConnected = replayed.test();
    assertThat(whileConnected.assertValueCount(1).values()).contains(4);
    whileConnected.dispose(); // no observers left at this point

    connection.dispose(); // tear down the multicast connection

    // Author's note: with the multicast connection closed, the cached values are
    // collected and the Observable will be "re-started".
    // NOTE(review): the assertions below still expect 4 to be replayed to an observer
    // that subscribes after the disconnect - confirm against the RxJava version in use.
    TestObserver<Integer> afterDisconnect = replayed.test();
    afterDisconnect.assertValueCount(1);
    assertThat(afterDisconnect.values()).contains(4);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment