Created
October 19, 2016 08:59
-
-
Save kaedea/0a25ef586301da3bd26e8de1ca4de053 to your computer and use it in GitHub Desktop.
Task dispatcher.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Task dispatcher interface. | |
* Using {@linkplain Maker} to build instance in a simple way. | |
* | |
* @author kaede | |
* @version date 16/10/19 | |
*/ | |
public interface Dispatcher { | |
String TAG = "task.dispatcher"; | |
/** | |
* Attach an existing scheduler to dispatcher, so that we don't need to create an new one. | |
*/ | |
Dispatcher attach(Handler scheduler); | |
/** | |
* Start the dispatcher before it can work. | |
*/ | |
void start(); | |
/** | |
* Whether or not the dispatcher is working. | |
*/ | |
boolean isRunning(); | |
/** | |
* Add an task to the dispatcher. | |
* Note that you should call {@linkplain #start()} before this. | |
*/ | |
void add(Runnable runnable); | |
/** | |
* Schedule an task to the dispatcher. | |
* Note that you should call {@linkplain #start()} before this. | |
*/ | |
void schedule(Runnable runnable, long millis); | |
/** | |
* When a task is finished, it will call this method. | |
* Note that you should not call this method directly. | |
*/ | |
void finish(); | |
/** | |
* Terminate the dispatcher. | |
*/ | |
void shutdown(); | |
/** | |
* Maker utility for {@linkplain Dispatcher} | |
*/ | |
class Maker { | |
public static ExecutorDispatcher newExecutorDispatcher(int threadPoolSize) { | |
return new ExecutorDispatcher(threadPoolSize); | |
} | |
public static ExecutorDispatcher newExecutorDispatcher(int threadPoolSize, int capacity) { | |
return new ExecutorDispatcher(threadPoolSize, capacity); | |
} | |
public static ThreadDispatcher newThreadDispatcher(int threadPoolSize) { | |
return new ThreadDispatcher(threadPoolSize); | |
} | |
public static ThreadDispatcher newThreadDispatcher(int threadPoolSize, int capacity) { | |
return new ThreadDispatcher(threadPoolSize, capacity); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @author kaede | |
* @version date 16/10/19 | |
*/ | |
public class DispatcherApiTest extends InstrumentationTestCase { | |
private Context mContext; | |
@Override | |
protected void setUp() throws Exception { | |
super.setUp(); | |
mContext = getInstrumentation().getTargetContext(); | |
} | |
public void testExecutor() throws InterruptedException { | |
int size = 20; | |
ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size); | |
} | |
public void testExecutorSchedule() throws InterruptedException { | |
int size = 20; | |
ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.schedule(new TestTask(i, count), 3000); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size/2); | |
Thread.sleep(3000); | |
assertTrue(count.get() == size); | |
} | |
public void testExecutorWithCustomExecutor() throws InterruptedException { | |
int size = 20; | |
final int[] threadSize = {0}; | |
ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size); | |
dispatcher.attach(Executors.newCachedThreadPool(new ThreadFactory() { | |
@Override | |
public Thread newThread(Runnable r) { | |
threadSize[0]++; | |
Logger.d("[CustomDispatcher] create thread"); | |
return new Thread(r); | |
} | |
})); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size); | |
assertTrue(threadSize[0] == size); | |
} | |
public void testExecutorWithCustomScheduler() throws InterruptedException { | |
int size = 20; | |
final int[] scheduleSize = {0}; | |
ExecutorDispatcher dispatcher = Dispatcher.Maker.newExecutorDispatcher(size); | |
HandlerThread handlerThread = new HandlerThread("worker-handler"); | |
handlerThread.start(); | |
Handler scheduler = new Handler(handlerThread.getLooper()) { | |
@Override | |
public void dispatchMessage(Message msg) { | |
scheduleSize[0]++; | |
super.handleMessage(msg); | |
super.dispatchMessage(msg); | |
} | |
}; | |
dispatcher.attach(scheduler); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.schedule(new TestTask(i, count), 3000); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size/2); | |
assertTrue(scheduleSize[0] == 0); | |
Thread.sleep(3000); | |
assertTrue(count.get() == size); | |
assertTrue(scheduleSize[0] == size/2); | |
} | |
public void testThread() throws InterruptedException { | |
int size = 20; | |
ThreadDispatcher dispatcher = Dispatcher.Maker.newThreadDispatcher(size); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size); | |
} | |
public void testThreadSchedule() throws InterruptedException { | |
int size = 20; | |
ThreadDispatcher dispatcher = Dispatcher.Maker.newThreadDispatcher(size); | |
dispatcher.start(); | |
AtomicInteger count = new AtomicInteger(0); | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.add(new TestTask(i, count)); | |
} | |
for (int i = 0; i < size/2; i++) { | |
dispatcher.schedule(new TestTask(i, count), 3000); | |
} | |
Thread.sleep(2000); | |
assertTrue(count.get() == size/2); | |
Thread.sleep(5000); | |
assertTrue(count.get() == size); | |
} | |
private class TestTask implements Runnable, Comparable<TestTask> { | |
final int mId; | |
final AtomicInteger mCount; | |
boolean mFinished = false; | |
public TestTask(int id, AtomicInteger count) { | |
mId = id; | |
mCount = count; | |
} | |
@Override | |
public void run() { | |
Log.d(Dispatcher.TAG, "task started, id = " + mId); | |
try { | |
Thread.sleep(1000); | |
mCount.incrementAndGet(); | |
Log.d(Dispatcher.TAG, "task finished, id = " + mId); | |
mFinished = true; | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
@Override | |
public int compareTo(TestTask another) { | |
return mId - another.mId; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Task dispatcher impl with ExecutorService. | |
* <p> | |
* Worker dispatcher with an {@link ExecutorService}. Note that if the executor's work | |
* queue is bounded, the excessive task will be added to the pending waiting for a working | |
* task to be finished. | |
* <p> | |
* Use {@link #attach(ExecutorService)} to set an existing executor, but if you do not use embed | |
* executor, you have to deal with the bounded issue. | |
* <p> | |
* Use {@link #attach(Handler)} to set an existing handler, which is used to schedule task | |
* with the executor. | |
* | |
* @author Kaede | |
* @version 2016-10-18 | |
*/ | |
public class ExecutorDispatcher implements Dispatcher, ThreadFactory, RejectedExecutionHandler { | |
private final int mCorePoolSize; | |
private final int mMaximumPoolSize; | |
private final long mKeepAliveTime; | |
private final PriorityBlockingQueue<Runnable> mWorkQueue; | |
private final LinkedBlockingQueue<Runnable> mPendingQueue; | |
private final AtomicInteger mCount = new AtomicInteger(1); | |
private Handler mScheduler; | |
private ExecutorService mExecutor; | |
public ExecutorDispatcher(int threadPoolSize) { | |
mCorePoolSize = threadPoolSize; | |
mMaximumPoolSize = threadPoolSize * 2; | |
// time to keep thread when it's idle. 30s | |
mKeepAliveTime = 30L; | |
mWorkQueue = new PriorityBlockingQueue<>(); | |
mPendingQueue = new LinkedBlockingQueue<>(); | |
} | |
public ExecutorDispatcher(int threadPoolSize, int capacity) { | |
mCorePoolSize = threadPoolSize; | |
mMaximumPoolSize = threadPoolSize * 2; | |
mKeepAliveTime = 30L; | |
mWorkQueue = new PriorityBlockingQueue<>(capacity); | |
mPendingQueue = new LinkedBlockingQueue<>(); | |
} | |
@Override | |
public ExecutorDispatcher attach(Handler scheduler) { | |
if (mScheduler == null) { | |
mScheduler = scheduler; | |
} else { | |
if (BuildConfig.DEBUG) { | |
Log.w(TAG, "scheduler has been initialized once."); | |
} | |
} | |
return this; | |
} | |
public ExecutorDispatcher attach(ExecutorService executor) { | |
if (mExecutor == null) { | |
mExecutor = executor; | |
} else { | |
if (BuildConfig.DEBUG) { | |
Log.w(TAG, "executor has been initialized once."); | |
} | |
} | |
return this; | |
} | |
public ExecutorService getExecutor() { | |
return mExecutor; | |
} | |
@Override | |
public void start() { | |
if (mExecutor == null) { | |
mExecutor = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, | |
TimeUnit.SECONDS, mWorkQueue, this, this); | |
((ThreadPoolExecutor) mExecutor).allowCoreThreadTimeOut(true); | |
} else { | |
if (BuildConfig.DEBUG) { | |
Log.w(TAG, "dispatcher has already started once."); | |
} | |
} | |
} | |
@Override | |
public boolean isRunning() { | |
return mExecutor != null && !mExecutor.isShutdown(); | |
} | |
@Override | |
public void add(Runnable runnable) { | |
if (mExecutor == null) { | |
throw new IllegalStateException("pls call #start to initialize."); | |
} | |
mExecutor.execute(runnable); | |
} | |
@Override | |
public void schedule(final Runnable runnable, long millis) { | |
if (mExecutor == null) { | |
throw new IllegalStateException("pls call #start to initialize."); | |
} | |
if (mScheduler == null) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "create thread-executor-scheduler"); | |
} | |
HandlerThread thread = new HandlerThread("thread-executor-scheduler"); | |
thread.start(); | |
mScheduler = new Handler(thread.getLooper()); | |
} | |
if (millis < 0) millis = 0; | |
mScheduler.postDelayed(new Runnable() { | |
@Override | |
public void run() { | |
if (BuildConfig.DEBUG) Log.d(TAG, "execute task"); | |
mExecutor.execute(runnable); | |
} | |
}, millis); | |
} | |
@Override | |
public void finish() { | |
// Poll from pending queue when a task is finished. | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "task finish, check pending queue"); | |
} | |
if (mPendingQueue.size() > 0) { | |
Runnable poll = mPendingQueue.poll(); | |
if (poll != null) { | |
add(poll); | |
} | |
} | |
} | |
@Override | |
public void shutdown() { | |
if (mExecutor != null) { | |
mExecutor.shutdown(); | |
} | |
} | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread thread = new Thread(r, "ExecutorDispatcher #" + mCount.getAndIncrement()); | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "executor new thread : " + thread.getName()); | |
} | |
return thread; | |
} | |
@Override | |
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { | |
// Offer pending queue when the executor's working queue is bounded. | |
// Note that work queue (PriorityBlockingQueue is an unbounded queue) capacity is | |
// INTEGER_MAX, therefore it will not be easily bounded and this is only a protection. | |
if (BuildConfig.DEBUG) Log.d(TAG, "bounded work queue, pend task"); | |
mPendingQueue.offer(r); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Task dispatcher impl with Thread. | |
* <p> | |
* Use {@link #attach(Handler)} to set an existing handler, which is used to schedule task | |
* with the executor. | |
* @author kaede | |
* @version date 16/10/19 | |
*/ | |
public class ThreadDispatcher implements Dispatcher { | |
private final int mDispatcherSize; | |
private final AtomicInteger mCount = new AtomicInteger(1); | |
private final PriorityBlockingQueue<Runnable> mWorkQueue; | |
private Handler mScheduler; | |
private DispatcherThread[] mDispatchers; | |
public ThreadDispatcher(int threadPoolSize) { | |
mDispatcherSize = threadPoolSize; | |
mWorkQueue = new PriorityBlockingQueue<>(); | |
} | |
public ThreadDispatcher(int threadPoolSize, int capacity) { | |
mDispatcherSize = threadPoolSize; | |
mWorkQueue = new PriorityBlockingQueue<>(capacity); | |
} | |
@Override | |
public ThreadDispatcher attach(Handler scheduler) { | |
if (mScheduler == null) { | |
mScheduler = scheduler; | |
} else { | |
if (BuildConfig.DEBUG) { | |
Log.w(TAG, "scheduler has been initialized once."); | |
} | |
} | |
return this; | |
} | |
@Override | |
public void start() { | |
if (mDispatchers == null || mDispatchers.length == 0) { | |
mDispatchers = new DispatcherThread[mDispatcherSize]; | |
for (int i = 0; i < mDispatchers.length; i++) { | |
mDispatchers[i] = new DispatcherThread(); | |
mDispatchers[i].start(); | |
} | |
} else { | |
if (BuildConfig.DEBUG) { | |
Log.w(TAG, "dispatcher has already started once."); | |
} | |
} | |
} | |
@Override | |
public boolean isRunning() { | |
return mDispatchers != null && mDispatchers.length > 0; | |
} | |
@Override | |
public void add(Runnable runnable) { | |
if (mDispatchers == null) { | |
throw new IllegalStateException("pls call #start to initialize."); | |
} | |
mWorkQueue.offer(runnable); | |
} | |
@Override | |
public void schedule(final Runnable runnable, long millis) { | |
if (mDispatchers == null) { | |
throw new IllegalStateException("pls call #start to initialize."); | |
} | |
if (mScheduler == null) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "create thread-executor-scheduler"); | |
} | |
HandlerThread thread = new HandlerThread("thread-executor-scheduler"); | |
thread.start(); | |
mScheduler = new Handler(thread.getLooper()); | |
} | |
if (millis < 0) millis = 0; | |
mScheduler.postDelayed(new Runnable() { | |
@Override | |
public void run() { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "execute task"); | |
} | |
mWorkQueue.offer(runnable); | |
} | |
}, millis); | |
} | |
@Override | |
public void finish() { | |
} | |
@Override | |
public void shutdown() { | |
if (mDispatchers != null) { | |
for (int i = 0; i < mDispatchers.length; i++) { | |
mDispatchers[i].quit(); | |
mDispatchers[i] = null; | |
} | |
mDispatchers = null; | |
} | |
} | |
private class DispatcherThread extends Thread { | |
public DispatcherThread() { | |
setName("ThreadDispatcher #" + mCount.getAndIncrement()); | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "create ThreadDispatcher #" + mCount.getAndIncrement()); | |
} | |
} | |
@Override | |
public void run() { | |
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); | |
while (true) { | |
try { | |
Runnable runnable = mWorkQueue.take(); | |
runnable.run(); | |
if (isInterrupted()) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "Dispatcher is interrupted."); | |
} | |
break; | |
} | |
} catch (InterruptedException e) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "BlockingQueue is interrupted."); | |
} | |
} | |
} | |
} | |
public void quit() { | |
interrupt(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment