Forked from futtetennista/ThreadPoolIdlingResource.java
Last active
August 29, 2015 14:14
-
-
Save logcat/fed4f1f90c46fe1b7e7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (C) 2013 Google, 2014 Stefano Dacchille | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.jimdo.test; | |
import android.util.Log; | |
import com.google.android.apps.common.testing.ui.espresso.IdlingResource; | |
import java.util.concurrent.BrokenBarrierException; | |
import java.util.concurrent.CyclicBarrier; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
import static com.google.common.base.Preconditions.checkNotNull; | |
import static com.google.common.base.Preconditions.checkState; | |
/** | |
* Provides a way to monitor a Thread Pool Executor's work queue to ensure that there is no work pending | |
* or executing (and to allow notification of idleness). | |
* <p/> | |
* Register a new subclass for each Thread Pool Executor. | |
*/ | |
public abstract class ThreadPoolIdlingResource implements IdlingResource { | |
private static final String TAG = "ThreadPoolIdlingResource"; | |
private final AtomicReference<IdleMonitor> monitor = new AtomicReference<>(null); | |
private final AtomicInteger activeBarrierChecks = new AtomicInteger(0); | |
private final ThreadPoolExecutor threadPoolExecutor; | |
private ResourceCallback callback; | |
private AtomicBoolean isMonitorForIdle = new AtomicBoolean(false); | |
private final Runnable idleAction = new Runnable() { | |
@Override public void run() { | |
try { | |
isMonitorForIdle.set(false); | |
if (callback != null) callback.onTransitionToIdle(); | |
} finally { | |
cancelIdleMonitor(); | |
} | |
} | |
}; | |
protected ThreadPoolIdlingResource(ThreadPoolExecutor executor) { | |
this.threadPoolExecutor = checkNotNull(executor, | |
String.format("Trying to instantiate a \'%s\' with a null pool", getName())); | |
} | |
/** | |
* Checks if the minPoolThreads is idle at this moment. | |
* | |
* @return true if the minPoolThreads is idle, false otherwise. | |
*/ | |
@Override public boolean isIdleNow() { | |
// The minPoolThreads executor hasn't been injected yet, so we're idling | |
if (threadPoolExecutor == null) return true; | |
boolean idle; | |
if (!threadPoolExecutor.getQueue().isEmpty()) { | |
idle = false; | |
} else { | |
int activeCount = threadPoolExecutor.getActiveCount(); | |
if (0 != activeCount) { | |
if (monitor.get() == null) { | |
// if there's no idle monitor scheduled and there are still barrier | |
// checks running, they are about to exit, ignore them. | |
activeCount = activeCount - activeBarrierChecks.get(); | |
} | |
} | |
idle = 0 == activeCount; | |
} | |
if (!idle && !isMonitorForIdle.get()) { | |
isMonitorForIdle.set(true); | |
notifyWhenIdle(idleAction); | |
} | |
return idle; | |
} | |
@Override public void registerIdleTransitionCallback(final ResourceCallback resourceCallback) { | |
this.callback = resourceCallback; | |
} | |
/** | |
* Notifies caller once the minPoolThreads is idle. | |
* <p/> | |
* We check for idle-ness by submitting the max # of tasks the minPoolThreads will take and | |
* blocking | |
* the tasks until they are all executing. Then we know there are no other tasks _currently_ | |
* executing in the minPoolThreads, we look back at the work queue to see if its backed up, | |
* if it is | |
* we reenqueue ourselves and try again. | |
* <p/> | |
* Obviously this strategy will fail horribly if 2 parties are doing it at the same time, | |
* we prevent recursion here the best we can. | |
* | |
* @param idleCallback called once the minPoolThreads is idle. | |
*/ | |
void notifyWhenIdle(final Runnable idleCallback) { | |
checkNotNull(idleCallback); | |
IdleMonitor myMonitor = new IdleMonitor(idleCallback); | |
checkState(monitor.compareAndSet(null, myMonitor), "cannot monitor for idle recursively!"); | |
myMonitor.monitorForIdle(); | |
} | |
/** | |
* Stops the idle monitoring mechanism if it is in place. | |
* <p/> | |
* Note: the callback may still be invoked after this method is called. The only thing | |
* this method guarantees is that we will stop/cancel any blockign tasks we've placed | |
* on the thread minPoolThreads. | |
*/ | |
void cancelIdleMonitor() { | |
IdleMonitor myMonitor = monitor.getAndSet(null); | |
if (null != myMonitor) { | |
myMonitor.poison(); | |
} | |
} | |
private class IdleMonitor { | |
private final Runnable onIdle; | |
private final AtomicInteger barrierGeneration = new AtomicInteger(0); | |
private final CyclicBarrier barrier; | |
// written by main, read by all. | |
private volatile boolean poisoned; | |
private IdleMonitor(final Runnable onIdle) { | |
this.onIdle = checkNotNull(onIdle); | |
this.barrier = new CyclicBarrier(minPoolThreads(), | |
new Runnable() { | |
@Override | |
public void run() { | |
if (threadPoolExecutor.getQueue().isEmpty()) { | |
// no one is behind us, so the queue is idle! | |
monitor.compareAndSet(IdleMonitor.this, null); | |
onIdle.run(); | |
} else { | |
// work is waiting behind us, enqueue another block of tasks and | |
// hopefully when they're all running, the queue will be empty. | |
monitorForIdle(); | |
} | |
} | |
} | |
); | |
} | |
private int minPoolThreads() { | |
int corePoolSize = threadPoolExecutor.getCorePoolSize(); | |
return corePoolSize > 0 ? corePoolSize : Runtime.getRuntime().availableProcessors(); | |
} | |
/** | |
* Stops this monitor from using the thread minPoolThreads's resources, | |
* it may still cause the | |
* callback to be executed though. | |
*/ | |
private void poison() { | |
poisoned = true; | |
barrier.reset(); | |
} | |
private void monitorForIdle() { | |
if (poisoned) { | |
return; | |
} | |
if (isIdleNow()) { | |
monitor.compareAndSet(this, null); | |
onIdle.run(); | |
} else { | |
// Submit N tasks that will block until they are all running on the thread | |
// minPoolThreads. | |
// at this point we can check the minPoolThreads's queue and verify that there | |
// are no new | |
// tasks behind us and deem the queue idle. | |
final BarrierRestarter restarter = new BarrierRestarter(barrier, barrierGeneration); | |
for (int i = 0; i < minPoolThreads(); i++) { | |
threadPoolExecutor.execute(new Runnable() { | |
@Override | |
public void run() { | |
while (!poisoned) { | |
activeBarrierChecks.incrementAndGet(); | |
int myGeneration = barrierGeneration.get(); | |
try { | |
barrier.await(); | |
return; | |
} catch (InterruptedException ie) { | |
// sorry - I cant let you interrupt me! | |
restarter.restart(myGeneration); | |
} catch (BrokenBarrierException bbe) { | |
restarter.restart(myGeneration); | |
} finally { | |
activeBarrierChecks.decrementAndGet(); | |
} | |
} | |
} | |
}); | |
} | |
} | |
} | |
} | |
private static class BarrierRestarter { | |
private final CyclicBarrier barrier; | |
private final AtomicInteger barrierGeneration; | |
BarrierRestarter(CyclicBarrier barrier, AtomicInteger barrierGeneration) { | |
this.barrier = barrier; | |
this.barrierGeneration = barrierGeneration; | |
} | |
/** | |
* restarts the barrier. | |
* <p/> | |
* After the calling this function it is guaranteed that barrier generation has been | |
* incremented | |
* and the barrier can be awaited on again. | |
* | |
* @param fromGeneration the generation that encountered the breaking exception. | |
*/ | |
synchronized void restart(int fromGeneration) { | |
// must be synchronized. T1 could pass the if check, be suspended before calling | |
// reset, T2 | |
// sails thru - and awaits on the barrier again before T1 has awoken and reset it. | |
int nextGen = fromGeneration + 1; | |
if (barrierGeneration.compareAndSet(fromGeneration, nextGen)) { | |
// first time we've seen fromGeneration request a reset. lets reset the barrier. | |
barrier.reset(); | |
} else { | |
// some other thread has already reset the barrier - this request is a no op. | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment