Last active
August 29, 2015 13:55
-
-
Save benjchristensen/8744171 to your computer and use it in GitHub Desktop.
Scheduler.java concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
compositeSubscription.add(scheduler.schedule(new Action1<Action0>() { | |
@Override | |
public void call(final Action0 self) { | |
innerSubscription.set(source.subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
self.call(); | |
} | |
@Override | |
public void onError(Throwable error) { | |
observer.onError(error); | |
} | |
@Override | |
public void onNext(T value) { | |
observer.onNext(value); | |
} | |
})); | |
} | |
})); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this forces the user to start at the outer and go to the inner | |
// but forces an ugly nested pattern | |
compositeSubscription.add(scheduler.schedule(new Action1<Inner>() { | |
@Override | |
public void call(Inner inner) { | |
inner.schedule(new Action1<Action0>() { | |
@Override | |
public void call(final Action0 self) { | |
innerSubscription.set(source.subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
self.call(); | |
} | |
@Override | |
public void onError(Throwable error) { | |
observer.onError(error); | |
} | |
@Override | |
public void onNext(T value) { | |
observer.onNext(value); | |
} | |
})); | |
} | |
}); | |
} | |
})); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2013 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx; | |
import java.util.Date; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.MultipleAssignmentSubscription; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Func0; | |
import rx.util.functions.Func1; | |
import rx.util.functions.Func2; | |
/** | |
* Represents an object that schedules units of work. | |
* <p> | |
* The methods left to implement are: | |
* <ul> | |
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li> | |
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li> | |
* </ul> | |
* <p> | |
* Why is this an abstract class instead of an interface? | |
* <p> | |
* <ol> | |
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li> | |
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li> | |
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the | |
* functionality.</li> | |
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li> | |
* </ol> | |
*/ | |
public abstract class Scheduler { | |
/** | |
* Schedules a cancelable action to be executed. | |
* | |
* @param state | |
* State to pass into the action. | |
* @param action | |
* Action to schedule. | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public abstract <T> Subscription schedule(T state, Func2<? super Inner, ? super T, ? extends Subscription> action); | |
public <T> Subscription schedule(final Action1<Inner> action) { | |
return schedule(null, new Func2<Inner, T, Subscription>() { | |
@Override | |
public Subscription call(Inner innerScheduler, T state) { | |
action.call(innerScheduler); | |
return Subscriptions.empty(); | |
} | |
}); | |
} | |
public <T> Subscription schedule(final Func1<? super Inner, ? extends Subscription> action) { | |
return schedule(null, new Func2<Inner, T, Subscription>() { | |
@Override | |
public Subscription call(Inner innerScheduler, T state) { | |
return action.call(innerScheduler); | |
} | |
}); | |
} | |
public <T> Subscription schedule(final Action0 action) { | |
return schedule(null, new Func2<Inner, T, Subscription>() { | |
@Override | |
public Subscription call(Inner innerScheduler, T state) { | |
return innerScheduler.schedule(action); | |
} | |
}); | |
} | |
public <T> Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) { | |
return schedule(null, new Func2<Inner, T, Subscription>() { | |
@Override | |
public Subscription call(Inner innerScheduler, T state) { | |
return innerScheduler.schedule(action, delayTime, unit); | |
} | |
}); | |
} | |
public <T> Subscription schedulePeriodically(final Action0 action, final long initialDelay, final long period, final TimeUnit unit) { | |
return schedule(null, new Func2<Inner, T, Subscription>() { | |
@Override | |
public Subscription call(Inner innerScheduler, T state) { | |
return innerScheduler.schedulePeriodically(action, initialDelay, period, unit); | |
} | |
}); | |
} | |
public abstract class Inner { | |
/** | |
* Schedules a cancelable action to be executed in delayTime. | |
* | |
* @param delayTime | |
* Time the action is to be delayed before executing. | |
* @param unit | |
* Time unit of the delay time. | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public abstract Subscription schedule(Func0<? extends Subscription> action, long delayTime, TimeUnit unit); | |
/** | |
* Schedules a cancelable action to be executed in delayTime. | |
* | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public abstract Subscription schedule(Func0<? extends Subscription> action); | |
/** | |
* Schedules a cancelable action to be executed periodically. | |
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing | |
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. | |
* | |
* @param state | |
* State to pass into the action. | |
* @param action | |
* The action to execute periodically. | |
* @param initialDelay | |
* Time to wait before executing the action for the first time. | |
* @param period | |
* The time interval to wait each time in between executing the action. | |
* @param unit | |
* The time unit the interval above is given in. | |
* @return A subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedulePeriodically(final Func0<? extends Subscription> action, long initialDelay, long period, TimeUnit unit) { | |
final long periodInNanos = unit.toNanos(period); | |
final AtomicBoolean complete = new AtomicBoolean(); | |
final Func0<Subscription> recursiveAction = new Func0<Subscription>() { | |
@Override | |
public Subscription call() { | |
if (!complete.get()) { | |
long startedAt = now(); | |
final Subscription sub1 = action.call(); | |
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); | |
final Subscription sub2 = schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); | |
return Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
sub1.unsubscribe(); | |
sub2.unsubscribe(); | |
} | |
}); | |
} | |
return Subscriptions.empty(); | |
} | |
}; | |
final Subscription sub = schedule(recursiveAction, initialDelay, unit); | |
return Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
complete.set(true); | |
sub.unsubscribe(); | |
} | |
}); | |
} | |
/** | |
* Schedules a cancelable action to be executed at dueTime. | |
* | |
* @param state | |
* State to pass into the action. | |
* @param action | |
* Action to schedule. | |
* @param dueTime | |
* Time the action is to be executed. If in the past it will be executed immediately. | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedule(Func0<? extends Subscription> action, Date dueTime) { | |
long scheduledTime = dueTime.getTime(); | |
long timeInFuture = scheduledTime - now(); | |
if (timeInFuture <= 0) { | |
return schedule(action); | |
} else { | |
return schedule(action, timeInFuture, TimeUnit.MILLISECONDS); | |
} | |
} | |
/** | |
* Schedules an action and receives back an action for recursive execution. | |
* | |
* @param action | |
* action | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedule(final Action1<Action0> action) { | |
final CompositeSubscription parentSubscription = new CompositeSubscription(); | |
final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); | |
parentSubscription.add(childSubscription); | |
final Func0<Subscription> f = new Func0<Subscription>() { | |
@Override | |
public Subscription call() { | |
action.call(new Action0() { | |
@Override | |
public void call() { | |
if (!parentSubscription.isUnsubscribed()) { | |
childSubscription.setSubscription(schedule(action)); | |
} | |
} | |
}); | |
return childSubscription; | |
} | |
}; | |
parentSubscription.add(schedule(f)); | |
return parentSubscription; | |
} | |
/** | |
* Schedules an action to be executed. | |
* | |
* @param action | |
* action | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedule(final Action0 action) { | |
return schedule(new Func0<Subscription>() { | |
@Override | |
public Subscription call() { | |
action.call(); | |
return Subscriptions.empty(); | |
} | |
}); | |
} | |
/** | |
* Schedules an action to be executed in delayTime. | |
* | |
* @param action | |
* action | |
* @return a subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { | |
return schedule(new Func0<Subscription>() { | |
@Override | |
public Subscription call() { | |
action.call(); | |
return Subscriptions.empty(); | |
} | |
}, delayTime, unit); | |
} | |
/** | |
* Schedules an action to be executed periodically. | |
* | |
* @param action | |
* The action to execute periodically. | |
* @param initialDelay | |
* Time to wait before executing the action for the first time. | |
* @param period | |
* The time interval to wait each time in between executing the action. | |
* @param unit | |
* The time unit the interval above is given in. | |
* @return A subscription to be able to unsubscribe from action. | |
*/ | |
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { | |
return schedulePeriodically(new Func0<Subscription>() { | |
@Override | |
public Subscription call() { | |
action.call(); | |
return Subscriptions.empty(); | |
} | |
}, initialDelay, period, unit); | |
} | |
} | |
/** | |
* Parallelism available to a Scheduler. | |
* <p> | |
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. | |
* | |
* @return the scheduler's available degree of parallelism. | |
*/ | |
public int degreeOfParallelism() { | |
return Runtime.getRuntime().availableProcessors(); | |
} | |
/** | |
* @return the scheduler's notion of current absolute time in milliseconds. | |
*/ | |
public long now() { | |
return System.currentTimeMillis(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment