Skip to content

Instantly share code, notes, and snippets.

@basinilya
Last active September 15, 2017 13:23
Show Gist options
  • Save basinilya/2cf28710343fbdd838b8e1ce447b12b5 to your computer and use it in GitHub Desktop.
Save basinilya/2cf28710343fbdd838b8e1ce447b12b5 to your computer and use it in GitHub Desktop.
UnfairQueue
package org.foo.unfairqueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * A {@link BlockingQueue} that forwards every call to a wrapped delegate queue.
 *
 * <p>Subclasses override only the methods whose behaviour they want to change and inherit plain
 * delegation for everything else. See ForwardingBlockingQueue from guava for the same idea.
 *
 * <p>{@code equals} and {@code hashCode} are not forwarded; {@code toString} is.
 *
 * @param <E> the type of elements held in the queue
 */
public class ForwardingBlockingQueue<E> implements BlockingQueue<E> {

    /** The queue that actually stores the elements; never {@code null}. */
    private final BlockingQueue<E> deleg;

    /**
     * Wraps the given queue.
     *
     * @param deleg the queue all calls are forwarded to
     * @throws NullPointerException if {@code deleg} is {@code null}
     */
    ForwardingBlockingQueue(final BlockingQueue<E> deleg) {
        // Fail fast: without this check a null delegate would only surface later,
        // as an unexplained NullPointerException on the first forwarded call.
        if (deleg == null) {
            throw new NullPointerException("deleg");
        }
        this.deleg = deleg;
    }

    @Override
    public boolean add(final E e) {
        return deleg.add(e);
    }

    @Override
    public boolean addAll(final Collection<? extends E> c) {
        return deleg.addAll(c);
    }

    @Override
    public void clear() {
        deleg.clear();
    }

    @Override
    public boolean contains(final Object o) {
        return deleg.contains(o);
    }

    @Override
    public boolean containsAll(final Collection<?> c) {
        return deleg.containsAll(c);
    }

    @Override
    public E element() {
        return deleg.element();
    }

    @Override
    public boolean isEmpty() {
        return deleg.isEmpty();
    }

    @Override
    public Iterator<E> iterator() {
        return deleg.iterator();
    }

    @Override
    public boolean offer(final E e) {
        return deleg.offer(e);
    }

    @Override
    public E peek() {
        return deleg.peek();
    }

    @Override
    public E poll() {
        return deleg.poll();
    }

    @Override
    public void put(final E e) throws InterruptedException {
        deleg.put(e);
    }

    @Override
    public E remove() {
        return deleg.remove();
    }

    @Override
    public boolean remove(final Object o) {
        return deleg.remove(o);
    }

    @Override
    public boolean removeAll(final Collection<?> c) {
        return deleg.removeAll(c);
    }

    @Override
    public boolean retainAll(final Collection<?> c) {
        return deleg.retainAll(c);
    }

    @Override
    public int size() {
        return deleg.size();
    }

    @Override
    public Object[] toArray() {
        return deleg.toArray();
    }

    @Override
    public <T> T[] toArray(final T[] a) {
        return deleg.toArray(a);
    }

    @Override
    public String toString() {
        return deleg.toString();
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit)
            throws InterruptedException {
        return deleg.offer(e, timeout, unit);
    }

    @Override
    public E take() throws InterruptedException {
        return deleg.take();
    }

    @Override
    public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
        return deleg.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() {
        return deleg.remainingCapacity();
    }

    @Override
    public int drainTo(final Collection<? super E> c) {
        return deleg.drainTo(c);
    }

    @Override
    public int drainTo(final Collection<? super E> c, final int maxElements) {
        return deleg.drainTo(c, maxElements);
    }
}
package org.foo.unfairqueue;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.WeakHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * A work queue that lets a {@link ThreadPoolExecutor} grow before tasks start piling up.
 *
 * <p>Unfortunately, ThreadPoolExecutor only grows beyond its core pool size if its queue rejects a
 * task. This blocking queue is aware of its thread pool executor. {@link #offer(Runnable)} rejects
 * a new task if all threads of the pool look busy and the pool is still allowed to grow; in every
 * other case the task is enqueued like in a plain {@link LinkedBlockingQueue}.
 *
 * <p>The executor must be registered with {@link #setExecutor(ThreadPoolExecutor)} before the
 * first task is offered.
 */
public class IdlePoolFillingQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    /** Class name used to spot threads that are currently inside the executor's {@code execute}. */
    private static final String TPE_CL_NAME = ThreadPoolExecutor.class.getName();

    /** Upper bound on the number of workers this queue is willing to let the pool create. */
    private final int realMaximumPoolSize;

    /** Serializes the bookkeeping done in {@link #offer(Runnable)}. */
    private final Object offerLock = new Object();

    // If the pool is almost full and two threads simultaneously reject
    // a new queue element, the pool will grow twice and exceed
    // realMaximumPoolSize.
    // This map contains the threads that can still create their own
    // workers. It is only touched while holding offerLock.
    private final WeakHashMap<Thread, Void> rejectingThreads = new WeakHashMap<Thread, Void>();

    /** The pool this queue feeds; {@code null} until {@link #setExecutor} is called. */
    private volatile ThreadPoolExecutor tpe;

    /**
     * @param realMaximumPoolSize number of pool threads (existing plus about to be created) at
     *        which this queue stops rejecting tasks
     */
    IdlePoolFillingQueue(final int realMaximumPoolSize) {
        this.realMaximumPoolSize = realMaximumPoolSize;
    }

    /**
     * Registers the executor whose load is inspected by {@link #offer(Runnable)}.
     *
     * @param tpe the pool that uses this queue as its work queue
     */
    void setExecutor(final ThreadPoolExecutor tpe) {
        this.tpe = tpe;
    }

    /**
     * Enqueues the task, or rejects it so that the executor creates a new worker for it.
     *
     * @param elem the task to enqueue
     * @return {@code true} if the task was enqueued; {@code false} if it was rejected because
     *         the pool looks fully busy and may still grow
     * @throws IllegalStateException if no executor has been registered yet
     */
    @Override
    public boolean offer(final Runnable elem) {
        final ThreadPoolExecutor executor = tpe;
        if (executor == null) {
            throw new IllegalStateException(
                    "setExecutor() must be called before tasks are offered");
        }
        // Race conditions are possible, because we cannot lock tpe.
        // In this case we aim to reduce chance to create a thread
        // beyond max pool size.
        // The pool better look less busy.
        synchronized (offerLock) {
            // Total number of tasks must look smaller.
            // When executor takes a task from the queue,
            // the queue size decrements, activeCount increments
            // in this very order:
            final int activeCount = executor.getActiveCount();
            final int scheduled = size();
            final int activeOrScheduled = scheduled + activeCount;

            reviseRejectingThreads();

            // Total number of pool threads must look bigger.
            // When a worker is created,
            // rejectingThreads size decrements, pool size increments
            // in this very order:
            final int nRejectingThreads = rejectingThreads.size();
            final int currentNThreads = executor.getPoolSize();
            final int totalThreads = currentNThreads + nRejectingThreads;

            // If we have too many threads, don't create new thread
            if (totalThreads >= realMaximumPoolSize || totalThreads > activeOrScheduled) {
                return super.offer(elem);
            }
            // Remember that the calling thread is about to make the pool grow.
            rejectingThreads.put(Thread.currentThread(), null);
        }
        return false;
    }

    /**
     * Drops entries of {@link #rejectingThreads} that no longer stand for a pending worker:
     * the calling thread itself (it is starting a fresh offer) and every thread that is no longer
     * executing {@code ThreadPoolExecutor.execute}. Must be called while holding
     * {@link #offerLock}.
     */
    private void reviseRejectingThreads() {
        final Thread currentThread = Thread.currentThread();
        final Iterator<Thread> it = rejectingThreads.keySet().iterator();
        while (it.hasNext()) {
            final Thread t = it.next();
            if (t == currentThread || !isInsideExecute(t)) {
                it.remove();
            }
        }
    }

    /**
     * Tells whether the given thread currently has {@code ThreadPoolExecutor.execute} on its
     * stack. If the stack cannot be inspected, the thread is conservatively reported as still
     * being inside, so that its entry is kept.
     */
    private static boolean isInsideExecute(final Thread t) {
        try {
            for (final StackTraceElement frame : t.getStackTrace()) {
                if (TPE_CL_NAME.equals(frame.getClassName())
                        && "execute".equals(frame.getMethodName())) {
                    return true;
                }
            }
            return false;
        } catch (final SecurityException e) {
            return true;
        }
    }
}
package org.foo.unfairqueue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* A blocking queue with predictable selection of a thread when multiple threads are waiting for an
* element.
* <p>
* Only {@link #take()} and {@link #poll(long, TimeUnit)} take part in the selection; all other
* methods are inherited from ForwardingBlockingQueue and go straight to the delegate. While an
* element is being handed from the thread that fetched it to the elected thread it is parked in
* {@code lastElem}, i.e. outside the delegate, so delegate-backed methods such as {@code size()}
* do not see it during that window.
*/
public class UnfairQueue<E> extends ForwardingBlockingQueue<E> {
/**
* Elect a thread to consume one available element. This implementation elects the thread with
* lowest id.
*
* @param waitingThreadsById candidates. Always contains at least one thread
* @return thread id
*/
protected long electConsumer(
final SortedMap<? extends Long, ? extends Thread> waitingThreadsById) {
return waitingThreadsById.firstKey();
}
/**
* @param deleg the real queue that stores the elements
*/
UnfairQueue(final BlockingQueue<E> deleg) {
super(deleg);
}
/**
* Implemented as {@link #poll(long, TimeUnit)} with a timeout of {@code Long.MAX_VALUE}
* nanoseconds.
*/
@Override
public E take() throws InterruptedException {
return poll(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
/**
* The first thread to call this method becomes a leader. Only a leader can poll the real queue
* with a timeout. Other threads wait for the leader to obtain an element and then each calls
* {@link #electConsumer(SortedMap)} to see whether the current thread is the one to return it.
* After that another thread becomes a leader.
*
* @param timeout how long to wait before giving up, in units of {@code unit}
* @param unit unit of {@code timeout}
* @return an element, or {@code null} if this thread's poll of the real queue returned
* {@code null}
* @throws InterruptedException if interrupted while waiting on the internal lock or in the real
* queue
*/
@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
// true until this thread has deregistered itself; the finally block uses it to make sure
// afterPoll() runs exactly once on every exit path
boolean needAfterPoll = true;
final Thread currentThread = Thread.currentThread();
final long deadline = System.nanoTime() + unit.toNanos(timeout);
try {
// element this thread fetched from the real queue but has not yet published in lastElem
E newElem = null;
// nanoseconds left until the deadline, recomputed on every pass of the inner loop
long delay;
for (;;) {
synchronized (consumeLock) {
// (re-)register as a candidate for the election
consumerThreads.put(currentThread.getId(), currentThread);
for (;;) {
if (leaderConsumer == null) {
leaderConsumer = currentThread;
}
// publish the fetched element as soon as the hand-off slot is free
if (lastElem == null && newElem != null) {
lastElem = newElem;
newElem = null;
consumeLock.notifyAll();
}
delay = deadline - System.nanoTime();
if (lastElem != null) {
final long elected = electConsumer(consumerThreads);
if (currentThread.getId() == elected
|| (leaderConsumer.getId() == elected && delay <= 0)) {
// if leader was polling the real queue, it could not
// be notified that it's a winner. And so one of the
// other threads has to steal the leader's element
final E res = lastElem;
// if this thread still holds an unpublished element, it takes over the slot
lastElem = newElem;
afterPoll();
needAfterPoll = false;
return res;
}
// lost election. Wait for lastElem to become null,
// which is guaranteed to be very soon
// NOTE(review): newElem can be non-null here (slot was occupied when this thread
// came back from the real queue). If wait() throws InterruptedException, the
// finally block only calls afterPoll(), so that element appears to be dropped
// - confirm whether this can matter for callers.
consumeLock.wait();
continue;
}
// lastElem is null
// maybe first cycle or poll returned null
// leader should try at least once
if (leaderConsumer == currentThread || delay <= 0) {
// either a leader or cannot wait any longer
break; // poll outside of synchronized
}
// not leader and can wait for leader
// delay is > 0 here (otherwise the branch above was taken); it is split into whole
// milliseconds plus the nanosecond remainder
consumeLock.wait(delay / 1000000L, (int) (delay % 1000000L));
}
}
// Reached by the leader, or by a thread whose deadline has passed (delay <= 0).
// presumably the real queue treats a non-positive timeout as "do not block" - verify
// against the delegate in use
newElem = super.poll(delay, TimeUnit.NANOSECONDS);
if (newElem == null) {
return null;
}
}
} finally {
if (needAfterPoll) {
synchronized (consumeLock) {
afterPoll();
}
}
}
}
/**
* Deregisters the current thread: removes it from the candidates, gives up leadership if it
* was the leader and wakes all waiting threads. Both call sites hold {@code consumeLock}.
*/
private void afterPoll() {
final Thread currentThread = Thread.currentThread();
consumerThreads.remove(currentThread.getId());
if (leaderConsumer == currentThread) {
leaderConsumer = null;
}
consumeLock.notifyAll();
}
// Hand-off slot: an element taken from the real queue that waits for the elected thread.
// Accessed only while holding consumeLock.
private E lastElem;
// Threads currently inside poll(), keyed by thread id. Accessed only while holding consumeLock.
private final TreeMap<Long, Thread> consumerThreads = new TreeMap<Long, Thread>();
// Monitor guarding lastElem, consumerThreads and leaderConsumer; also used for wait/notifyAll.
private final Object consumeLock = new Object();
// The thread currently entitled to block on the real queue, or null if there is none.
private Thread leaderConsumer;
}
/*
* Behaves like a fixed thread pool with unbounded queue, but can grow.
* Behaves like a cached thread pool, but can have finite max pool size.
*/
package org.foo.unfairqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Manual demo: feeds a growable, shrinkable thread pool with bursts of tasks and prints which
 * worker thread runs each of them. Runs until the process is killed.
 */
public class UnfairQueueTest {

    /** Maximum number of workers in the pool under test. */
    private static final int NTHREADS = 4;

    /** Idle time, in seconds, after which a worker of the pool under test terminates. */
    private static final int ALIVE_TIME = 10;

    public static void main(final String[] args) throws Exception {
        final ExecutorService submitter = mkSubmitter();
        final ThreadPoolExecutor executor = mkExecutor();
        // thread alive time is 10s
        // load all threads with tasks at start and every 12s
        // also submit one task each second
        for (int second = 0;; second++) {
            final int burst = mostThreadsUnused(second) ? NTHREADS * 2 : 1;
            for (int n = 0; n < burst; n++) {
                final Task task = new Task();
                task.i2 = second;
                task.j2 = n;
                submitFromOtherThread(submitter, executor, task);
            }
            Thread.sleep(1000);
            System.out.println();
        }
    }

    /**
     * Hands the task to the pool under test from one of the submitter's threads, so that several
     * submissions can race with each other. Any failure to submit is printed and ends the JVM.
     */
    private static void submitFromOtherThread(final ExecutorService submitter,
            final ThreadPoolExecutor executor, final Task task) {
        submitter.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    executor.submit(task);
                } catch (final Exception e) {
                    System.out.println("" + task + " " + e.toString());
                    System.exit(1);
                }
                return null;
            }
        });
    }

    /** Builds the pool under test: IdlePoolFillingQueue wrapped in UnfairQueue as work queue. */
    private static ThreadPoolExecutor mkExecutor() {
        /*
         * Core pool size is set to 0, otherwise TPE will grow unnecessarily at low load. Until the
         * number of workers reaches corePoolSize, TPE creates a new worker even though there's an
         * idle worker. allowCoreThreadTimeOut() will allow the pool to shrink, but only until a
         * next task is submitted.
         */
        /*
         * IdlePoolFillingQueue rejects new elements, but only if the pool is busy and can grow.
         * This combines the features of LinkedBlockingQueue used in fixed thread pool and
         * SynchronousQueue used in cached thread pool.
         */
        /*
         * When an element appears in UnfairQueue, it's returned neither to a random thread, nor to
         * the longest waiting thread. It's returned to the thread with the lowest id. Therefore, at
         * low load one thread is constantly busy while the other threads are idle. They are able to
         * reach their max idle time and terminate.
         */
        /*
         * maximumPoolSize set to NTHREADS, because we trust the IdlePoolFillingQueue to regulate
         * the number of workers. If IdlePoolFillingQueue rejects an element by mistake, TPE will
         * try to grow beyond maximumPoolSize and will reject the task. If we can afford the pool to
         * overgrow in rare cases, we can set maximumPoolSize to Integer.MAX_VALUE and remove some
         * hacks from IdlePoolFillingQueue to make it faster, but less precise.
         */
        final IdlePoolFillingQueue poolFiller = new IdlePoolFillingQueue(NTHREADS);
        final BlockingQueue<Runnable> unfair = new UnfairQueue<Runnable>(poolFiller);
        final ThreadPoolExecutor pool =
                new ThreadPoolExecutor(0, NTHREADS, ALIVE_TIME, TimeUnit.SECONDS, unfair);
        poolFiller.setExecutor(pool);
        return pool;
    }

    /** Builds the cached pool whose threads submit the tasks; its idle threads live for 2s. */
    private static ExecutorService mkSubmitter() {
        final ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newCachedThreadPool(new SubmitterThreadFactory());
        pool.setKeepAliveTime(2, TimeUnit.SECONDS);
        return pool;
    }

    /** Tells whether the given second is one in which the whole pool gets loaded at once. */
    private static boolean mostThreadsUnused(final int i) {
        final int period = ALIVE_TIME + 2;
        return i % period == 0;
    }

    /** Creates non-daemon, normal-priority threads named {@code zzz-thread-N}. */
    private static final class SubmitterThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        SubmitterThreadFactory() {
            final SecurityManager s = System.getSecurityManager();
            if (s != null) {
                group = s.getThreadGroup();
            } else {
                group = Thread.currentThread().getThreadGroup();
            }
            namePrefix = "zzz" + "-thread-";
        }

        @Override
        public Thread newThread(final Runnable r) {
            final String name = namePrefix + threadNumber.getAndIncrement();
            final Thread created = new Thread(group, r, name, 0);
            if (created.isDaemon()) {
                created.setDaemon(false);
            }
            if (created.getPriority() != Thread.NORM_PRIORITY) {
                created.setPriority(Thread.NORM_PRIORITY);
            }
            return created;
        }
    }
}
/**
 * Demo task: prints the name of the thread that runs it together with its two counters, then
 * sleeps for 300 ms.
 */
class Task implements Callable<Void> {

    /** First counter, set by the code that creates the task. */
    public int i2;

    /** Second counter, set by the code that creates the task. */
    public int j2;

    @Override
    public Void call() throws Exception {
        final String workerName = Thread.currentThread().getName();
        System.out.println(workerName + " " + i2 + " " + j2);
        Thread.sleep(300);
        return null;
    }

    /** @return the two counters separated by a single space */
    @Override
    public String toString() {
        return new StringBuilder().append(i2).append(' ').append(j2).toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment