Created
January 18, 2014 05:03
-
-
Save benjchristensen/8486461 to your computer and use it in GitHub Desktop.
Observable example with just the `bind` operator and unit tests asserting behavior while exploring the best signature.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx; | |
import static org.junit.Assert.*; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.junit.Test; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Action2; | |
import rx.util.functions.Func2; | |
public class ObsurvableBind<T> { | |
private final Action2<Observer<T>, OperatorSubscription> f; | |
ObsurvableBind(Action2<Observer<T>, OperatorSubscription> f) { | |
this.f = f; | |
} | |
public static <T> ObsurvableBind<T> create(final Action2<Observer<T>, OperatorSubscription> f) { | |
return new ObsurvableBind<T>(f); | |
} | |
public void observe(Observer<T> o, OperatorSubscription sf) { | |
f.call(o, sf); | |
} | |
public Subscription subscribe(Observer<T> o) { | |
final OperatorSubscription os = new OperatorSubscription(); | |
observe(o, os); | |
return os; | |
} | |
public static interface Operator<T> extends Observer<T> { | |
public OperatorSubscription getSubscription(); | |
} | |
public <R> ObsurvableBind<R> bind(final Func2<Observer<R>, OperatorSubscription, Operator<T>> bind) { | |
return new ObsurvableBind<R>(new Action2<Observer<R>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<R> o, final OperatorSubscription s) { | |
// subS -> take | |
Operator<T> ot = bind.call(o, s); | |
// takeS -> source | |
observe(ot, ot.getSubscription()); | |
} | |
}); | |
} | |
public static <T> ObsurvableBind<T> from(final T t) { | |
return ObsurvableBind.create(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<T> o, OperatorSubscription s) { | |
o.onNext(t); | |
o.onCompleted(); | |
} | |
}); | |
} | |
public static <T> ObsurvableBind<T> from(final Iterable<T> is) { | |
return ObsurvableBind.create(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<T> o, OperatorSubscription s) { | |
for (T i : is) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
} | |
/**************************************************************************************************************/ | |
private static class OperatorSubscription implements Subscription { | |
private final CompositeSubscription cs = new CompositeSubscription(); | |
@Override | |
public void unsubscribe() { | |
cs.unsubscribe(); | |
} | |
public static OperatorSubscription create() { | |
return new OperatorSubscription(); | |
} | |
public static OperatorSubscription create(Subscription s) { | |
OperatorSubscription _s = new OperatorSubscription(); | |
_s.add(s); | |
return _s; | |
} | |
public boolean isUnsubscribed() { | |
return cs.isUnsubscribed(); | |
} | |
public void add(Subscription s) { | |
cs.add(s); | |
} | |
} | |
/**************************************************************************************************************/ | |
/**************************************************************************************************************/ | |
public static class UnitTest { | |
@Test | |
public void testBindUnsubscribe() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter).bind(new Func2<Observer<Integer>, OperatorSubscription, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Observer<Integer> childObserver, OperatorSubscription childSubscription) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// we return an Operator to the "parent" | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer t) { | |
childObserver.onNext(t); | |
parentSub.unsubscribe(); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
}).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
@Test | |
public void testBindUnsubscribeLayered() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(2, "A")).bind(new DebugTakeFunction<Integer>(1, "B")).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
private static class DebugTakeFunction<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> { | |
final int num; | |
final String id; | |
DebugTakeFunction(final int num, final String id) { | |
this.num = num; | |
this.id = id; | |
} | |
int count = 0; | |
@Override | |
public Operator<T> call(final Observer<T> childObserver, final OperatorSubscription childSubscription) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// when the child unsubscribes we want it to trigger the parent unsubscribe so we link them | |
childSubscription.add(parentSub); | |
// we return an Operator to the "parent" | |
return new Operator<T>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
count++; | |
System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childSubscription.isUnsubscribed()); | |
if (!parentSub.isUnsubscribed()) { | |
childObserver.onNext(t); | |
if (count >= num) { | |
System.out.println("simpleTake[" + id + "] => unsubscribe"); | |
parentSub.unsubscribe(); | |
} | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
} | |
@Test | |
public void testBindUnsubscribeNested() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicInteger childCounter = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(4, "A")) | |
.bind(new DebugTakeFunction<Integer>(3, "B")) | |
.bind(new Func2<Observer<ObsurvableBind<String>>, OperatorSubscription, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Observer<ObsurvableBind<String>> childObserver, final OperatorSubscription childSubscription) { | |
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue | |
final OperatorSubscription outerSubscription = new OperatorSubscription(); | |
childSubscription.add(outerSubscription); | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we complete and will send nothing further | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer i) { | |
if (outerSubscription.isUnsubscribed()) { | |
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group"); | |
} else { | |
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i))); | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return outerSubscription; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<ObsurvableBind<String>>(2, "C")) | |
// flatten | |
.bind(new Func2<Observer<String>, OperatorSubscription, Operator<ObsurvableBind<String>>>() { | |
@Override | |
public Operator<ObsurvableBind<String>> call(final Observer<String> childObserver, final OperatorSubscription childSubscription) { | |
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription(); | |
final CompositeSubscription childGroups = new CompositeSubscription(); | |
// if child unsubscribes as we are giving it flattened results then we shut everything down | |
childSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** flattening got unsubscribed so shut down all groups"); | |
childGroups.unsubscribe(); | |
} | |
})); | |
return new Operator<ObsurvableBind<String>>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we don't immediately as sub-groups can still be emitting | |
} | |
@Override | |
public void onError(Throwable e) { | |
// if an error occurs we immediately shut everything down | |
childObserver.onError(e); | |
// unsubscribe all child groups | |
childGroups.unsubscribe(); | |
} | |
@Override | |
public void onNext(ObsurvableBind<String> o) { | |
// subscribe to this so we can flatten it | |
final OperatorSubscription childGroupSubscription = new OperatorSubscription(); | |
childGroups.add(childGroupSubscription); | |
// if all child groups are unsubscribed we want to unsubscribe the parent | |
childGroupSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** single group got unsubscribed"); | |
} | |
})); | |
o.observe(new Observer<String>() { | |
@Override | |
public void onCompleted() { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
// TODO we are ignoring when all child groups complete and parent is complete | |
} | |
@Override | |
public void onError(Throwable e) { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
} | |
@Override | |
public void onNext(String args) { | |
// emit flattened results | |
childObserver.onNext(args); | |
} | |
}, childGroupSubscription); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentGivingUsGroups; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<String>(7, "D")) | |
.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(2, counter.get()); | |
assertEquals(7, received.get()); | |
assertEquals(7, childCounter.get()); | |
} | |
ObsurvableBind<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Integer> o, OperatorSubscription s) { | |
int i = 1; | |
// System.out.println("source subscription: " + s); | |
while (!s.isUnsubscribed()) { | |
o.onNext(i++); | |
} | |
o.onCompleted(); | |
} | |
}); | |
ObsurvableBind<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { | |
return ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Integer> o, OperatorSubscription s) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGERS source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); | |
ObsurvableBind<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) { | |
return ObsurvableBind.create(new Action2<Observer<String>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<String> o, OperatorSubscription s) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(v + "-" + i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) { | |
return ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(final Observer<Integer> o, final OperatorSubscription s) { | |
Thread t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("-------> subscribe to infinite sequence"); | |
System.out.println("Starting thread: " + Thread.currentThread()); | |
int i = 1; | |
while (!s.isUnsubscribed()) { | |
o.onNext(i++); | |
Thread.yield(); | |
} | |
o.onCompleted(); | |
latch.countDown(); | |
System.out.println("Ending thread: " + Thread.currentThread()); | |
} | |
}); | |
t.start(); | |
} | |
}); | |
} | |
} | |
public Subscription subscribe(final Action1<T> onNext) { | |
return subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(T t) { | |
onNext.call(t); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment