Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Created January 18, 2014 05:03
Show Gist options
  • Save benjchristensen/8486461 to your computer and use it in GitHub Desktop.
Save benjchristensen/8486461 to your computer and use it in GitHub Desktop.
Observable example with just the `bind` operator and unit tests asserting behavior while exploring the best signature.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Action2;
import rx.util.functions.Func2;
public class ObsurvableBind<T> {
private final Action2<Observer<T>, OperatorSubscription> f;
ObsurvableBind(Action2<Observer<T>, OperatorSubscription> f) {
this.f = f;
}
public static <T> ObsurvableBind<T> create(final Action2<Observer<T>, OperatorSubscription> f) {
return new ObsurvableBind<T>(f);
}
public void observe(Observer<T> o, OperatorSubscription sf) {
f.call(o, sf);
}
public Subscription subscribe(Observer<T> o) {
final OperatorSubscription os = new OperatorSubscription();
observe(o, os);
return os;
}
public static interface Operator<T> extends Observer<T> {
public OperatorSubscription getSubscription();
}
public <R> ObsurvableBind<R> bind(final Func2<Observer<R>, OperatorSubscription, Operator<T>> bind) {
return new ObsurvableBind<R>(new Action2<Observer<R>, OperatorSubscription>() {
@Override
public void call(Observer<R> o, final OperatorSubscription s) {
// subS -> take
Operator<T> ot = bind.call(o, s);
// takeS -> source
observe(ot, ot.getSubscription());
}
});
}
public static <T> ObsurvableBind<T> from(final T t) {
return ObsurvableBind.create(new Action2<Observer<T>, OperatorSubscription>() {
@Override
public void call(Observer<T> o, OperatorSubscription s) {
o.onNext(t);
o.onCompleted();
}
});
}
public static <T> ObsurvableBind<T> from(final Iterable<T> is) {
return ObsurvableBind.create(new Action2<Observer<T>, OperatorSubscription>() {
@Override
public void call(Observer<T> o, OperatorSubscription s) {
for (T i : is) {
if (s.isUnsubscribed()) {
break;
}
o.onNext(i);
}
o.onCompleted();
}
});
}
/**************************************************************************************************************/
private static class OperatorSubscription implements Subscription {
private final CompositeSubscription cs = new CompositeSubscription();
@Override
public void unsubscribe() {
cs.unsubscribe();
}
public static OperatorSubscription create() {
return new OperatorSubscription();
}
public static OperatorSubscription create(Subscription s) {
OperatorSubscription _s = new OperatorSubscription();
_s.add(s);
return _s;
}
public boolean isUnsubscribed() {
return cs.isUnsubscribed();
}
public void add(Subscription s) {
cs.add(s);
}
}
/**************************************************************************************************************/
/**************************************************************************************************************/
public static class UnitTest {
@Test
public void testBindUnsubscribe() {
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger received = new AtomicInteger();
OBSERVABLE_OF_5_INTEGERS(counter).bind(new Func2<Observer<Integer>, OperatorSubscription, Operator<Integer>>() {
@Override
public Operator<Integer> call(final Observer<Integer> childObserver, OperatorSubscription childSubscription) {
final OperatorSubscription parentSub = new OperatorSubscription();
// we return an Operator to the "parent"
return new Operator<Integer>() {
@Override
public void onCompleted() {
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
childObserver.onError(e);
}
@Override
public void onNext(Integer t) {
childObserver.onNext(t);
parentSub.unsubscribe();
}
@Override
public OperatorSubscription getSubscription() {
return parentSub;
}
};
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
received.incrementAndGet();
System.out.println("Received: " + i);
}
});
assertEquals(1, counter.get());
assertEquals(1, received.get());
}
@Test
public void testBindUnsubscribeLayered() {
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger received = new AtomicInteger();
OBSERVABLE_OF_5_INTEGERS(counter)
.bind(new DebugTakeFunction<Integer>(2, "A")).bind(new DebugTakeFunction<Integer>(1, "B")).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
received.incrementAndGet();
System.out.println("Received: " + i);
}
});
assertEquals(1, counter.get());
assertEquals(1, received.get());
}
private static class DebugTakeFunction<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> {
final int num;
final String id;
DebugTakeFunction(final int num, final String id) {
this.num = num;
this.id = id;
}
int count = 0;
@Override
public Operator<T> call(final Observer<T> childObserver, final OperatorSubscription childSubscription) {
final OperatorSubscription parentSub = new OperatorSubscription();
// when the child unsubscribes we want it to trigger the parent unsubscribe so we link them
childSubscription.add(parentSub);
// we return an Operator to the "parent"
return new Operator<T>() {
@Override
public void onCompleted() {
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
childObserver.onError(e);
}
@Override
public void onNext(T t) {
count++;
System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childSubscription.isUnsubscribed());
if (!parentSub.isUnsubscribed()) {
childObserver.onNext(t);
if (count >= num) {
System.out.println("simpleTake[" + id + "] => unsubscribe");
parentSub.unsubscribe();
}
}
}
@Override
public OperatorSubscription getSubscription() {
return parentSub;
}
};
}
}
@Test
public void testBindUnsubscribeNested() {
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger received = new AtomicInteger();
final AtomicInteger childCounter = new AtomicInteger();
OBSERVABLE_OF_5_INTEGERS(counter)
.bind(new DebugTakeFunction<Integer>(4, "A"))
.bind(new DebugTakeFunction<Integer>(3, "B"))
.bind(new Func2<Observer<ObsurvableBind<String>>, OperatorSubscription, Operator<Integer>>() {
@Override
public Operator<Integer> call(final Observer<ObsurvableBind<String>> childObserver, final OperatorSubscription childSubscription) {
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue
final OperatorSubscription outerSubscription = new OperatorSubscription();
childSubscription.add(outerSubscription);
return new Operator<Integer>() {
@Override
public void onCompleted() {
// if the parent completes we complete and will send nothing further
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
childObserver.onError(e);
}
@Override
public void onNext(Integer i) {
if (outerSubscription.isUnsubscribed()) {
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group");
} else {
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i)));
}
}
@Override
public OperatorSubscription getSubscription() {
return outerSubscription;
}
};
}
})
.bind(new DebugTakeFunction<ObsurvableBind<String>>(2, "C"))
// flatten
.bind(new Func2<Observer<String>, OperatorSubscription, Operator<ObsurvableBind<String>>>() {
@Override
public Operator<ObsurvableBind<String>> call(final Observer<String> childObserver, final OperatorSubscription childSubscription) {
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription();
final CompositeSubscription childGroups = new CompositeSubscription();
// if child unsubscribes as we are giving it flattened results then we shut everything down
childSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** flattening got unsubscribed so shut down all groups");
childGroups.unsubscribe();
}
}));
return new Operator<ObsurvableBind<String>>() {
@Override
public void onCompleted() {
// if the parent completes we don't immediately as sub-groups can still be emitting
}
@Override
public void onError(Throwable e) {
// if an error occurs we immediately shut everything down
childObserver.onError(e);
// unsubscribe all child groups
childGroups.unsubscribe();
}
@Override
public void onNext(ObsurvableBind<String> o) {
// subscribe to this so we can flatten it
final OperatorSubscription childGroupSubscription = new OperatorSubscription();
childGroups.add(childGroupSubscription);
// if all child groups are unsubscribed we want to unsubscribe the parent
childGroupSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** single group got unsubscribed");
}
}));
o.observe(new Observer<String>() {
@Override
public void onCompleted() {
// remove the subscription if we finish
childGroups.remove(childGroupSubscription);
// TODO we are ignoring when all child groups complete and parent is complete
}
@Override
public void onError(Throwable e) {
// remove the subscription if we finish
childGroups.remove(childGroupSubscription);
}
@Override
public void onNext(String args) {
// emit flattened results
childObserver.onNext(args);
}
}, childGroupSubscription);
}
@Override
public OperatorSubscription getSubscription() {
return parentGivingUsGroups;
}
};
}
})
.bind(new DebugTakeFunction<String>(7, "D"))
.subscribe(new Action1<String>() {
@Override
public void call(String i) {
received.incrementAndGet();
System.out.println("Received: " + i);
}
});
assertEquals(2, counter.get());
assertEquals(7, received.get());
assertEquals(7, childCounter.get());
}
ObsurvableBind<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() {
@Override
public void call(Observer<Integer> o, OperatorSubscription s) {
int i = 1;
// System.out.println("source subscription: " + s);
while (!s.isUnsubscribed()) {
o.onNext(i++);
}
o.onCompleted();
}
});
ObsurvableBind<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {
return ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() {
@Override
public void call(Observer<Integer> o, OperatorSubscription s) {
// System.out.println("$$ OBSERVABLE_OF_5_INTEGERS source subscription: " + s);
for (int i = 1; i <= 5; i++) {
if (s.isUnsubscribed()) {
break;
}
// System.out.println(i);
numEmitted.incrementAndGet();
o.onNext(i);
}
o.onCompleted();
}
});
};
ObsurvableBind<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());
ObsurvableBind<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) {
return ObsurvableBind.create(new Action2<Observer<String>, OperatorSubscription>() {
@Override
public void call(Observer<String> o, OperatorSubscription s) {
// System.out.println("$$ OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING source subscription: " + s);
for (int i = 1; i <= 5; i++) {
if (s.isUnsubscribed()) {
break;
}
// System.out.println(i);
numEmitted.incrementAndGet();
o.onNext(v + "-" + i);
}
o.onCompleted();
}
});
};
ObsurvableBind<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) {
return ObsurvableBind.create(new Action2<Observer<Integer>, OperatorSubscription>() {
@Override
public void call(final Observer<Integer> o, final OperatorSubscription s) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("-------> subscribe to infinite sequence");
System.out.println("Starting thread: " + Thread.currentThread());
int i = 1;
while (!s.isUnsubscribed()) {
o.onNext(i++);
Thread.yield();
}
o.onCompleted();
latch.countDown();
System.out.println("Ending thread: " + Thread.currentThread());
}
});
t.start();
}
});
}
}
public Subscription subscribe(final Action1<T> onNext) {
return subscribe(new Observer<T>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(T t) {
onNext.call(t);
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment