Last active
January 31, 2017 09:45
-
-
Save benjchristensen/8367765 to your computer and use it in GitHub Desktop.
Playing with Observable.bind
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx; | |
import static org.junit.Assert.*; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.junit.Test; | |
import rx.operators.SynchronizedObserver; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Action2; | |
import rx.util.functions.Func1; | |
import rx.util.functions.Func2; | |
import rx.util.functions.Func3; | |
import rx.util.functions.FuncN; | |
public class Obsurvable<T> { | |
private final Action2<Observer<T>, OperatorSubscription> f; | |
Obsurvable(Action2<Observer<T>, OperatorSubscription> f) { | |
this.f = f; | |
} | |
public static <T> Obsurvable<T> create(final Action2<Observer<T>, OperatorSubscription> f) { | |
return new Obsurvable<T>(f); | |
} | |
public Subscription subscribe(Observer<T> o) { | |
final OperatorSubscription os = new OperatorSubscription(); | |
// System.out.println("$$ subscribe subscription: " + os); | |
observe(o, os); | |
return os; | |
} | |
public void observe(Observer<T> o, OperatorSubscription sf) { | |
f.call(o, sf); | |
} | |
public static interface Operator<T> extends Observer<T> { | |
public OperatorSubscription getSubscription(); | |
} | |
public <R> Obsurvable<R> bind(final Func2<Observer<R>, OperatorSubscription, Operator<T>> bind) { | |
return new Obsurvable<R>(new Action2<Observer<R>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<R> o, final OperatorSubscription s) { | |
// subS -> take | |
Operator<T> ot = bind.call(o, s); | |
// takeS -> source | |
observe(ot, ot.getSubscription()); | |
} | |
}); | |
} | |
public <R> Obsurvable<R> map(Func1<T, R> f) { | |
return bind(new MapOperator<T, R>(f)); | |
} | |
public <R> Obsurvable<R> flatMap(Func1<T, Obsurvable<R>> f) { | |
return bind(new MapOperator<T, Obsurvable<R>>(f)).bind(new MergeOperator<R>()); | |
} | |
public Obsurvable<T> take(int num) { | |
return bind(new TakeOperator<T>(num)); | |
} | |
public final <K> Obsurvable<Obsurvable<T>> groupBy(final Func1<T, K> keySelector) { | |
return bind(new OperatorGroupBy<K, T>(keySelector)); | |
} | |
public <T2, R> Obsurvable<R> zip(final Obsurvable<T2> other, final Func2<T, T2, R> zipFunction) { | |
return zip(this, other, zipFunction); | |
} | |
@SuppressWarnings("unchecked") | |
public static <T1, T2, R> Obsurvable<R> zip(final Obsurvable<T1> t1, final Obsurvable<T2> t2, final Func2<T1, T2, R> zipFunction) { | |
return from(new Obsurvable[] { t1, t2 }).bind(new ZipOperator<R>(new FuncN<R>() { | |
@Override | |
public R call(Object... args) { | |
return zipFunction.call((T1) args[0], (T2) args[1]); | |
} | |
})); | |
} | |
@SuppressWarnings("unchecked") | |
public static <T1, T2, T3, R> Obsurvable<R> zip(final Obsurvable<T1> t1, final Obsurvable<T2> t2, final Obsurvable<T3> t3, final Func3<T1, T2, T3, R> zipFunction) { | |
return from(new Obsurvable[] { t1, t2, t3 }).bind(new ZipOperator<R>(new FuncN<R>() { | |
@Override | |
public R call(Object... args) { | |
return zipFunction.call((T1) args[0], (T2) args[1], (T3) args[2]); | |
} | |
})); | |
} | |
public Obsurvable<T> repeat() { | |
return from(this).bind(new RepeatOperator<T>()); | |
} | |
public static <T> Obsurvable<T> from(final T t) { | |
return Obsurvable.create(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<T> o, OperatorSubscription s) { | |
o.onNext(t); | |
o.onCompleted(); | |
} | |
}); | |
} | |
public static <T> Obsurvable<T> from(final Iterable<T> is) { | |
return Obsurvable.create(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<T> o, OperatorSubscription s) { | |
for (T i : is) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
} | |
/**************************************************************************************************************/ | |
private static class OperatorSubscription implements Subscription { | |
private final CompositeSubscription cs = new CompositeSubscription(); | |
@Override | |
public void unsubscribe() { | |
cs.unsubscribe(); | |
} | |
public static OperatorSubscription create() { | |
return new OperatorSubscription(); | |
} | |
public static OperatorSubscription create(Subscription s) { | |
OperatorSubscription _s = new OperatorSubscription(); | |
_s.add(s); | |
return _s; | |
} | |
public boolean isUnsubscribed() { | |
return cs.isUnsubscribed(); | |
} | |
public void add(Subscription s) { | |
cs.add(s); | |
} | |
} | |
/**************************************************************************************************************/ | |
private final static class OperatorGroupBy<K, T> implements Func2<Observer<Obsurvable<T>>, OperatorSubscription, Operator<T>> { | |
final Func1<? super T, ? extends K> keySelector; | |
public OperatorGroupBy(final Func1<T, K> keySelector) { | |
this.keySelector = keySelector; | |
} | |
@Override | |
public Operator<T> call(final Observer<Obsurvable<T>> childObserver, final OperatorSubscription childSubscription) { | |
// when child unsubscribes we want to prevent further groups from being emitted but still allow existing groups | |
childSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("********* groupBy => outer subscription unsubscribed"); | |
} | |
})); | |
// TODO what do we do if the parentSubscription unsubscribes? | |
final OperatorSubscription parentSubscription = new OperatorSubscription(); | |
parentSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("********* groupBy => parentSubscription unsubscribed"); | |
} | |
})); | |
return new Operator<T>() { | |
private final Map<K, SimpleSubject<T>> groups = new HashMap<K, SimpleSubject<T>>(); | |
private final AtomicInteger completionCounter = new AtomicInteger(1); | |
@Override | |
public void onCompleted() { | |
// if we receive onCompleted from our parent we onComplete everything | |
for (SimpleSubject<T> ps : groups.values()) { | |
ps.onCompleted(); | |
} | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
// we immediately tear everything down if we receive an error | |
childSubscription.unsubscribe(); // TODO is this correct? or onComplete? we don't want to propagate the error to each child | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
// System.out.println("received onNext: " + t); | |
try { | |
final K key = keySelector.call(t); | |
SimpleSubject<T> gps = groups.get(key); | |
if (gps == null) { | |
// this group doesn't exist | |
if (childSubscription.isUnsubscribed()) { | |
// we have been unsubscribed on the outer so won't send any more groups | |
// System.out.println("skipping group due to outer unsubscribe: " + key); | |
return; | |
} | |
gps = new SimpleSubject<T>(); | |
final SimpleSubject<T> _gps = gps; | |
System.out.println("new group for key: " + key); | |
final OperatorSubscription childGroupSubscription = new OperatorSubscription(); | |
childGroupSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("********* groupBy => child group unsubscribed"); | |
} | |
})); | |
Obsurvable<T> go = new Obsurvable<T>(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(final Observer<T> childObserver, final OperatorSubscription _child) { | |
_gps.observe(childObserver, childGroupSubscription); | |
} | |
}); | |
groups.put(key, gps); | |
// System.out.println("send onNext: " + go + " to " + key); | |
// number of children we have running | |
completionCounter.incrementAndGet(); | |
childObserver.onNext(go); | |
} | |
// we have the correct group so send value to it | |
gps.onNext(t); | |
} catch (Throwable e) { | |
onError(e); | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSubscription; | |
} | |
private void complete() { | |
if (completionCounter.decrementAndGet() == 0) { | |
childObserver.onCompleted(); | |
} | |
} | |
}; | |
} | |
} | |
private static class SimpleSubject<T> extends Obsurvable<T> implements Observer<T> { | |
final AtomicReference<Observer<T>> theObserver; | |
final AtomicReference<OperatorSubscription> theSubscription; | |
public SimpleSubject(final AtomicReference<Observer<T>> theObserver, final AtomicReference<OperatorSubscription> theSubscription) { | |
super(new Action2<Observer<T>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<T> o, OperatorSubscription s) { | |
theObserver.set(o); | |
} | |
}); | |
this.theObserver = theObserver; | |
this.theSubscription = theSubscription; | |
} | |
public SimpleSubject() { | |
this(new AtomicReference<Observer<T>>(), new AtomicReference<OperatorSubscription>()); | |
} | |
@Override | |
public void onCompleted() { | |
theObserver.get().onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
theObserver.get().onError(e); | |
} | |
@Override | |
public void onNext(T args) { | |
theObserver.get().onNext(args); | |
} | |
} | |
public static class MergeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<Obsurvable<T>>> { | |
private final int maxConcurrent; | |
public MergeOperator() { | |
maxConcurrent = Integer.MAX_VALUE; | |
} | |
public MergeOperator(int maxConcurrent) { | |
if (maxConcurrent <= 0) { | |
throw new IllegalArgumentException("maxConcurrent must be positive"); | |
} | |
this.maxConcurrent = maxConcurrent; | |
} | |
@Override | |
public Operator<Obsurvable<T>> call(Observer<T> _o, final OperatorSubscription os) { | |
final AtomicInteger completionCounter = new AtomicInteger(1); | |
final AtomicInteger concurrentCounter = new AtomicInteger(1); | |
// Concurrent* since we'll be accessing them from the inner Observers which can be on other threads | |
final ConcurrentLinkedQueue<Obsurvable<T>> pending = new ConcurrentLinkedQueue<Obsurvable<T>>(); | |
final Observer<T> o = new SynchronizedObserver<T>(_o); | |
return new Operator<Obsurvable<T>>() { | |
@Override | |
public OperatorSubscription getSubscription() { | |
return os; | |
}; | |
@Override | |
public void onCompleted() { | |
complete(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(Obsurvable<T> innerObservable) { | |
// track so we send onComplete only when all have finished | |
completionCounter.incrementAndGet(); | |
// check concurrency | |
if (concurrentCounter.incrementAndGet() > maxConcurrent) { | |
pending.add(innerObservable); | |
concurrentCounter.decrementAndGet(); | |
} else { | |
// we are able to proceed | |
OperatorSubscription innerSubscription = new OperatorSubscription(); | |
os.add(innerSubscription); | |
innerObservable.observe(new InnerObserver(), innerSubscription); | |
} | |
} | |
private void complete() { | |
if (completionCounter.decrementAndGet() == 0) { | |
o.onCompleted(); | |
return; | |
} else { | |
// not all are completed and some may still need to run | |
concurrentCounter.decrementAndGet(); | |
} | |
// do work-stealing on whatever thread we're on and subscribe to pending observables | |
if (concurrentCounter.incrementAndGet() > maxConcurrent) { | |
// still not space to run | |
concurrentCounter.decrementAndGet(); | |
} else { | |
// we can run | |
Obsurvable<T> outstandingObservable = pending.poll(); | |
if (outstandingObservable != null) { | |
OperatorSubscription innerSubscription = new OperatorSubscription(); | |
os.add(innerSubscription); | |
outstandingObservable.observe(new InnerObserver(), innerSubscription); | |
} | |
} | |
} | |
final class InnerObserver implements Observer<T> { | |
@Override | |
public void onCompleted() { | |
complete(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(T a) { | |
o.onNext(a); | |
} | |
} | |
}; | |
} | |
} | |
private static class MapOperator<T, R> implements Func2<Observer<R>, OperatorSubscription, Operator<T>> { | |
final Func1<T, R> transformer; | |
MapOperator(Func1<T, R> transformer) { | |
this.transformer = transformer; | |
} | |
@Override | |
public Operator<T> call(final Observer<R> o, final OperatorSubscription s) { | |
// System.out.println("$$ map subscription: " + s + " unsubscribed: " + s.isUnsubscribed()); | |
return new Operator<T>() { | |
public OperatorSubscription getSubscription() { | |
return s; | |
} | |
@Override | |
public void onCompleted() { | |
// System.out.println("onCompleted operator: " + this); | |
o.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
// System.out.println("MAP onNext: " + t + " becomes " + transformer.call(t) + " unsubscribed: " + s.isUnsubscribed()); | |
o.onNext(transformer.call(t)); | |
} | |
}; | |
} | |
} | |
private static class TakeOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> { | |
final int limit; | |
TakeOperator(int limit) { | |
this.limit = limit; | |
} | |
@Override | |
public Operator<T> call(final Observer<T> o, final OperatorSubscription s) { | |
if (limit == 0) { | |
o.onCompleted(); | |
s.unsubscribe(); | |
} | |
final OperatorSubscription is = OperatorSubscription.create(); | |
return new Operator<T>() { | |
int count = 0; | |
public OperatorSubscription getSubscription() { | |
OperatorSubscription toReturn = is; | |
return toReturn; | |
} | |
@Override | |
public void onCompleted() { | |
o.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(T i) { | |
if (!is.isUnsubscribed()) { | |
o.onNext(i); | |
if (++count >= limit) { | |
o.onCompleted(); | |
// System.out.println("-------------------------------------- TAKE unsubscribe: " + s); | |
is.unsubscribe(); | |
} | |
} else { | |
// System.out.println("-------------------------------------- TAKE REJECTED: " + s); | |
} | |
} | |
}; | |
} | |
} | |
private static class RepeatOperator<T> implements Func2<Observer<T>, OperatorSubscription, Operator<Obsurvable<T>>> { | |
RepeatOperator() { | |
} | |
@Override | |
public Operator<Obsurvable<T>> call(final Observer<T> o, final OperatorSubscription s) { | |
return new Operator<Obsurvable<T>>() { | |
public OperatorSubscription getSubscription() { | |
return s; | |
} | |
@Override | |
public void onCompleted() { | |
o.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(Obsurvable<T> ot) { | |
while (!s.isUnsubscribed()) { | |
ot.f.call(o, s); | |
} | |
} | |
}; | |
} | |
} | |
private static class ZipOperator<R> implements Func2<Observer<R>, OperatorSubscription, Operator<Obsurvable[]>> { | |
final FuncN<R> zipFunction; | |
public ZipOperator(FuncN<R> f) { | |
this.zipFunction = f; | |
} | |
@Override | |
public Operator<Obsurvable[]> call(final Observer<R> observer, final OperatorSubscription outerSubscription) { | |
return new Operator<Obsurvable[]>() { | |
public OperatorSubscription getSubscription() { | |
return outerSubscription; | |
} | |
@Override | |
public void onCompleted() { | |
// we only complete once a child Obsurvable completes or errors | |
} | |
@Override | |
public void onError(Throwable e) { | |
observer.onError(e); | |
} | |
@SuppressWarnings({ "unchecked", "rawtypes" }) | |
@Override | |
public void onNext(Obsurvable[] innerObsurvable) { | |
new Zip<R>(innerObsurvable, observer, outerSubscription, zipFunction).zip(); | |
} | |
}; | |
} | |
private static class Zip<R> { | |
@SuppressWarnings("rawtypes") | |
final Obsurvable[] os; | |
final Object[] observers; | |
final Observer<R> observer; | |
final OperatorSubscription outerSubscription; | |
final FuncN<R> zipFunction; | |
final OperatorSubscription childSubscription = new OperatorSubscription(); | |
@SuppressWarnings("rawtypes") | |
public Zip(Obsurvable[] os, final Observer<R> observer, final OperatorSubscription outerSubscription, FuncN<R> zipFunction) { | |
this.os = os; | |
this.observer = observer; | |
this.outerSubscription = outerSubscription; | |
this.zipFunction = zipFunction; | |
observers = new Object[os.length]; | |
for (int i = 0; i < os.length; i++) { | |
observers[i] = new InnerObserver(); | |
} | |
outerSubscription.add(childSubscription); | |
} | |
@SuppressWarnings("unchecked") | |
public void zip() { | |
for (int i = 0; i < os.length; i++) { | |
observers[i] = new InnerObserver(); | |
os[i].observe((InnerObserver) observers[i], childSubscription); | |
} | |
} | |
final AtomicLong counter = new AtomicLong(0); | |
/** | |
* check if we have values for each and emit if we do | |
* | |
* This will only allow one thread at a time to do the work, but ensures via `counter` increment/decrement | |
* that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn. | |
* | |
*/ | |
void tick() { | |
if (counter.getAndIncrement() == 0) { | |
outerloop: do { | |
Object[] vs = new Object[observers.length]; | |
for (int i = 0; i < observers.length; i++) { | |
vs[i] = ((InnerObserver) observers[i]).items.peek(); | |
if (vs[i] instanceof Notification) { | |
observer.onCompleted(); | |
// we need to unsubscribe from all children since children are independently subscribed | |
childSubscription.unsubscribe(); | |
continue outerloop; | |
} | |
if (vs[i] == null) { | |
continue outerloop; | |
} | |
} | |
// all have something so emit | |
observer.onNext(zipFunction.call(vs)); | |
// now remove them | |
for (int i = 0; i < observers.length; i++) { | |
((InnerObserver) observers[i]).items.poll(); | |
} | |
} while (counter.decrementAndGet() > 0); | |
} | |
} | |
// used to observe each obsurvable we are zipping together | |
// it collects all items in an internal queue | |
final class InnerObserver implements Observer { | |
// Concurrent* since we need to read it from across threads | |
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue(); | |
@Override | |
public void onCompleted() { | |
items.add(new Notification()); | |
tick(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
// emit error and shut down | |
observer.onError(e); | |
} | |
@Override | |
public void onNext(Object t) { | |
items.add(t); | |
tick(); | |
} | |
}; | |
} | |
} | |
/**************************************************************************************************************/ | |
public static class UnitTest { | |
@Test | |
public void testBindUnsubscribe() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter).bind(new Func2<Observer<Integer>, OperatorSubscription, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Observer<Integer> childObserver, OperatorSubscription childSubscription) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// we return an Operator to the "parent" | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer t) { | |
childObserver.onNext(t); | |
parentSub.unsubscribe(); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
}).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
@Test | |
public void testBindUnsubscribeLayered() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(2, "A")).bind(new DebugTakeFunction<Integer>(1, "B")).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
private static class DebugTakeFunction<T> implements Func2<Observer<T>, OperatorSubscription, Operator<T>> { | |
final int num; | |
final String id; | |
DebugTakeFunction(final int num, final String id) { | |
this.num = num; | |
this.id = id; | |
} | |
int count = 0; | |
@Override | |
public Operator<T> call(final Observer<T> childObserver, final OperatorSubscription childSubscription) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// when the child unsubscribes we want it to trigger the parent unsubscribe so we link them | |
childSubscription.add(parentSub); | |
// we return an Operator to the "parent" | |
return new Operator<T>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
count++; | |
System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childSubscription.isUnsubscribed()); | |
if (!parentSub.isUnsubscribed()) { | |
childObserver.onNext(t); | |
if (count >= num) { | |
System.out.println("simpleTake[" + id + "] => unsubscribe"); | |
parentSub.unsubscribe(); | |
} | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
} | |
@Test | |
public void testBindUnsubscribeNested() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicInteger childCounter = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(4, "A")) | |
.bind(new DebugTakeFunction<Integer>(3, "B")) | |
.bind(new Func2<Observer<Obsurvable<String>>, OperatorSubscription, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Observer<Obsurvable<String>> childObserver, final OperatorSubscription childSubscription) { | |
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue | |
final OperatorSubscription outerSubscription = new OperatorSubscription(); | |
childSubscription.add(outerSubscription); | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we complete and will send nothing further | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer i) { | |
if (outerSubscription.isUnsubscribed()) { | |
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group"); | |
} else { | |
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i))); | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return outerSubscription; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<Obsurvable<String>>(2, "C")) | |
// flatten | |
.bind(new Func2<Observer<String>, OperatorSubscription, Operator<Obsurvable<String>>>() { | |
@Override | |
public Operator<Obsurvable<String>> call(final Observer<String> childObserver, final OperatorSubscription childSubscription) { | |
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription(); | |
final CompositeSubscription childGroups = new CompositeSubscription(); | |
// if child unsubscribes as we are giving it flattened results then we shut everything down | |
childSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** flattening got unsubscribed so shut down all groups"); | |
childGroups.unsubscribe(); | |
} | |
})); | |
return new Operator<Obsurvable<String>>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we don't immediately as sub-groups can still be emitting | |
} | |
@Override | |
public void onError(Throwable e) { | |
// if an error occurs we immediately shut everything down | |
childObserver.onError(e); | |
// unsubscribe all child groups | |
childGroups.unsubscribe(); | |
} | |
@Override | |
public void onNext(Obsurvable<String> o) { | |
// subscribe to this so we can flatten it | |
final OperatorSubscription childGroupSubscription = new OperatorSubscription(); | |
childGroups.add(childGroupSubscription); | |
// if all child groups are unsubscribed we want to unsubscribe the parent | |
childGroupSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** single group got unsubscribed"); | |
} | |
})); | |
o.observe(new Observer<String>() { | |
@Override | |
public void onCompleted() { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
// TODO we are ignoring when all child groups complete and parent is complete | |
} | |
@Override | |
public void onError(Throwable e) { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
} | |
@Override | |
public void onNext(String args) { | |
// emit flattened results | |
childObserver.onNext(args); | |
} | |
}, childGroupSubscription); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentGivingUsGroups; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<String>(7, "D")) | |
.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(2, counter.get()); | |
assertEquals(7, received.get()); | |
assertEquals(7, childCounter.get()); | |
} | |
@Test | |
public void testZip() { | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS | |
.zip(OBSERVABLE_OF_5_INTEGERS, new Func2<Integer, Integer, String>() { | |
@Override | |
public String call(Integer a, Integer b) { | |
return a + "-" + b; | |
} | |
}); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String s) { | |
System.out.println(s); | |
list.add(s); | |
} | |
}); | |
assertEquals(5, list.size()); | |
assertEquals("1-1", list.get(0)); | |
assertEquals("2-2", list.get(1)); | |
assertEquals("5-5", list.get(4)); | |
} | |
@Test | |
public void testZipAsync() throws InterruptedException { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final CountDownLatch infiniteObservables = new CountDownLatch(2); | |
Obsurvable<String> os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables) | |
.zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables), new Func2<Integer, Integer, String>() { | |
@Override | |
public String call(Integer a, Integer b) { | |
return a + "-" + b; | |
} | |
}).take(5); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Observer<String>() { | |
@Override | |
public void onCompleted() { | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
e.printStackTrace(); | |
latch.countDown(); | |
} | |
@Override | |
public void onNext(String s) { | |
System.out.println(s); | |
list.add(s); | |
} | |
}); | |
latch.await(2000, TimeUnit.MILLISECONDS); | |
if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) { | |
throw new RuntimeException("didn't unsubscribe"); | |
} | |
assertEquals(5, list.size()); | |
assertEquals("1-1", list.get(0)); | |
assertEquals("2-2", list.get(1)); | |
assertEquals("5-5", list.get(4)); | |
} | |
@Test | |
public void testZipInfiniteAndFinite() throws InterruptedException { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final CountDownLatch infiniteObservable = new CountDownLatch(1); | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS | |
.zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservable), new Func2<Integer, Integer, String>() { | |
@Override | |
public String call(Integer a, Integer b) { | |
return a + "-" + b; | |
} | |
}); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Observer<String>() { | |
@Override | |
public void onCompleted() { | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
e.printStackTrace(); | |
latch.countDown(); | |
} | |
@Override | |
public void onNext(String s) { | |
System.out.println(s); | |
list.add(s); | |
} | |
}); | |
latch.await(1000, TimeUnit.MILLISECONDS); | |
if (!infiniteObservable.await(2000, TimeUnit.MILLISECONDS)) { | |
throw new RuntimeException("didn't unsubscribe"); | |
} | |
assertEquals(5, list.size()); | |
assertEquals("1-1", list.get(0)); | |
assertEquals("2-2", list.get(1)); | |
assertEquals("5-5", list.get(4)); | |
} | |
@Test | |
public void testFlatMap() { | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.flatMap(new Func1<Integer, Obsurvable<String>>() { | |
@Override | |
public Obsurvable<String> call(Integer i) { | |
return Obsurvable.from(Arrays.asList("a-" + i, "b-" + i)); | |
} | |
}); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String s) { | |
System.out.println(s); | |
list.add(s); | |
} | |
}); | |
assertEquals(10, list.size()); | |
assertEquals("a-1", list.get(0)); | |
assertEquals("b-1", list.get(1)); | |
assertEquals("a-2", list.get(2)); | |
assertEquals("b-2", list.get(3)); | |
assertEquals("a-5", list.get(8)); | |
assertEquals("b-5", list.get(9)); | |
} | |
@Test | |
public void testFlatMapWithTake() { | |
final AtomicInteger counter = new AtomicInteger(); | |
Obsurvable<Integer> os = Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Integer> o, OperatorSubscription s) { | |
for (int i = 1; i <= 5; i++) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
counter.incrementAndGet(); | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
Obsurvable<String> ss = os.flatMap(new Func1<Integer, Obsurvable<String>>() { | |
@Override | |
public Obsurvable<String> call(Integer i) { | |
return Obsurvable.from(Arrays.asList("a-" + i, "b-" + i)); | |
} | |
}).take(4); | |
final ArrayList<String> list = new ArrayList<String>(); | |
ss.subscribe(new Action1<String>() { | |
@Override | |
public void call(String s) { | |
System.out.println(s); | |
list.add(s); | |
} | |
}); | |
assertEquals(4, list.size()); | |
assertEquals("a-1", list.get(0)); | |
assertEquals("b-1", list.get(1)); | |
assertEquals("a-2", list.get(2)); | |
assertEquals("b-2", list.get(3)); | |
assertEquals(2, counter.get()); | |
} | |
@Test | |
public void testMerge() { | |
Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) { | |
o.onNext(OBSERVABLE_OF_5_INTEGERS); | |
o.onNext(OBSERVABLE_OF_5_INTEGERS); | |
} | |
}); | |
Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
is.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(10, list.size()); | |
assertEquals(1, list.get(0).intValue()); | |
assertEquals(5, list.get(4).intValue()); | |
assertEquals(1, list.get(5).intValue()); | |
assertEquals(5, list.get(9).intValue()); | |
} | |
@Test | |
public void testMergeTwoInfiniteSynchronousSequences() { | |
Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) { | |
o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS); | |
o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS); | |
} | |
}); | |
Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(10); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
is.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(10, list.size()); | |
// this will actually only get values from the first one as it is infinite and synchronous | |
assertEquals(1, list.get(0).intValue()); | |
assertEquals(5, list.get(4).intValue()); | |
assertEquals(10, list.get(9).intValue()); | |
} | |
@Test | |
public void testMergeFiniteAndInfiniteSynchronousSequences() { | |
Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) { | |
o.onNext(OBSERVABLE_OF_5_INTEGERS); | |
o.onNext(OBSERVABLE_OF_INFINITE_INTEGERS); | |
} | |
}); | |
Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(10); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
is.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(10, list.size()); | |
// this will actually only get values from the first one as it is infinite and synchronous | |
assertEquals(1, list.get(0).intValue()); | |
assertEquals(5, list.get(4).intValue()); | |
assertEquals(5, list.get(9).intValue()); | |
} | |
@Test | |
public void testMergeInifiteAsync() throws InterruptedException { | |
final CountDownLatch infiniteObservables = new CountDownLatch(2); | |
Obsurvable<Obsurvable<Integer>> os = Obsurvable.create(new Action2<Observer<Obsurvable<Integer>>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Obsurvable<Integer>> o, OperatorSubscription s) { | |
o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables)); | |
o.onNext(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables)); | |
} | |
}); | |
Obsurvable<Integer> is = os.bind(new MergeOperator<Integer>()).take(100); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
is.subscribe(new Observer<Integer>() { | |
@Override | |
public void onCompleted() { | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
latch.await(); | |
if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) { | |
throw new RuntimeException("didn't unsubscribe"); | |
} | |
assertEquals(100, list.size()); | |
} | |
@Test | |
public void testTake1from5() { | |
Obsurvable<Integer> os = OBSERVABLE_OF_5_INTEGERS.take(1); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
os.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(1, list.size()); | |
assertEquals(1, list.get(0).intValue()); | |
} | |
@Test | |
public void testBindTakeAndMap() { | |
Obsurvable<String> os = OBSERVABLE_OF_INFINITE_INTEGERS.bind(TAKE_5).bind(TAKE_5).bind(TAKE_5).bind(TAKE_5).bind(MAP_INTEGER_TO_STRING); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(5, list.size()); | |
assertEquals("Number=>1", list.get(0)); | |
assertEquals("Number=>2", list.get(1)); | |
assertEquals("Number=>3", list.get(2)); | |
assertEquals("Number=>4", list.get(3)); | |
assertEquals("Number=>5", list.get(4)); | |
} | |
@Test | |
public void testBindTakeAndMapMultipleSubscribes() { | |
Obsurvable<String> os = OBSERVABLE_OF_INFINITE_INTEGERS.bind(TAKE_5).bind(MAP_INTEGER_TO_STRING); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(5, list.size()); | |
assertEquals("Number=>1", list.get(0)); | |
assertEquals("Number=>2", list.get(1)); | |
assertEquals("Number=>3", list.get(2)); | |
assertEquals("Number=>4", list.get(3)); | |
assertEquals("Number=>5", list.get(4)); | |
// assert we can subscribe to it again and the subscription is correct | |
final ArrayList<String> list2 = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
System.out.println(i); | |
list2.add(i); | |
} | |
}); | |
assertEquals(5, list2.size()); | |
assertEquals("Number=>1", list2.get(0)); | |
assertEquals("Number=>2", list2.get(1)); | |
assertEquals("Number=>3", list2.get(2)); | |
assertEquals("Number=>4", list2.get(3)); | |
assertEquals("Number=>5", list2.get(4)); | |
} | |
@Test | |
public void testBindMap() { | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(5, list.size()); | |
assertEquals("Number=>1", list.get(0)); | |
} | |
@Test | |
public void testMapAndTake() { | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.bind(MAP_INTEGER_TO_STRING).take(2); | |
final ArrayList<String> list = new ArrayList<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(2, list.size()); | |
assertEquals("Number=>1", list.get(0)); | |
} | |
@Test | |
public void testUnSubscribeOfEachOperator() { | |
final AtomicBoolean unsubscribed = new AtomicBoolean(false); | |
Obsurvable<String> os = OBSERVABLE_OF_5_INTEGERS.bind(new Func2<Observer<String>, OperatorSubscription, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Observer<String> os, final OperatorSubscription s) { | |
s.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
unsubscribed.set(true); | |
} | |
})); | |
return new Operator<Integer>() { | |
public OperatorSubscription getSubscription() { | |
return s; | |
} | |
@Override | |
public void onCompleted() { | |
os.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
os.onError(e); | |
} | |
@Override | |
public void onNext(Integer i) { | |
os.onNext(String.valueOf(i)); | |
} | |
}; | |
} | |
}).take(3).map(new Func1<String, String>() { | |
@Override | |
public String call(String t) { | |
return "mapped-" + t; | |
} | |
}).take(2); | |
final AtomicReference<String> valueA = new AtomicReference<String>(); | |
os.subscribe(new Action1<String>() { | |
@Override | |
public void call(String s) { | |
System.out.println(" received: " + s); | |
valueA.set(s); | |
} | |
}); | |
assertEquals("mapped-2", valueA.get()); | |
assertTrue(unsubscribed.get()); | |
// reset to subscribe again | |
unsubscribed.set(false); | |
final AtomicReference<String> valueB = new AtomicReference<String>(); | |
os.map(new Func1<String, String>() { | |
@Override | |
public String call(String t) { | |
return t + "-B"; | |
} | |
}).take(1).subscribe(new Action1<String>() { | |
@Override | |
public void call(String s) { | |
System.out.println(" received: " + s); | |
valueB.set(s); | |
} | |
}); | |
assertEquals("mapped-1-B", valueB.get()); | |
assertTrue(unsubscribed.get()); | |
} | |
@Test | |
public void testBindTakeFromInfinite() { | |
int desiredCount = 5; | |
Obsurvable<Integer> oi = OBSERVABLE_OF_INFINITE_INTEGERS.bind(new TakeOperator<Integer>(desiredCount)); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
oi.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(desiredCount, list.size()); | |
} | |
@Test | |
public void testCreateAndSubscribe() { | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
OBSERVABLE_OF_5_INTEGERS.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
assertEquals(5, list.size()); | |
} | |
@Test | |
public void testTakeFromAsyncInfinite() throws InterruptedException { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final CountDownLatch infiniteObservables = new CountDownLatch(1); | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables).take(10).subscribe(new Observer<Integer>() { | |
@Override | |
public void onCompleted() { | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(Integer i) { | |
System.out.println(i); | |
list.add(i); | |
} | |
}); | |
latch.await(); | |
if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) { | |
throw new RuntimeException("didn't unsubscribe"); | |
} | |
assertEquals(10, list.size()); | |
} | |
@Test | |
public void testMultipleSubscriptionChains() { | |
Obsurvable<Integer> x = OBSERVABLE_OF_INFINITE_INTEGERS; | |
Obsurvable<Integer> y = x.take(2); | |
Obsurvable<Integer> z = x.take(1); | |
final AtomicInteger value = new AtomicInteger(); | |
y.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
value.set(i); | |
} | |
}); | |
assertEquals(2, value.get()); | |
value.set(-1); // reset | |
z.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
value.set(i); | |
} | |
}); | |
assertEquals(1, value.get()); | |
} | |
@Test | |
public void testBindRepeat() { | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
Obsurvable.from(OBSERVABLE_OF_5_INTEGERS) | |
.bind(new RepeatOperator<Integer>()) | |
.take(25) | |
.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
list.add(i); | |
} | |
}); | |
assertEquals(25, list.size()); | |
} | |
@Test | |
public void testRepeat() { | |
final ArrayList<Integer> list = new ArrayList<Integer>(); | |
OBSERVABLE_OF_5_INTEGERS | |
.repeat() | |
.take(25) | |
.subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
list.add(i); | |
} | |
}); | |
assertEquals(25, list.size()); | |
} | |
final Func1<String, Integer> length = new Func1<String, Integer>() { | |
@Override | |
public Integer call(String s) { | |
return s.length(); | |
} | |
}; | |
@Test | |
public void testGroupBy() { | |
Obsurvable<String> source = Obsurvable.from(Arrays.asList("one", "two", "three", "four", "five", "six")); | |
Obsurvable<Obsurvable<String>> grouped = source.bind(new OperatorGroupBy<Integer, String>(length)); | |
final String[][] content = new String[3][3]; | |
grouped.subscribe(new Action1<Obsurvable<String>>() { | |
int c = 0; | |
@Override | |
public void call(Obsurvable<String> o) { | |
o.subscribe(new Action1<String>() { | |
int i = 0; | |
final int key = c++; | |
@Override | |
public void call(String v) { | |
System.out.println("key: " + key + " v: " + v); | |
content[key][i++] = v; | |
} | |
}); | |
} | |
}); | |
System.out.println(Arrays.asList(content[0])); | |
System.out.println(Arrays.asList(content[1])); | |
System.out.println(Arrays.asList(content[2])); | |
assertArrayEquals(Arrays.asList("one", "two", "six").toArray(), content[0]); | |
assertArrayEquals(Arrays.asList("three", null, null).toArray(), content[1]); | |
assertArrayEquals(Arrays.asList("four", "five", null).toArray(), content[2]); | |
} | |
private static class Event { | |
int source; | |
String message; | |
@Override | |
public String toString() { | |
return "Event => source: " + source + " message: " + message; | |
} | |
} | |
@SuppressWarnings({ "rawtypes", "unchecked" }) | |
@Test | |
public void testGroupByUnsubscribe() throws InterruptedException { | |
final AtomicInteger eventCounter = new AtomicInteger(); | |
final AtomicInteger subscribeCounter = new AtomicInteger(); | |
final AtomicInteger groupCounter = new AtomicInteger(); | |
final AtomicInteger sentEventCounter = new AtomicInteger(); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final int numToSend = 100; | |
final int numGroupsToSend = 2; | |
ASYNC_INFINITE_OBSERVABLE_OF_EVENT(numGroupsToSend, numToSend, subscribeCounter, sentEventCounter) | |
// groupBy the source (mod of groupCount) | |
.groupBy(new Func1<Event, Integer>() { | |
@Override | |
public Integer call(Event e) { | |
return e.source; | |
} | |
}) | |
.take(1) // we want only the first group | |
.flatMap(new Func1<Obsurvable<Event>, Obsurvable<String>>() { | |
@Override | |
public Obsurvable<String> call(Obsurvable<Event> eventGroupedObservable) { | |
// how many groups we see (should be 1 because of take(1)) | |
groupCounter.incrementAndGet(); | |
return eventGroupedObservable | |
.take(20) // limit to only 20 events on this group | |
.map(new Func1<Event, String>() { | |
@Override | |
public String call(Event event) { | |
return "testUnsubscribe => Source: " + event.source + " Message: " + event.message; | |
} | |
}); | |
} | |
}) | |
.subscribe(new Observer() { | |
@Override | |
public void onCompleted() { | |
latch.countDown(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
e.printStackTrace(); | |
latch.countDown(); | |
} | |
@Override | |
public void onNext(Object outputMessage) { | |
eventCounter.incrementAndGet(); | |
} | |
}); | |
if(!latch.await(5000, TimeUnit.MILLISECONDS)) { | |
fail("timed out so infinite loop likely never stopped"); | |
} | |
System.out.println("@@@@@@@ Sent: " + sentEventCounter.get() + " received: " + eventCounter.get()); | |
assertEquals(1, subscribeCounter.get()); | |
assertEquals(1, groupCounter.get()); | |
assertEquals(20, eventCounter.get()); | |
// sentEvents will go until 'eventCounter' hits 20 and then unsubscribes | |
// which means it will also send (but ignore) at least the 19/20 events for the other group | |
// but likely many more as the unsubscribe races the other thread. | |
// Since it is an infinite loop, if it stops then the unsubscribe worked. | |
} | |
TakeOperator<Integer> TAKE_5 = new TakeOperator<Integer>(5); | |
MapOperator<Integer, String> MAP_INTEGER_TO_STRING = new MapOperator<Integer, String>(new Func1<Integer, String>() { | |
@Override | |
public String call(Integer i) { | |
return "Number=>" + i; | |
} | |
}); | |
Obsurvable<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Integer> o, OperatorSubscription s) { | |
int i = 1; | |
// System.out.println("source subscription: " + s); | |
while (!s.isUnsubscribed()) { | |
o.onNext(i++); | |
} | |
o.onCompleted(); | |
} | |
}); | |
Obsurvable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { | |
return Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<Integer> o, OperatorSubscription s) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGERS source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
Obsurvable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); | |
Obsurvable<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) { | |
return Obsurvable.create(new Action2<Observer<String>, OperatorSubscription>() { | |
@Override | |
public void call(Observer<String> o, OperatorSubscription s) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (s.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(v + "-" + i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
Obsurvable<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) { | |
return Obsurvable.create(new Action2<Observer<Integer>, OperatorSubscription>() { | |
@Override | |
public void call(final Observer<Integer> o, final OperatorSubscription s) { | |
Thread t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("-------> subscribe to infinite sequence"); | |
System.out.println("Starting thread: " + Thread.currentThread()); | |
int i = 1; | |
while (!s.isUnsubscribed()) { | |
o.onNext(i++); | |
Thread.yield(); | |
} | |
o.onCompleted(); | |
latch.countDown(); | |
System.out.println("Ending thread: " + Thread.currentThread()); | |
} | |
}); | |
t.start(); | |
} | |
}); | |
} | |
Obsurvable<Event> ASYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final int numEvents, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) { | |
return Obsurvable.create(new Action2<Observer<Event>, OperatorSubscription>() { | |
@Override | |
public void call(final Observer<Event> observer, final OperatorSubscription s) { | |
System.out.println("*********** source received subscription: " + s); | |
// System.out.println("testUnsubscribe => *** Subscribing to EventStream ***"); | |
subscribeCounter.incrementAndGet(); | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
int i=0; | |
while(!s.isUnsubscribed()) { | |
i++; | |
Event e = new Event(); | |
e.source = i % numGroups; | |
e.message = "Event-" + i; | |
observer.onNext(e); | |
sentEventCounter.incrementAndGet(); | |
} | |
observer.onCompleted(); | |
} | |
}).start(); | |
} | |
}); | |
}; | |
Obsurvable<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)); | |
} | |
/**********************************************************************/ | |
public Subscription subscribe(final Action1<T> onNext) { | |
return subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(T t) { | |
onNext.call(t); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment