-
-
Save markus2610/7eb4389854a618fd019c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package rxjava.issue; | |
| import java.util.Queue; | |
| import java.util.concurrent.*; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import rx.*; | |
| import rx.Observable.Operator; | |
| import rx.Scheduler.Worker; | |
| import rx.functions.Action0; | |
| import rx.internal.operators.NotificationLite; | |
| import rx.observers.SerializedSubscriber; | |
| import rx.schedulers.Schedulers; | |
| import rx.subscriptions.CompositeSubscription; | |
| public class NoBackpressureOps { | |
| public static final class ClassicMerge<T> implements Operator<T, Observable<T>> { | |
| @Override | |
| public Subscriber<? super Observable<T>> call(Subscriber<? super T> t1) { | |
| final SerializedSubscriber<T> s = new SerializedSubscriber<T>(t1, false); | |
| Subscriber<Observable<T>> r = new Subscriber<Observable<T>>() { | |
| final CompositeSubscription csub = new CompositeSubscription(); | |
| final AtomicInteger wip = new AtomicInteger(1); | |
| @Override | |
| public void onNext(Observable<T> t) { | |
| Subscriber<T> inner = new Subscriber<T>() { | |
| @Override | |
| public void onNext(T t) { | |
| s.onNext(t); | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| innerError(e); | |
| csub.remove(this); | |
| } | |
| @Override | |
| public void onCompleted() { | |
| innerCompleted(); | |
| csub.remove(this); | |
| } | |
| }; | |
| csub.add(inner); | |
| wip.incrementAndGet(); | |
| t.unsafeSubscribe(inner); | |
| } | |
| void innerError(Throwable e) { | |
| onError(e); | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| s.onError(e); | |
| csub.unsubscribe(); | |
| } | |
| void innerCompleted() { | |
| onCompleted(); | |
| } | |
| @Override | |
| public void onCompleted() { | |
| if (wip.decrementAndGet() == 0) { | |
| s.onCompleted(); | |
| } | |
| } | |
| }; | |
| t1.add(s); | |
| s.add(r); | |
| return r; | |
| } | |
| } | |
| public static final class ClassicObserveOn<T> implements Operator<T, T> { | |
| private final class SchedulingSubscriber extends Subscriber<T> implements Action0 { | |
| private final AtomicInteger wip; | |
| private final Queue<Object> queue; | |
| private final NotificationLite<T> nl; | |
| private final Worker w; | |
| private final Subscriber<? super T> child; | |
| private SchedulingSubscriber(Worker w, Subscriber<? super T> child) { | |
| queue = new ConcurrentLinkedQueue<Object>(); | |
| nl = NotificationLite.instance(); | |
| wip = new AtomicInteger(); | |
| this.w = w; | |
| this.child = child; | |
| } | |
| @Override | |
| public void onNext(T t) { | |
| queue.offer(nl.next(t)); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| queue.offer(nl.error(e)); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| @Override | |
| public void onCompleted() { | |
| queue.offer(nl.completed()); | |
| if (wip.getAndIncrement() == 0) { | |
| schedule(); | |
| } | |
| } | |
| void schedule() { | |
| w.schedule(this); | |
| } | |
| @Override | |
| public void call() { | |
| do { | |
| Object o = queue.poll(); | |
| if (o != null) { | |
| nl.accept(child, o); | |
| } | |
| } while (wip.decrementAndGet() > 0); | |
| } | |
| } | |
| final Scheduler scheduler; | |
| public ClassicObserveOn(Scheduler scheduler) { | |
| this.scheduler = scheduler; | |
| } | |
| @Override | |
| public Subscriber<? super T> call(Subscriber<? super T> t1) { | |
| final Worker w = scheduler.createWorker(); | |
| t1.add(w); | |
| Subscriber<T> r = new SchedulingSubscriber(w, t1); | |
| t1.add(r); | |
| return r; | |
| } | |
| } | |
| public static void main(String[] args) throws InterruptedException { | |
| Observable<Integer> source = Observable.just( | |
| Observable.just(1).lift(new ClassicObserveOn(Schedulers.computation())), | |
| Observable.just(2).lift(new ClassicObserveOn(Schedulers.computation()))) | |
| .lift(new ClassicMerge<Integer>()); | |
| CountDownLatch cdl = new CountDownLatch(1); | |
| source.subscribe(System.out::println, | |
| e -> { e.printStackTrace(); cdl.countDown(); }, | |
| () -> { System.out.println("Done"); cdl.countDown(); }); | |
| cdl.await(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment