public class BlockingQueue implements Queue {
    private final java.util.Queue<Object> queue = new java.util.LinkedList<>();

    /**
     * Make a blocking dequeue call so that we only return when the queue has
     * something on it; otherwise we wait until something is put on it.
     *
     * @return the next item, or null if the thread's wait() call is interrupted.
     */
    public synchronized Object dequeue() {
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Restore the interrupt flag, then return the client a null item.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return queue.remove();
    }

    /**
     * Enqueue will add an object to this queue, and will notify any waiting
     * threads that there is an object available.
     */
    public synchronized void enqueue(Object o) {
        queue.add(o);
        // Wake up anyone waiting for something to be put on the queue.
        notifyAll();
    }
}
public class Consumer implements Runnable {
    private final Queue queue;

    public Consumer(Queue queue) {
        this.queue = queue;
    }

    public void process(Object msg) {
        try {
            // Process the message non-trivially (i.e. it takes a while).
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Preserve the interrupt so that run() can stop.
            Thread.currentThread().interrupt();
        }
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            doStuff();
        }
    }

    public void doStuff() {
        Object msg = queue.dequeue();
        // dequeue() returns null if the wait was interrupted.
        if (msg != null) {
            process(msg);
        }
    }
}
public class Producer implements Runnable {
    private final Queue queue;

    public Producer(Queue queue) {
        this.queue = queue;
    }

    public void run() {
        // Binds to socket, reads messages in,
        // packages each message, and calls doSomething():
        // doSomething(msg);
    }

    public void doSomething(Object msg) {
        queue.enqueue(msg);
    }
}
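The three classes above leave out the Queue interface and the code that starts the threads. The following is a minimal sketch of how the pieces could be wired together, not the gist author's code: MessageQueue, SketchBlockingQueue, and QueueWiringDemo are hypothetical names, and the interface shape (enqueue/dequeue) is an assumption based on how the gist calls it. Unlike the gist, this dequeue() propagates InterruptedException instead of returning null.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for the Queue interface the gist implements but does not show.
interface MessageQueue {
    void enqueue(Object o);
    Object dequeue() throws InterruptedException;
}

// Same wait()/notifyAll() pattern as the gist's BlockingQueue.
class SketchBlockingQueue implements MessageQueue {
    private final java.util.Queue<Object> items = new LinkedList<>();

    public synchronized void enqueue(Object o) {
        items.add(o);
        // Wake up any thread blocked in dequeue().
        notifyAll();
    }

    public synchronized Object dequeue() throws InterruptedException {
        // Recheck the condition after every wakeup.
        while (items.isEmpty()) {
            wait();
        }
        return items.remove();
    }
}

class QueueWiringDemo {
    // Starts one producer and one consumer thread and returns what the consumer received.
    static List<Object> run(int messageCount) {
        MessageQueue queue = new SketchBlockingQueue();
        List<Object> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    received.add(queue.dequeue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread producer = new Thread(() -> {
            for (int i = 0; i < messageCount; i++) {
                queue.enqueue("msg-" + i);
            }
        });

        consumer.start();
        producer.start();
        try {
            // join() makes the consumer's writes to `received` visible here.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

With a single producer and a single consumer the messages come out in FIFO order; with several consumers the order in which they receive items is not deterministic.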
Is the while loop required? Wouldn't a plain if condition serve the purpose?
@vikranthpatoju is right. In the enqueue method it is enough to call notify instead of notifyAll. The while(isEmpty) is not safe: if two consumers simultaneously read the same element in the queue, you will have a race condition.
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedList<T> {
    private final int capacity;
    private final Queue<T> list;
    private final Object lock = new Object();

    public BoundedList(int capacity) {
        this.capacity = capacity;
        this.list = new ArrayDeque<>(capacity);
    }

    T poll() throws InterruptedException {
        synchronized (lock) {
            while (list.isEmpty()) {
                lock.wait();
            }
            T e = list.poll();
            lock.notify();
            return e;
        }
    }

    void add(T e) throws InterruptedException {
        synchronized (lock) {
            while (list.size() == capacity) {
                lock.wait();
            }
            list.add(e);
            lock.notify();
        }
    }
}
Will this work?
@shankyty No, unfortunately that won't work, because it can result in the lost wakeup problem. Check out this Stack Overflow answer: https://stackoverflow.com/a/3186336/7602403
One solution is to replace your calls to lock.notify() with lock.notifyAll().
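For reference, here is a sketch of the BoundedList from the earlier comment with that one change applied. NotifyAllBoundedList and NotifyAllDemo are hypothetical names chosen for this example. Because producers and consumers wait on the same lock, a single notify() can wake a thread of the wrong kind, which then goes back to waiting while the thread that could proceed stays asleep; notifyAll() wakes everyone and each thread rechecks its own while condition.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

class NotifyAllBoundedList<T> {
    private final int capacity;
    private final Queue<T> list;
    private final Object lock = new Object();

    NotifyAllBoundedList(int capacity) {
        this.capacity = capacity;
        this.list = new ArrayDeque<>(capacity);
    }

    T poll() throws InterruptedException {
        synchronized (lock) {
            while (list.isEmpty()) {
                lock.wait();
            }
            T e = list.poll();
            // Wake every waiter; each one rechecks its own condition.
            lock.notifyAll();
            return e;
        }
    }

    void add(T e) throws InterruptedException {
        synchronized (lock) {
            while (list.size() == capacity) {
                lock.wait();
            }
            list.add(e);
            lock.notifyAll();
        }
    }
}

class NotifyAllDemo {
    // Runs `threads` producers and `threads` consumers against a capacity-1 list
    // and returns how many items the consumers took in total.
    static int transfer(int threads, int perThread) {
        NotifyAllBoundedList<Integer> list = new NotifyAllBoundedList<>(1);
        AtomicInteger taken = new AtomicInteger();
        Thread[] workers = new Thread[threads * 2];
        for (int t = 0; t < threads; t++) {
            workers[2 * t] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        list.add(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[2 * t + 1] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        list.poll();
                        taken.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread w : workers) {
            w.start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken.get();
    }

    public static void main(String[] args) {
        System.out.println(transfer(4, 100)); // prints 400
    }
}
```

The capacity of 1 in the demo is deliberate: it forces producers and consumers to block often, which is the situation where the choice between notify() and notifyAll() matters.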
@chrislzm It depends on the situation. notifyAll can cause thread contention, e.g. if a lot of threads read from the queue and only a few threads write to it.
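One way to address the contention concern, sketched here as a suggestion rather than anything proposed in this thread, is to give producers and consumers separate wait sets using java.util.concurrent.locks.ReentrantLock with two Condition objects. Then signal() only wakes a thread of the kind that can actually make progress. ConditionBoundedList and ConditionDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBoundedList<T> {
    private final int capacity;
    private final Queue<T> list;
    private final ReentrantLock lock = new ReentrantLock();
    // Consumers wait here until an element is available.
    private final Condition notEmpty = lock.newCondition();
    // Producers wait here until there is free space.
    private final Condition notFull = lock.newCondition();

    ConditionBoundedList(int capacity) {
        this.capacity = capacity;
        this.list = new ArrayDeque<>(capacity);
    }

    T poll() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                notEmpty.await();
            }
            T e = list.poll();
            // A slot was freed, so wake one waiting producer only.
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    void add(T e) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == capacity) {
                notFull.await();
            }
            list.add(e);
            // An element arrived, so wake one waiting consumer only.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
}

class ConditionDemo {
    // Pushes 1..count through a capacity-2 list from a producer thread
    // and returns the sum of the values received on the calling thread.
    static int sumThrough(int count) {
        ConditionBoundedList<Integer> list = new ConditionBoundedList<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    list.add(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += list.poll();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThrough(10)); // prints 55
    }
}
```

Outside of an exercise, java.util.concurrent.ArrayBlockingQueue already provides a bounded blocking queue in the standard library, so a hand-written version like this is mostly useful for learning.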
@jeremyshi is correct: you need a limit attribute in the BlockingQueue class.