Skip to content

Instantly share code, notes, and snippets.

@abersnaze
Created March 11, 2017 14:31
Show Gist options
  • Save abersnaze/745f3a433436dbaf7ea2ec684160aabf to your computer and use it in GitHub Desktop.
Save abersnaze/745f3a433436dbaf7ea2ec684160aabf to your computer and use it in GitHub Desktop.
RxJava expand
package foo;
import static rx.Observable.concat;
import static rx.Observable.empty;
import static rx.Observable.range;
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.observers.SerializedObserver;
import rx.subjects.BehaviorSubject;
public class Main {
public static void main(String[] args) {
concat(expand(n(4), (i) -> n(i - 1)).map(Observable::toList)).subscribe(System.out::println);
}
public static Observable<Integer> n(int n) {
if (n == 0)
return empty();
return range(1, n);
}
public static <T> Observable<Observable<T>> expand(Observable<T> src, Func1<T, Observable<T>> func) {
return Observable.defer(() -> {
BehaviorSubject<Observable<T>> queue = BehaviorSubject.create();
AtomicLong outstanding = new AtomicLong();
SerializedObserver<Observable<T>> serializedQueue = new SerializedObserver<>(queue);
queue.onNext(expandHelper(src, func, serializedQueue, outstanding));
return queue.asObservable();
});
}
public static <T> Observable<T> expandHelper(Observable<T> src, Func1<T, Observable<T>> func, Observer<Observable<T>> queue, AtomicLong outstanding) {
outstanding.incrementAndGet();
return src.doOnTerminate(() -> {
if (outstanding.decrementAndGet() == 0)
queue.onCompleted();
}).doOnNext(next -> {
Observable<T> raw = func.call(next);
Observable<T> decorated = expandHelper(raw, func, queue, outstanding);
queue.onNext(decorated);
});
}
}
[1, 2, 3, 4]
[]
[1]
[1, 2]
[1, 2, 3]
[]
[]
[1]
[]
[1]
[1, 2]
[]
[]
[]
[1]
[]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment