Created
February 27, 2015 13:46
-
-
Save akarnokd/27f0603185bee0e15f01 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package rxjava.issue; | |
| import java.util.Queue; | |
| import java.util.concurrent.*; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import rx.*; | |
| import rx.Observable.Operator; | |
| import rx.Scheduler.Worker; | |
| import rx.functions.Action0; | |
| import rx.internal.operators.NotificationLite; | |
| import rx.observers.SerializedSubscriber; | |
| import rx.schedulers.Schedulers; | |
| import rx.subscriptions.CompositeSubscription; | |
| public class NoBackpressureOps { | |
| public static final class ClassicMerge<T> implements Operator<T, Observable<T>> { | |
| @Override | |
| public Subscriber<? super Observable<T>> call(Subscriber<? super T> t1) { | |
| final SerializedSubscriber<T> s = new SerializedSubscriber<T>(t1, false); | |
| Subscriber<Observable<T>> r = new Subscriber<Observable<T>>() { | |
| final CompositeSubscription csub = new CompositeSubscription(); | |
| final AtomicInteger wip = new AtomicInteger(1); | |
| @Override | |
| public void onNext(Observable<T> t) { | |
| Subscriber<T> inner = new Subscriber<T>() { | |
| @Override | |
| public void onNext(T t) { | |
| s.onNext(t); | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| innerError(e); | |
| csub.remove(this); | |
| } | |
| @Override | |
| public void onCompleted() { | |
| innerCompleted(); | |
| csub.remove(this); | |
| } | |
| }; | |
| csub.add(inner); | |
| wip.incrementAndGet(); | |
| t.unsafeSubscribe(inner); | |
| } | |
| void innerError(Throwable e) { | |
| onError(e); | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| s.onError(e); | |
| csub.unsubscribe(); | |
| } | |
| void innerCompleted() { | |
| onCompleted(); | |
| } | |
| @Override | |
| public void onCompleted() { | |
| if (wip.decrementAndGet() == 0) { | |
| s.onCompleted(); | |
| } | |
| } | |
| }; | |
| t1.add(s); | |
| s.add(r); | |
| return r; | |
| } | |
| } | |
| public static final class ClassicObserveOn<T> implements Operator<T, T> { | |
| private final class SchedulingSubscriber extends Subscriber<T> implements Action0 { | |
| private final AtomicInteger wip; | |
| private final Queue<Object> queue; | |
| private final NotificationLite<T> nl; | |
| private final Worker w; | |
| private final Subscriber<? super T> child; | |
| private SchedulingSubscriber(Worker w, Subscriber<? super T> child) { | |
| queue = new ConcurrentLinkedQueue<Object>(); | |
| nl = NotificationLite.instance(); | |
| wip = new AtomicInteger(); | |
| this.w = w; | |
| this.child = child; | |
| } | |
| @Override | |
| public void onNext(T t) { | |
| queue.offer(nl.next(t)); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| queue.offer(nl.error(e)); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| @Override | |
| public void onCompleted() { | |
| queue.offer(nl.completed()); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| void schedule() { | |
| w.schedule(this); | |
| } | |
| @Override | |
| public void call() { | |
| do { | |
| Object o = queue.poll(); | |
| if (o != null) { | |
| nl.accept(child, o); | |
| } | |
| } while (wip.decrementAndGet() > 0); | |
| } | |
| } | |
| final Scheduler scheduler; | |
| public ClassicObserveOn(Scheduler scheduler) { | |
| this.scheduler = scheduler; | |
| } | |
| @Override | |
| public Subscriber<? super T> call(Subscriber<? super T> t1) { | |
| final Worker w = scheduler.createWorker(); | |
| t1.add(w); | |
| Subscriber<T> r = new SchedulingSubscriber(w, t1); | |
| t1.add(r); | |
| return r; | |
| } | |
| } | |
| public static void main(String[] args) throws InterruptedException { | |
| Observable<Integer> source = Observable.just( | |
| Observable.just(1).lift(new ClassicObserveOn(Schedulers.computation())), | |
| Observable.just(2).lift(new ClassicObserveOn(Schedulers.computation()))) | |
| .lift(new ClassicMerge<Integer>()); | |
| CountDownLatch cdl = new CountDownLatch(1); | |
| source.subscribe(System.out::println, | |
| e -> { e.printStackTrace(); cdl.countDown(); }, | |
| () -> { System.out.println("Done"); cdl.countDown(); }); | |
| cdl.await(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment