Created
March 20, 2013 14:58
-
-
Save abliss/5205333 to your computer and use it in GitHub Desktop.
A failing test.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java | |
index c621002..b7c9fac 100644 | |
--- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java | |
+++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java | |
@@ -19,6 +19,7 @@ import static org.junit.Assert.*; | |
import static org.mockito.Mockito.*; | |
import java.lang.reflect.Array; | |
+import java.util.Arrays; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
@@ -193,8 +194,8 @@ public final class OperationConcat { | |
public void testConcatUnsubscribe() { | |
final CountDownLatch callOnce = new CountDownLatch(1); | |
final CountDownLatch okToContinue = new CountDownLatch(1); | |
- final TestObservable w1 = new TestObservable(null, null, "one", "two", "three"); | |
- final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six"); | |
+ final TestObservable<String> w1 = new TestObservable<String>(null, null, "one", "two", "three"); | |
+ final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six"); | |
@SuppressWarnings("unchecked") | |
Observer<String> aObserver = mock(Observer.class); | |
@@ -256,7 +257,40 @@ public final class OperationConcat { | |
Assert.assertEquals(expected.length, index); | |
} | |
- private static class TestObservable extends Observable<String> { | |
+ /** | |
+ * Concatenates an Observable of Observables built from a latch-controlled TestObservable | |
+ * and checks the received item count twice: once after {@code callOnce} is released, and | |
+ * again after {@code okToContinue} is counted down and the source thread has been joined. | |
+ * NOTE(review): {@code observer} and {@code index} are declared outside this hunk; | |
+ * {@code index} is presumably the number of items the observer has received - confirm. | |
+ */ | |
+ @Test | |
+ public void testBlockedObservableOfObservables() { | |
+ final String[] o = { "1", "3", "5", "7" }; | |
+ final String[] e = { "2", "4", "6" }; | |
+ final Observable<String> odds = Observable.toObservable(o); | |
+ final Observable<String> even = Observable.toObservable(e); | |
+ final CountDownLatch callOnce = new CountDownLatch(1); | |
+ final CountDownLatch okToContinue = new CountDownLatch(1); | |
+ TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even); | |
+ Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables); | |
+ Observable<String> concat = Observable.create(concatF); | |
+ concat.subscribe(observer); | |
+ try { | |
+ // Block the main thread until the source signals callOnce, i.e. after it has handed over the odds. | |
+ callOnce.await(); | |
+ } catch (Exception ex) { | |
+ ex.printStackTrace(); | |
+ fail(ex.getMessage()); | |
+ } | |
+ // The concatenated observable should have served up all of the odds. | |
+ Assert.assertEquals(o.length, index); | |
+ try { | |
+ // Unblock the source so it can hand over the evens and complete, then wait for its thread. | |
+ okToContinue.countDown(); | |
+ observableOfObservables.t.join(); | |
+ } catch (Exception ex) { | |
+ ex.printStackTrace(); | |
+ fail(ex.getMessage()); | |
+ } | |
+ // The concatenated observable should now have served up the evens as well, | |
+ // so the running total is the odds plus the evens. | |
+ Assert.assertEquals(o.length + e.length, index); | |
+ } | |
+ | |
+ private static class TestObservable<T> extends Observable<T> { | |
private final Subscription s = new Subscription() { | |
@@ -266,28 +300,28 @@ public final class OperationConcat { | |
} | |
}; | |
- private final String[] values; | |
+ private final List<T> values; | |
private Thread t = null; | |
private int count = 0; | |
private boolean subscribed = true; | |
private final CountDownLatch once; | |
private final CountDownLatch okToContinue; | |
- public TestObservable(CountDownLatch once, CountDownLatch okToContinue, String... values) { | |
- this.values = values; | |
+ public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) { | |
+ this.values = Arrays.asList(values); | |
this.once = once; | |
this.okToContinue = okToContinue; | |
} | |
@Override | |
- public Subscription subscribe(final Observer<String> observer) { | |
+ public Subscription subscribe(final Observer<T> observer) { | |
t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
- while (count < values.length && subscribed) { | |
- observer.onNext(values[count]); | |
+ while (count < values.size() && subscribed) { | |
+ observer.onNext(values.get(count)); | |
count++; | |
//Unblock the main thread to call unsubscribe. | |
if (null != once) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment