Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created February 27, 2015 13:46
Show Gist options
  • Save akarnokd/27f0603185bee0e15f01 to your computer and use it in GitHub Desktop.
Save akarnokd/27f0603185bee0e15f01 to your computer and use it in GitHub Desktop.
package rxjava.issue;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import rx.*;
import rx.Observable.Operator;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.internal.operators.NotificationLite;
import rx.observers.SerializedSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
public class NoBackpressureOps {
public static final class ClassicMerge<T> implements Operator<T, Observable<T>> {
@Override
public Subscriber<? super Observable<T>> call(Subscriber<? super T> t1) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(t1, false);
Subscriber<Observable<T>> r = new Subscriber<Observable<T>>() {
final CompositeSubscription csub = new CompositeSubscription();
final AtomicInteger wip = new AtomicInteger(1);
@Override
public void onNext(Observable<T> t) {
Subscriber<T> inner = new Subscriber<T>() {
@Override
public void onNext(T t) {
s.onNext(t);
}
@Override
public void onError(Throwable e) {
innerError(e);
csub.remove(this);
}
@Override
public void onCompleted() {
innerCompleted();
csub.remove(this);
}
};
csub.add(inner);
wip.incrementAndGet();
t.unsafeSubscribe(inner);
}
void innerError(Throwable e) {
onError(e);
}
@Override
public void onError(Throwable e) {
s.onError(e);
csub.unsubscribe();
}
void innerCompleted() {
onCompleted();
}
@Override
public void onCompleted() {
if (wip.decrementAndGet() == 0) {
s.onCompleted();
}
}
};
t1.add(s);
s.add(r);
return r;
}
}
public static final class ClassicObserveOn<T> implements Operator<T, T> {
private final class SchedulingSubscriber extends Subscriber<T> implements Action0 {
private final AtomicInteger wip;
private final Queue<Object> queue;
private final NotificationLite<T> nl;
private final Worker w;
private final Subscriber<? super T> child;
private SchedulingSubscriber(Worker w, Subscriber<? super T> child) {
queue = new ConcurrentLinkedQueue<Object>();
nl = NotificationLite.instance();
wip = new AtomicInteger();
this.w = w;
this.child = child;
}
@Override
public void onNext(T t) {
queue.offer(nl.next(t));
if (wip.getAndIncrement() == 0) {
schedule();
}
}
@Override
public void onError(Throwable e) {
queue.offer(nl.error(e));
if (wip.getAndIncrement() == 0) {
schedule();
}
}
@Override
public void onCompleted() {
queue.offer(nl.completed());
if (wip.getAndIncrement() == 0) {
schedule();
}
}
void schedule() {
w.schedule(this);
}
@Override
public void call() {
do {
Object o = queue.poll();
if (o != null) {
nl.accept(child, o);
}
} while (wip.decrementAndGet() > 0);
}
}
final Scheduler scheduler;
public ClassicObserveOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
final Worker w = scheduler.createWorker();
t1.add(w);
Subscriber<T> r = new SchedulingSubscriber(w, t1);
t1.add(r);
return r;
}
}
public static void main(String[] args) throws InterruptedException {
Observable<Integer> source = Observable.just(
Observable.just(1).lift(new ClassicObserveOn(Schedulers.computation())),
Observable.just(2).lift(new ClassicObserveOn(Schedulers.computation())))
.lift(new ClassicMerge<Integer>());
CountDownLatch cdl = new CountDownLatch(1);
source.subscribe(System.out::println,
e -> { e.printStackTrace(); cdl.countDown(); },
() -> { System.out.println("Done"); cdl.countDown(); });
cdl.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment