Last active
September 15, 2017 13:23
-
-
Save basinilya/2cf28710343fbdd838b8e1ce447b12b5 to your computer and use it in GitHub Desktop.
UnfairQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.foo.unfairqueue; | |
import java.util.Collection; | |
import java.util.Iterator; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
/** see ForwardingBlockingQueue from guava */ | |
/**
 * A {@link BlockingQueue} that hands every call straight to another blocking queue.
 *
 * <p>Subclasses override only the operations they want to intercept; everything else keeps the
 * behaviour of the wrapped queue. Modelled after Guava's {@code ForwardingBlockingQueue}.
 *
 * @param <E> element type of the queue
 */
public class ForwardingBlockingQueue<E> implements BlockingQueue<E> {

    /** The queue that actually stores the elements. */
    private final BlockingQueue<E> target;

    /**
     * Wraps the given queue.
     *
     * @param deleg queue that receives every forwarded call
     */
    ForwardingBlockingQueue(final BlockingQueue<E> deleg) {
        this.target = deleg;
    }

    // ---- insertion -------------------------------------------------------------------------

    @Override
    public boolean add(final E e) {
        return target.add(e);
    }

    @Override
    public boolean addAll(final Collection<? extends E> c) {
        return target.addAll(c);
    }

    @Override
    public boolean offer(final E e) {
        return target.offer(e);
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit)
            throws InterruptedException {
        return target.offer(e, timeout, unit);
    }

    @Override
    public void put(final E e) throws InterruptedException {
        target.put(e);
    }

    // ---- retrieval -------------------------------------------------------------------------

    @Override
    public E element() {
        return target.element();
    }

    @Override
    public E peek() {
        return target.peek();
    }

    @Override
    public E poll() {
        return target.poll();
    }

    @Override
    public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
        return target.poll(timeout, unit);
    }

    @Override
    public E take() throws InterruptedException {
        return target.take();
    }

    @Override
    public E remove() {
        return target.remove();
    }

    @Override
    public int drainTo(final Collection<? super E> c) {
        return target.drainTo(c);
    }

    @Override
    public int drainTo(final Collection<? super E> c, final int maxElements) {
        return target.drainTo(c, maxElements);
    }

    // ---- bulk removal ----------------------------------------------------------------------

    @Override
    public boolean remove(final Object o) {
        return target.remove(o);
    }

    @Override
    public boolean removeAll(final Collection<?> c) {
        return target.removeAll(c);
    }

    @Override
    public boolean retainAll(final Collection<?> c) {
        return target.retainAll(c);
    }

    @Override
    public void clear() {
        target.clear();
    }

    // ---- inspection ------------------------------------------------------------------------

    @Override
    public boolean contains(final Object o) {
        return target.contains(o);
    }

    @Override
    public boolean containsAll(final Collection<?> c) {
        return target.containsAll(c);
    }

    @Override
    public boolean isEmpty() {
        return target.isEmpty();
    }

    @Override
    public int size() {
        return target.size();
    }

    @Override
    public int remainingCapacity() {
        return target.remainingCapacity();
    }

    @Override
    public Iterator<E> iterator() {
        return target.iterator();
    }

    @Override
    public Object[] toArray() {
        return target.toArray();
    }

    @Override
    public <T> T[] toArray(final T[] a) {
        return target.toArray(a);
    }

    /** Returns the string form of the wrapped queue. */
    @Override
    public String toString() {
        return target.toString();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.foo.unfairqueue; | |
import java.lang.reflect.Field; | |
import java.util.Iterator; | |
import java.util.WeakHashMap; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.FutureTask; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Unfortunately, ThreadPoolExecutor only grows beyond its core pool size, if its queue rejects a | |
* task. This blocking queue is aware of its thread pool executor. It rejects new tasks, if all | |
* threads of the pool are busy and the pool is allowed to grow. | |
*/ | |
/**
 * Work queue that makes a {@link ThreadPoolExecutor} grow before it starts queueing.
 *
 * <p>{@code ThreadPoolExecutor} only creates threads beyond its core pool size when its queue
 * rejects a task. This queue knows its executor and rejects a new task (returns {@code false}
 * from {@link #offer(Runnable)}) while every existing worker looks busy and the pool is still
 * below {@code realMaximumPoolSize}; otherwise it stores the task like a plain
 * {@link LinkedBlockingQueue}.
 *
 * <p>The executor cannot be locked from here, so the decision is a best-effort estimate. The
 * estimate is biased towards "pool looks less busy / bigger" to reduce the chance of asking for
 * a thread beyond the maximum.
 */
public class IdlePoolFillingQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;

    /** Class name looked for in stack traces to detect a caller still inside {@code execute}. */
    private static final String TPE_CL_NAME = ThreadPoolExecutor.class.getName();

    /** Upper bound on workers that this queue is willing to provoke. */
    private final int realMaximumPoolSize;

    /** Executor this queue serves; {@code null} until {@link #setExecutor} is called. */
    private volatile ThreadPoolExecutor tpe;

    /** Serializes the accept/reject decision between concurrently offering threads. */
    private final Object offerLock = new Object();

    /**
     * Threads whose offer was rejected and that may therefore still be about to create a worker.
     *
     * <p>If the pool is almost full and two threads simultaneously get a rejection, the pool
     * would grow twice and exceed {@code realMaximumPoolSize}; counting these threads as
     * "workers to be" prevents that. Accessed only while holding {@link #offerLock}.
     */
    private final WeakHashMap<Thread, Void> rejectingThreads = new WeakHashMap<Thread, Void>();

    /**
     * Creates an empty queue.
     *
     * @param realMaximumPoolSize number of workers at or above which tasks are always queued
     */
    IdlePoolFillingQueue(final int realMaximumPoolSize) {
        this.realMaximumPoolSize = realMaximumPoolSize;
    }

    /**
     * Attaches the executor whose load drives the accept/reject decision.
     *
     * @param tpe executor that uses this queue as its work queue
     */
    void setExecutor(final ThreadPoolExecutor tpe) {
        this.tpe = tpe;
    }

    /**
     * Queues the task, or rejects it so that the executor starts a new worker for it.
     *
     * @param elem task offered by the executor
     * @return {@code true} if the task was queued; {@code false} if it was rejected because all
     *         workers look busy and the pool may still grow (or the underlying queue refused it)
     */
    @Override
    public boolean offer(final Runnable elem) {
        // Read the volatile once; without an executor there is nothing to fill, so act as a
        // plain queue instead of failing with a NullPointerException.
        final ThreadPoolExecutor pool = tpe;
        if (pool == null) {
            return super.offer(elem);
        }

        // Race conditions are possible, because we cannot lock the executor.
        // In this case we aim to reduce the chance to create a thread beyond max pool size:
        // the pool better look less busy.
        synchronized (offerLock) {
            // Total number of tasks must look smaller.
            // When the executor takes a task from the queue, the queue size decrements and
            // activeCount increments in this very order, so read them in the opposite order.
            final int activeCount = pool.getActiveCount();
            final int scheduled = size();
            final int activeOrScheduled = scheduled + activeCount;

            reviseRejectingThreads();

            // Total number of pool threads must look bigger.
            // When a worker is created, rejectingThreads size decrements and pool size
            // increments in this very order, so read them in the same order.
            final int nRejectingThreads = rejectingThreads.size();
            final int currentNThreads = pool.getPoolSize();
            final int totalThreads = currentNThreads + nRejectingThreads;

            // Pool is full, or there is a worker with nothing to do: just queue the task.
            if (totalThreads >= realMaximumPoolSize || totalThreads > activeOrScheduled) {
                return super.offer(elem);
            }

            // Remember that this thread is about to make the executor add a worker.
            rejectingThreads.put(Thread.currentThread(), null);
        }
        return false;
    }

    /**
     * Drops threads from {@link #rejectingThreads} that can no longer be creating a worker.
     *
     * <p>The calling thread is always dropped (it is starting a new offer). Any other thread is
     * kept only while it may still be inside {@code ThreadPoolExecutor.execute}. Must be called
     * with {@link #offerLock} held.
     */
    private void reviseRejectingThreads() {
        final Thread currentThread = Thread.currentThread();
        final Iterator<Thread> it = rejectingThreads.keySet().iterator();
        while (it.hasNext()) {
            final Thread t = it.next();
            if (t != currentThread && mayBeInsideExecute(t)) {
                continue;
            }
            it.remove();
        }
    }

    /**
     * Tells whether the thread's stack currently contains {@code ThreadPoolExecutor.execute}.
     *
     * @param t thread to inspect
     * @return {@code true} if such a frame is found, or if the stack cannot be inspected for
     *         security reasons (the thread is then conservatively assumed to still be there)
     */
    private static boolean mayBeInsideExecute(final Thread t) {
        try {
            for (final StackTraceElement frame : t.getStackTrace()) {
                if (TPE_CL_NAME.equals(frame.getClassName())
                        && "execute".equals(frame.getMethodName())) {
                    return true;
                }
            }
        } catch (final SecurityException e) {
            return true;
        }
        return false;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.foo.unfairqueue; | |
import java.util.SortedMap; | |
import java.util.TreeMap; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* A blocking queue with predictable selection of a thread when multiple threads are waiting for an | |
* element | |
*/ | |
public class UnfairQueue<E> extends ForwardingBlockingQueue<E> { | |
/** | |
* Elect a thread to consume one available element. This implementation elects the thread with | |
* lowest id. | |
* | |
* @param waitingThreadsById candidates. Always contains at least one thread | |
* @return thread id | |
*/ | |
protected long electConsumer( | |
final SortedMap<? extends Long, ? extends Thread> waitingThreadsById) { | |
return waitingThreadsById.firstKey(); | |
} | |
UnfairQueue(final BlockingQueue<E> deleg) { | |
super(deleg); | |
} | |
@Override | |
public E take() throws InterruptedException { | |
return poll(Long.MAX_VALUE, TimeUnit.NANOSECONDS); | |
} | |
/** | |
* The first thread to call this method becomes a leader. Only a leader can poll the real queue | |
* with a timeout. Other threads wait for the leader to obtain an element and then each calls | |
* {@link #electConsumer(TreeMap)} to see whether the current thread is the one to return it. | |
* After that another thread becomes a leader. | |
*/ | |
@Override | |
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { | |
boolean needAfterPoll = true; | |
final Thread currentThread = Thread.currentThread(); | |
final long deadline = System.nanoTime() + unit.toNanos(timeout); | |
try { | |
E newElem = null; | |
long delay; | |
for (;;) { | |
synchronized (consumeLock) { | |
consumerThreads.put(currentThread.getId(), currentThread); | |
for (;;) { | |
if (leaderConsumer == null) { | |
leaderConsumer = currentThread; | |
} | |
if (lastElem == null && newElem != null) { | |
lastElem = newElem; | |
newElem = null; | |
consumeLock.notifyAll(); | |
} | |
delay = deadline - System.nanoTime(); | |
if (lastElem != null) { | |
final long elected = electConsumer(consumerThreads); | |
if (currentThread.getId() == elected | |
|| (leaderConsumer.getId() == elected && delay <= 0)) { | |
// if leader was polling the real queue, it could not | |
// be notified that it's a winner. And so one of the | |
// other threads has to steal the leader's element | |
final E res = lastElem; | |
lastElem = newElem; | |
afterPoll(); | |
needAfterPoll = false; | |
return res; | |
} | |
// lost election. Wait for lastElem to become null, | |
// which is guaranteed to be very soon | |
consumeLock.wait(); | |
continue; | |
} | |
// lastElem is null | |
// maybe first cycle or poll returned null | |
// leader should try at least once | |
if (leaderConsumer == currentThread || delay <= 0) { | |
// either a leader or cannot wait any longer | |
break; // poll outside of synchronized | |
} | |
// not leader and can wait for leader | |
consumeLock.wait(delay / 1000000L, (int) (delay % 1000000L)); | |
} | |
} | |
newElem = super.poll(delay, TimeUnit.NANOSECONDS); | |
if (newElem == null) { | |
return null; | |
} | |
} | |
} finally { | |
if (needAfterPoll) { | |
synchronized (consumeLock) { | |
afterPoll(); | |
} | |
} | |
} | |
} | |
private void afterPoll() { | |
final Thread currentThread = Thread.currentThread(); | |
consumerThreads.remove(currentThread.getId()); | |
if (leaderConsumer == currentThread) { | |
leaderConsumer = null; | |
} | |
consumeLock.notifyAll(); | |
} | |
private E lastElem; | |
private final TreeMap<Long, Thread> consumerThreads = new TreeMap<Long, Thread>(); | |
private final Object consumeLock = new Object(); | |
private Thread leaderConsumer; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Behaves like a fixed thread pool with unbounded queue, but can grow. | |
* Behaves like a cached thread pool, but can have finite max pool size. | |
*/ | |
package org.foo.unfairqueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ThreadFactory; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
public class UnfairQueueTest { | |
public static void main(final String[] args) throws Exception { | |
final ExecutorService submitter = mkSubmitter(); | |
final ThreadPoolExecutor executor = mkExecutor(); | |
// thread alive time is 10s | |
// load all threads with tasks at start and every 12s | |
// also submit one task each second | |
for (int i = 0;; i++) { | |
int j = 0; | |
do { | |
if (false && !mostThreadsUnused(i)) { | |
break; | |
} | |
final Task task = new Task(); | |
task.i2 = i; | |
task.j2 = j; | |
submitter.submit(new Callable<Void>() { | |
@Override | |
public Void call() throws Exception { | |
try { | |
executor.submit(task); | |
} catch (final Exception e) { | |
System.out.println("" + task + " " + e.toString()); | |
System.exit(1); | |
} | |
return null; | |
} | |
}); | |
if (task.i2 == 12 && task.j2 == 5) { | |
// return; | |
} | |
} while (mostThreadsUnused(i) && ++j < (NTHREADS * 2)); | |
Thread.sleep(1000); | |
System.out.println(); | |
} | |
} | |
private static ThreadPoolExecutor mkExecutor() { | |
/* | |
* Core pool size is set to 0, otherwise TPE will grow unnecessarily at low load. Until the | |
* number of workers reaches corePoolSize, TPE creates a new worker even though there's an | |
* idle worker. allowCoreThreadTimeOut() will allow the pool to shrink, but only until a | |
* next task is submitted. | |
*/ | |
/* | |
* IdlePoolFillingQueue rejects new elements, but only if the pool is busy and can grow. | |
* This combines the features of LinkedBlockingQueue used in fixed thread pool and | |
* SynchronousQueue used in cached thread pool. | |
*/ | |
/* | |
* When an element appears in UnfairQueue, it's returned neither to a random thread, nor to | |
* the longest waiting thread. It's returned to the thread with the lowest id. Therefore, at | |
* low load one thread is constantly busy while the other threads are idle. They are able to | |
* reach their max idle time and terminate. | |
*/ | |
/* | |
* maximumPoolSize set to NTHREADS, because we trust the IdlePoolFillingQueue to regulate | |
* the number of workers. If IdlePoolFillingQueue rejects an element by mistake, TPE will | |
* try to grow beyond maximumPoolSize and will reject the task. If we can afford the pool to | |
* overgrow in rare cases, we can set maximumPoolSize to Integer.MAX_VALUE and remove some | |
* hacks from IdlePoolFillingQueue to make it faster, but less precise. | |
*/ | |
final IdlePoolFillingQueue rejecter = new IdlePoolFillingQueue(NTHREADS); | |
final BlockingQueue<Runnable> workQueue = new UnfairQueue<Runnable>(rejecter); | |
final ThreadPoolExecutor executor = | |
new ThreadPoolExecutor(0, NTHREADS, ALIVE_TIME, TimeUnit.SECONDS, workQueue); | |
rejecter.setExecutor(executor); | |
return executor; | |
} | |
private static ExecutorService mkSubmitter() { | |
final ExecutorService submitter = Executors.newCachedThreadPool(new ThreadFactory() { | |
final ThreadGroup group; | |
final AtomicInteger threadNumber = new AtomicInteger(1); | |
final String namePrefix; | |
{ | |
final SecurityManager s = System.getSecurityManager(); | |
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); | |
namePrefix = "zzz" + "-thread-"; | |
} | |
@Override | |
public Thread newThread(final Runnable r) { | |
final Thread t = | |
new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); | |
if (t.isDaemon()) { | |
t.setDaemon(false); | |
} | |
if (t.getPriority() != Thread.NORM_PRIORITY) { | |
t.setPriority(Thread.NORM_PRIORITY); | |
} | |
return t; | |
} | |
}); | |
((ThreadPoolExecutor) submitter).setKeepAliveTime(2, TimeUnit.SECONDS); | |
return submitter; | |
} | |
private static boolean mostThreadsUnused(final int i) { | |
return i % (ALIVE_TIME + 2) == 0; | |
} | |
private static final int NTHREADS = 4; | |
private static final int ALIVE_TIME = 10; | |
} | |
/**
 * Demo task: prints the name of the executing thread together with its two counters, then
 * sleeps for 300 ms.
 */
class Task implements Callable<Void> {

    /** Round counter assigned by the submitting loop. */
    public int i2;

    /** Position within the round assigned by the submitting loop. */
    public int j2;

    /**
     * Prints {@code "<thread name> <i2> <j2>"} and then sleeps for 300 ms.
     *
     * @return always {@code null}
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Void call() throws Exception {
        final String worker = Thread.currentThread().getName();
        System.out.println(worker + " " + i2 + " " + j2);
        Thread.sleep(300);
        return null;
    }

    /** Returns the two counters separated by a single space. */
    @Override
    public String toString() {
        return new StringBuilder().append(i2).append(' ').append(j2).toString();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment