Last active
June 28, 2016 00:21
-
-
Save abersnaze/c8c37847bc4bddf13dbc7cc75b99ae95 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rx.schedulers; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.functions.Func0; | |
import rx.functions.Func1; | |
import rx.subjects.PublishSubject; | |
import rx.subscriptions.BooleanSubscription; | |
public class FineGrainCompositeScheduler extends Scheduler { | |
private final Scheduler actualScheduler; | |
private final PublishSubject<Observable<Void>> actionQueue; | |
public FineGrainCompositeScheduler(Func1<Observable<Observable<Void>>, Observable<Void>> combine, int maxConcurrent, | |
Scheduler actual) { | |
this.actualScheduler = actual; | |
this.actionQueue = PublishSubject.<Observable<Void>>create(); | |
combine.call(actionQueue).subscribe(); | |
} | |
@Override | |
public Worker createWorker() { | |
final Worker actualWorker = actualScheduler.createWorker(); | |
return new Worker() { | |
@Override | |
public void unsubscribe() { | |
actualWorker.unsubscribe(); | |
} | |
@Override | |
public boolean isUnsubscribed() { | |
return actualWorker.isUnsubscribed(); | |
} | |
@Override | |
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) { | |
final long nowBeforeQueued = TimeUnit.MILLISECONDS.convert(actualScheduler.now(), unit); | |
final BooleanSubscription subscription = new BooleanSubscription(); | |
Observable<Void> queuedAction = Observable.defer(new Func0<Observable<Void>>() { | |
@Override | |
public Observable<Void> call() { | |
if (!subscription.isUnsubscribed()) { | |
long nowAfterQueued = actualScheduler.now(); | |
long queuingDelay = nowAfterQueued - nowBeforeQueued; | |
actualWorker.schedule(action, delayTime - queuingDelay, unit); | |
} | |
return Observable.empty(); | |
} | |
}); | |
actionQueue.onNext(queuedAction); | |
return subscription; | |
} | |
@Override | |
public Subscription schedule(final Action0 action) { | |
final BooleanSubscription subscription = new BooleanSubscription(); | |
Observable<Void> queuedAction = Observable.defer(new Func0<Observable<Void>>() { | |
@Override | |
public Observable<Void> call() { | |
if (!subscription.isUnsubscribed()) { | |
actualWorker.schedule(action); | |
} | |
return Observable.empty(); | |
} | |
}); | |
actionQueue.onNext(queuedAction); | |
return subscription; | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I concede that I mostly punted on doing any of the subscription management correctly.