Skip to content

Instantly share code, notes, and snippets.

@robotman3000
Created March 1, 2017 04:37
Show Gist options
  • Save robotman3000/88a34679e70cfa076564f6907f932ea8 to your computer and use it in GitHub Desktop.
Save robotman3000/88a34679e70cfa076564f6907f932ea8 to your computer and use it in GitHub Desktop.
A Synchronized Processing Queue
import java.util.ArrayDeque;
import java.util.Deque;
public abstract class SynchronizedProcessingQueue<T> {
private final ArrayDeque<T> queue;
private final Thread thread;
private boolean running = true;
public SynchronizedProcessingQueue(final String threadName) {
queue = new ArrayDeque<>();
thread = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.interrupted() && running) {
try {
boolean queEmpty = false;
synchronized (queue) {
queEmpty = queue.isEmpty();
}
if (queEmpty) {
synchronized (queue) {
queue.wait(1000 * 60);
}
} else {
processQue();
}
}
catch (InterruptedException e) {}
catch (Exception e) {
System.err.println("Synchronized Processing Queue: An unhandled exception has occured");
e.printStackTrace();
}
}
// This makes sure that all the queued objects get processed
processQue();
System.out.println("Processing Que Exiting: " + thread.getName());
}
}, threadName);
}
public synchronized void addToQue(T object, boolean first) {
if (!(Thread.currentThread().getId() == thread.getId())) {
if (object != null) {
synchronized (queue) {
if (first) {
queue.addFirst(object);
} else {
queue.addLast(object);
}
queue.notifyAll();
}
}
} else {
throw new IllegalThreadStateException("The thread " + Thread.currentThread().getName() + " attempted modify it's processing que while processing the que");
}
}
protected Deque<T> getQueue() {
return queue;
}
public abstract void processNextElement(T element);
/**
* Called by the queue's processing thread when the queue has items to be
* processed.</br>
* <b>Implementation Note:</b> All access to the queue object while this
* method is running must be synchronized on the queue
*/
protected void processQue() {
T element = null;
synchronized (getQueue()) {
element = getQueue().poll();
}
while (element != null) {
processNextElement(element);
synchronized (getQueue()) {
element = getQueue().poll();
}
}
}
public synchronized void start() {
this.running = true;
thread.start();
}
public synchronized void stop() {
this.running = false;
thread.interrupt();
synchronized (queue) {
queue.notifyAll();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment