Last active
October 27, 2021 21:36
-
-
Save mnadeem/5d01282ea4f86201ea407065e9d53cf3 to your computer and use it in GitHub Desktop.
Scalable Thread Pool Executor (TPE) which first creates threads up to the max pool size and then queues up the tasks (the queue does not depend upon the executor)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Collection; | |
import java.util.Iterator; | |
import java.util.concurrent.LinkedTransferQueue; | |
import java.util.concurrent.RejectedExecutionException; | |
import java.util.concurrent.RejectedExecutionHandler; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TransferQueue; | |
/**
 * A {@link ThreadPoolExecutor} that creates threads up to {@code maximumPoolSize} first and only
 * then starts queueing tasks.
 *
 * <p>A plain {@code ThreadPoolExecutor} prefers queueing over creating non-core threads: it only
 * grows beyond {@code corePoolSize} when {@code workQueue.offer(task)} returns {@code false}.
 * This class wraps the work queue in a {@link DynamicBlockingQueue} whose {@code offer} succeeds
 * only when the task can be handed directly to an idle worker. Otherwise the executor tries to
 * start a new thread, and once the pool is at its maximum the {@link ForceQueuePolicy} rejection
 * handler places the task on the queue.
 *
 * <p>The rejection handler is part of this mechanism and therefore cannot be replaced.
 */
public final class ScalingThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Creates an executor backed by a new {@link LinkedTransferQueue}.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads; reached before any task is queued
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param keepAliveUnit   time unit of {@code keepAliveTime}
     */
    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit keepAliveUnit) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveUnit,
                new LinkedTransferQueue<Runnable>());
    }

    /**
     * Creates an executor backed by the supplied transfer queue.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads; reached before any task is queued
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param keepAliveUnit   time unit of {@code keepAliveTime}
     * @param workQueue       queue that holds tasks once the pool is saturated
     * @throws NullPointerException if {@code workQueue} is {@code null}
     */
    public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit keepAliveUnit, TransferQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveUnit,
                new DynamicBlockingQueue<Runnable>(workQueue), new ForceQueuePolicy());
    }

    /**
     * Always fails: the built-in rejection handler is what moves overflow tasks onto the queue,
     * so replacing it would break the scaling behaviour.
     *
     * @throws IllegalArgumentException always
     */
    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        throw new IllegalArgumentException("Cant set rejection handler");
    }

    /**
     * Rejection handler that enqueues the task instead of dropping it. It is invoked when the
     * queue's {@code offer} declined the task and no further thread could be started.
     */
    private static class ForceQueuePolicy implements RejectedExecutionHandler {

        /**
         * Puts {@code r} on the executor's queue unless the executor has been shut down.
         *
         * @throws RejectedExecutionException if the executor is shut down, or if the calling
         *         thread is interrupted while waiting to enqueue
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // execute() also lands here for submissions made after shutdown; those must be
            // rejected, not silently queued where they might never run.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor + ": executor is shut down");
            }
            try {
                // Pool is saturated: park the work on the queue.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // Not expected with an unbounded queue such as LinkedTransferQueue, but a
                // caller-supplied queue may block; keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
            // Re-check, mirroring ThreadPoolExecutor.execute: if shutdown raced with the enqueue
            // and the task is still queued, pull it back out and reject it.
            if (executor.isShutdown() && executor.remove(r)) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor + ": executor is shut down");
            }
        }
    }

    /**
     * {@link TransferQueue} decorator whose untimed {@link #offer(Object)} only succeeds when a
     * consumer is already waiting (it maps to {@link TransferQueue#tryTransfer(Object)}).
     * Every other operation is forwarded unchanged to the wrapped queue.
     */
    private static class DynamicBlockingQueue<E> implements TransferQueue<E> {

        private final TransferQueue<E> delegate;

        /**
         * @param delegate queue that actually stores the elements
         * @throws NullPointerException if {@code delegate} is {@code null}
         */
        public DynamicBlockingQueue(final TransferQueue<E> delegate) {
            if (delegate == null) {
                // Fail at construction rather than on the first queue access.
                throw new NullPointerException("delegate");
            }
            this.delegate = delegate;
        }

        /**
         * Hands {@code o} to a waiting consumer if there is one; never enqueues. Returning
         * {@code false} is what makes the executor try to start another thread.
         */
        @Override
        public boolean offer(E o) {
            return tryTransfer(o);
        }

        /**
         * Adds to the wrapped queue.
         *
         * @throws IllegalStateException if the wrapped queue reports that it did not accept
         *         the element
         */
        @Override
        public boolean add(E o) {
            if (this.delegate.add(o)) {
                return true;
            }
            // Not possible with an unbounded delegate.
            throw new IllegalStateException("Queue full");
        }

        @Override
        public E remove() {
            return this.delegate.remove();
        }

        @Override
        public E poll() {
            return this.delegate.poll();
        }

        @Override
        public E element() {
            return this.delegate.element();
        }

        @Override
        public E peek() {
            return this.delegate.peek();
        }

        @Override
        public int size() {
            return this.delegate.size();
        }

        @Override
        public boolean isEmpty() {
            return this.delegate.isEmpty();
        }

        @Override
        public Iterator<E> iterator() {
            return this.delegate.iterator();
        }

        @Override
        public Object[] toArray() {
            return this.delegate.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return this.delegate.toArray(a);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.delegate.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<? extends E> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.delegate.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.delegate.retainAll(c);
        }

        @Override
        public void clear() {
            this.delegate.clear();
        }

        /** Enqueues on the wrapped queue; used by the rejection handler to force-queue work. */
        @Override
        public void put(E e) throws InterruptedException {
            this.delegate.put(e);
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.offer(e, timeout, unit);
        }

        @Override
        public E take() throws InterruptedException {
            return this.delegate.take();
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.poll(timeout, unit);
        }

        @Override
        public int remainingCapacity() {
            return this.delegate.remainingCapacity();
        }

        @Override
        public boolean remove(Object o) {
            return this.delegate.remove(o);
        }

        @Override
        public boolean contains(Object o) {
            return this.delegate.contains(o);
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            return this.delegate.drainTo(c);
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            return this.delegate.drainTo(c, maxElements);
        }

        @Override
        public boolean tryTransfer(E e) {
            return this.delegate.tryTransfer(e);
        }

        @Override
        public void transfer(E e) throws InterruptedException {
            this.delegate.transfer(e);
        }

        @Override
        public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return this.delegate.tryTransfer(e, timeout, unit);
        }

        @Override
        public boolean hasWaitingConsumer() {
            return this.delegate.hasWaitingConsumer();
        }

        @Override
        public int getWaitingConsumerCount() {
            return this.delegate.getWaitingConsumerCount();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment