Created
March 1, 2017 04:37
-
-
Save robotman3000/88a34679e70cfa076564f6907f932ea8 to your computer and use it in GitHub Desktop.
A Synchronized Processing Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayDeque; | |
import java.util.Deque; | |
/**
 * A double-ended work queue that is drained by one dedicated worker thread.
 *
 * <p>Producers hand elements over with {@link #addToQue(Object, boolean)}; the
 * worker removes them one at a time and passes each to
 * {@link #processNextElement(Object)}. The worker is created in the constructor
 * but only begins running once {@link #start()} is called. After
 * {@link #stop()} the worker processes whatever is still queued and then exits.
 *
 * <p>All access to the underlying deque is synchronized on the deque itself.
 *
 * @param <T> type of the queued elements
 */
public abstract class SynchronizedProcessingQueue<T> {

    /** Longest time, in milliseconds, the idle worker waits before re-checking its state. */
    private static final long IDLE_WAIT_MILLIS = 60L * 1000L;

    private final ArrayDeque<T> queue;
    private final Thread thread;

    /*
     * Written by start()/stop() and read by the worker thread without holding
     * the monitor of `this`, so it has to be volatile to be reliably visible.
     */
    private volatile boolean running = true;

    /**
     * Creates the queue and its (not yet started) worker thread.
     *
     * @param threadName name given to the worker thread
     */
    public SynchronizedProcessingQueue(final String threadName) {
        queue = new ArrayDeque<>();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.interrupted() && running) {
                    try {
                        if (awaitWork()) {
                            processQue();
                        }
                    } catch (InterruptedException e) {
                        // An interrupt while idle is a request to shut down, the
                        // same as an interrupt seen by the loop condition. The
                        // exception already cleared the interrupt flag, so the
                        // final drain below runs with the flag unset.
                        break;
                    } catch (Exception e) {
                        System.err.println("Synchronized Processing Queue: An unhandled exception has occured");
                        e.printStackTrace();
                    }
                }
                // This makes sure that all the queued objects get processed
                drainRemaining();
                System.out.println("Processing Que Exiting: " + thread.getName());
            }
        }, threadName);
    }

    /**
     * Blocks the worker until there may be something to do.
     *
     * <p>The emptiness check and the wait happen inside one critical section,
     * so an element added concurrently is either seen by the check or its
     * {@code notifyAll} arrives while the worker is already waiting.
     *
     * @return {@code true} if the queue holds at least one element on return
     * @throws InterruptedException if the worker is interrupted while waiting
     */
    private boolean awaitWork() throws InterruptedException {
        synchronized (queue) {
            if (queue.isEmpty() && running) {
                queue.wait(IDLE_WAIT_MILLIS);
            }
            return !queue.isEmpty();
        }
    }

    /**
     * Processes everything still queued when the worker shuts down. A failing
     * element is reported and skipped so that it cannot prevent the remaining
     * elements from being processed.
     */
    private void drainRemaining() {
        boolean drained = false;
        while (!drained) {
            try {
                processQue();
                drained = true;
            } catch (Exception e) {
                System.err.println("Synchronized Processing Queue: An unhandled exception has occured");
                e.printStackTrace();
            }
        }
    }

    /**
     * Adds an element to the queue and wakes the worker thread.
     *
     * <p>A {@code null} element is silently ignored.
     *
     * @param object element to enqueue; ignored when {@code null}
     * @param first  {@code true} to insert at the head of the queue (processed
     *               next), {@code false} to append at the tail
     * @throws IllegalThreadStateException if called from the worker thread itself
     */
    public synchronized void addToQue(T object, boolean first) {
        // Compare the Thread objects themselves rather than their numeric ids.
        if (Thread.currentThread() == thread) {
            throw new IllegalThreadStateException("The thread " + Thread.currentThread().getName() + " attempted modify it's processing que while processing the que");
        }
        if (object == null) {
            return;
        }
        synchronized (queue) {
            if (first) {
                queue.addFirst(object);
            } else {
                queue.addLast(object);
            }
            queue.notifyAll();
        }
    }

    /**
     * Returns the underlying deque itself, not a copy. Callers must synchronize
     * on the returned object for every access.
     *
     * @return the live queue
     */
    protected Deque<T> getQueue() {
        return queue;
    }

    /**
     * Handles a single element taken from the queue. Invoked by
     * {@link #processQue()} without the queue lock held.
     *
     * @param element the element to process; never {@code null}
     */
    public abstract void processNextElement(T element);

    /**
     * Called by the queue's processing thread when the queue has items to be
     * processed. Removes elements one at a time and hands each to
     * {@link #processNextElement(Object)} until the queue is empty.
     * <br>
     * <b>Implementation Note:</b> All access to the queue object while this
     * method is running must be synchronized on the queue
     */
    protected void processQue() {
        while (true) {
            final T element;
            // Hold the lock only while removing the head, not while processing it.
            synchronized (getQueue()) {
                element = getQueue().poll();
            }
            if (element == null) {
                return;
            }
            processNextElement(element);
        }
    }

    /**
     * Starts the worker thread. This delegates to {@link Thread#start()}, so
     * the queue can be started only once.
     */
    public synchronized void start() {
        this.running = true;
        thread.start();
    }

    /**
     * Asks the worker thread to shut down: clears the running flag, interrupts
     * the worker and wakes it if it is waiting on the queue. This method does
     * not wait for the worker to finish.
     */
    public synchronized void stop() {
        this.running = false;
        thread.interrupt();
        synchronized (queue) {
            queue.notifyAll();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment