Created
December 23, 2012 17:28
-
-
Save hugozhu/4364650 to your computer and use it in GitHub Desktop.
带有优先级和超时设置的任务执行者,可以实现下列逻辑:
并行处理一堆任务,返回在指定时间内完成的任务结果
并行处理一堆任务,返回在优先级最高的(第一个)任务完成或超时之前完成的任务结果
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PriorityTaskExecutor extends ThreadPoolExecutor { | |
public PriorityTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); | |
} | |
public <T> List<Future<T>> invokeAllWithPriority(Collection<? extends Callable<T>> tasks, | |
long timeout, TimeUnit unit) | |
throws InterruptedException { | |
if (tasks == null || unit == null) | |
throw new NullPointerException(); | |
long nanos = unit.toNanos(timeout); | |
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); | |
boolean done = false; | |
try { | |
for (Callable<T> t : tasks) | |
futures.add(newTaskFor(t)); | |
long lastTime = System.nanoTime(); | |
// Interleave time checks and calls to execute in case | |
// executor doesn't have any/much parallelism. | |
Iterator<Future<T>> it = futures.iterator(); | |
while (it.hasNext()) { | |
execute((Runnable)(it.next())); | |
long now = System.nanoTime(); | |
nanos -= now - lastTime; | |
lastTime = now; | |
if (nanos <= 0) | |
return futures; | |
} | |
boolean finishedMasterTask = false; | |
for (Future<T> f : futures) { | |
if (!f.isDone()) { | |
if (finishedMasterTask) { | |
f.cancel(true); | |
continue; | |
} | |
if (nanos <= 0) | |
return futures; | |
try { | |
f.get(nanos, TimeUnit.NANOSECONDS); | |
} catch (CancellationException ignore) { | |
} catch (ExecutionException ignore) { | |
} catch (TimeoutException toe) { | |
return futures; | |
} | |
long now = System.nanoTime(); | |
nanos -= now - lastTime; | |
lastTime = now; | |
} | |
finishedMasterTask = true; | |
} | |
done = true; | |
return futures; | |
} finally { | |
if (!done) | |
for (Future<T> f : futures) | |
f.cancel(true); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
用的时候 是通过 tasks 来保证优先级么
那 workQueue 是不是会在全局上打乱 这种优先级啊