Last active
March 16, 2017 21:26
-
-
Save haridaniel/a9fcd0791602b1d27336346c6a93458e to your computer and use it in GitHub Desktop.
PriorityExecutor that preserves submit order within same priority levels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hu.daniel.hari.exercises.threading.executors_priority._4; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import hu.daniel.hari.exercises.threading.executors_priority._4.PriorityExecutors.PriorityCallable; | |
import hu.daniel.hari.exercises.threading.executors_priority._4.PriorityExecutors.PriorityRunnable; | |
public class Main { | |
public static void main(String[] args) throws InterruptedException, ExecutionException { | |
ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); | |
//Priority=0 | |
executorService.submit(newCallable("A1", 200)); //Defaults to priority=0 | |
executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 | |
executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); | |
executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); | |
executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); | |
executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); | |
executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); | |
executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); | |
//Priority=1 | |
executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); | |
executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); | |
executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); | |
executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); | |
executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); | |
executorService.shutdown(); | |
} | |
private static Runnable newRunnable(String name, int delay) { | |
return new Runnable() { | |
@Override | |
public void run() { | |
System.out.println(name); | |
sleep(delay); | |
// if (true) | |
// throw new RuntimeException("shit"); | |
} | |
}; | |
} | |
private static Callable<Integer> newCallable(String name, int delay) { | |
return new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
System.out.println(name); | |
sleep(delay); | |
// if (true) | |
// throw new RuntimeException("shit2"); | |
return 10; | |
} | |
}; | |
} | |
private static void sleep(long millis) { | |
try { | |
Thread.sleep(millis); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
throw new RuntimeException(e); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hu.daniel.hari.exercises.threading.executors_priority._4; | |
import java.util.Comparator; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.FutureTask; | |
import java.util.concurrent.PriorityBlockingQueue; | |
import java.util.concurrent.RunnableFuture; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
/** | |
* Task execution order is based on: | |
* - Priority, | |
* - Submit order (within same priority) | |
* | |
* https://gist.github.com/haridaniel/a9fcd0791602b1d27336346c6a93458e | |
* http://stackoverflow.com/a/42831172/1386911 | |
* | |
* @author hari.daniel | |
*/ | |
public class PriorityExecutors { | |
public static ExecutorService newFixedThreadPool(int nThreads) { | |
return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); | |
} | |
private static class PriorityExecutor extends ThreadPoolExecutor { | |
private static final int DEFAULT_PRIORITY = 0; | |
private static AtomicLong instanceCounter = new AtomicLong(); | |
@SuppressWarnings({"unchecked"}) | |
public PriorityExecutor(int corePoolSize, int maximumPoolSize, | |
long keepAliveTime, TimeUnit unit) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, | |
ComparableTask.comparatorByPriorityAndSequentialOrder())); | |
} | |
@Override | |
public void execute(Runnable command) { | |
// If this is ugly then delegator pattern needed | |
if (command instanceof ComparableTask) //Already wrapped | |
super.execute(command); | |
else { | |
super.execute(newComparableRunnableFor(command)); | |
} | |
} | |
private Runnable newComparableRunnableFor(Runnable runnable) { | |
return new ComparableRunnable(ensurePriorityRunnable(runnable)); | |
} | |
@Override | |
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { | |
return new ComparableFutureTask<>(ensurePriorityCallable(callable)); | |
} | |
@Override | |
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { | |
return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); | |
} | |
private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) { | |
return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable | |
: PriorityCallable.of(callable, DEFAULT_PRIORITY); | |
} | |
private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { | |
return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable | |
: PriorityRunnable.of(runnable, DEFAULT_PRIORITY); | |
} | |
private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { | |
private Long sequentialOrder = instanceCounter.getAndIncrement(); | |
private HasPriority hasPriority; | |
public ComparableFutureTask(PriorityCallable<T> priorityCallable) { | |
super(priorityCallable); | |
this.hasPriority = priorityCallable; | |
} | |
public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { | |
super(priorityRunnable, result); | |
this.hasPriority = priorityRunnable; | |
} | |
@Override | |
public long getInstanceCount() { | |
return sequentialOrder; | |
} | |
@Override | |
public int getPriority() { | |
return hasPriority.getPriority(); | |
} | |
} | |
private static class ComparableRunnable implements Runnable, ComparableTask { | |
private Long instanceCount = instanceCounter.getAndIncrement(); | |
private HasPriority hasPriority; | |
private Runnable runnable; | |
public ComparableRunnable(PriorityRunnable priorityRunnable) { | |
this.runnable = priorityRunnable; | |
this.hasPriority = priorityRunnable; | |
} | |
@Override | |
public void run() { | |
runnable.run(); | |
} | |
@Override | |
public int getPriority() { | |
return hasPriority.getPriority(); | |
} | |
@Override | |
public long getInstanceCount() { | |
return instanceCount; | |
} | |
} | |
private interface ComparableTask extends Runnable { | |
int getPriority(); | |
long getInstanceCount(); | |
public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { | |
return (o1, o2) -> { | |
int priorityResult = o2.getPriority() - o1.getPriority(); | |
return priorityResult != 0 ? priorityResult | |
: (int) (o1.getInstanceCount() - o2.getInstanceCount()); | |
}; | |
} | |
} | |
} | |
private static interface HasPriority { | |
int getPriority(); | |
} | |
public interface PriorityCallable<V> extends Callable<V>, HasPriority { | |
public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) { | |
return new PriorityCallable<V>() { | |
@Override | |
public V call() throws Exception { | |
return callable.call(); | |
} | |
@Override | |
public int getPriority() { | |
return priority; | |
} | |
}; | |
} | |
} | |
public interface PriorityRunnable extends Runnable, HasPriority { | |
public static PriorityRunnable of(Runnable runnable, int priority) { | |
return new PriorityRunnable() { | |
@Override | |
public void run() { | |
runnable.run(); | |
} | |
@Override | |
public int getPriority() { | |
return priority; | |
} | |
}; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment