Last active
June 27, 2016 21:47
-
-
Save abersnaze/07d995f63ed6100ef60bd9ac980d59ed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rx.schedulers; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicReference; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.functions.Func0; | |
import rx.functions.Func1; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.ReplaySubject; | |
import rx.subscriptions.BooleanSubscription; | |
import rx.subscriptions.Subscriptions; | |
public class CoarseGrainCompositeScheduler extends Scheduler { | |
private final Scheduler actualScheduler; | |
private final PublishSubject<Observable<Void>> workerQueue; | |
public CoarseGrainCompositeScheduler(Func1<Observable<Observable<Void>>, Observable<Void>> combine, int maxConcurrent, | |
Scheduler actualScheduler) { | |
this.actualScheduler = actualScheduler; | |
this.workerQueue = PublishSubject.<Observable<Void>>create(); | |
combine.call(workerQueue).subscribe(); | |
} | |
private static final Subscription SUBSCRIBED = new Subscription() { | |
@Override | |
public void unsubscribe() { | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return false; | |
} | |
}; | |
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed(); | |
@SuppressWarnings("serial") | |
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription { | |
public final void call(Worker actualWorker) { | |
Subscription oldState = get(); | |
// either SUBSCRIBED or UNSUBSCRIBED | |
if (oldState == UNSUBSCRIBED) { | |
// no need to schedule return | |
return; | |
} | |
if (oldState != SUBSCRIBED) { | |
// has already been scheduled return | |
// should not be able to get here but handle it anyway. | |
return; | |
} | |
Subscription newState = callActual(actualWorker); | |
if (!compareAndSet(SUBSCRIBED, newState)) { | |
// set would only fail if the new current state is either | |
// UNSUBSCRIBED or some other subscription from a concurrent | |
// call to this method. Unsubscribe from the action just | |
// scheduled. | |
newState.unsubscribe(); | |
} | |
} | |
abstract Subscription callActual(Worker actualWorker); | |
@Override | |
public boolean isUnsubscribed() { | |
return get().isUnsubscribed(); | |
} | |
@Override | |
public void unsubscribe() { | |
Subscription oldState; | |
// no matter what the current state is the new state is going to be | |
Subscription newState = UNSUBSCRIBED; | |
do { | |
oldState = get(); | |
if (oldState == UNSUBSCRIBED) { | |
// the action has already been unsubscribed | |
return; | |
} | |
} while (!compareAndSet(oldState, newState)); | |
if (oldState != SUBSCRIBED) { | |
// the action was scheduled. stop it. | |
oldState.unsubscribe(); | |
} | |
} | |
} | |
@SuppressWarnings("serial") | |
private static class ImmediateAction extends ScheduledAction { | |
private final Action0 action; | |
public ImmediateAction(Action0 action) { | |
this.action = action; | |
} | |
@Override | |
public Subscription callActual(Worker actualWorker) { | |
return actualWorker.schedule(action); | |
} | |
} | |
@SuppressWarnings("serial") | |
private static class DelayedAction extends ScheduledAction { | |
private final Action0 action; | |
private final long delayTime; | |
private final TimeUnit unit; | |
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) { | |
this.action = action; | |
this.delayTime = delayTime; | |
this.unit = unit; | |
} | |
@Override | |
public Subscription callActual(Worker actualWorker) { | |
return actualWorker.schedule(action, delayTime, unit); | |
} | |
} | |
@Override | |
public Worker createWorker() { | |
// a queue for the actions submitted while worker is waiting to get to | |
// the front of the workerQueue. | |
final ReplaySubject<ScheduledAction> actionQueue = ReplaySubject.create(); | |
// complete the actionQueue when worker is unsubscribed to make room | |
// for the next worker in the workerQueue. | |
final Subscription subscription = BooleanSubscription.create(new Action0() { | |
@Override | |
public void call() { | |
actionQueue.onCompleted(); | |
} | |
}); | |
// add to the worker queue. | |
workerQueue.onNext(Observable.defer(new Func0<Observable<Void>>() { | |
@Override | |
public Observable<Void> call() { | |
final Worker actualWorker = actualScheduler.createWorker(); | |
// pull from the actionQueue and start scheduling on the actual | |
// worker. | |
return actionQueue.map(new Func1<ScheduledAction, Void>() { | |
@Override | |
public Void call(ScheduledAction action) { | |
action.call(actualWorker); | |
return null; | |
} | |
}); | |
} | |
}).ignoreElements()); | |
return new Worker() { | |
@Override | |
public void unsubscribe() { | |
subscription.unsubscribe(); | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return subscription.isUnsubscribed(); | |
} | |
@Override | |
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) { | |
// send a scheduled action to the actionQueue | |
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); | |
actionQueue.onNext(delayedAction); | |
return delayedAction; | |
} | |
@Override | |
public Subscription schedule(final Action0 action) { | |
// send a scheduled action to the actionQueue | |
ImmediateAction immediateAction = new ImmediateAction(action); | |
actionQueue.onNext(immediateAction); | |
return immediateAction; | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment