Created
September 25, 2011 23:32
-
-
Save dougnukem/1241317 to your computer and use it in GitHub Desktop.
Example Threadsafe BlockingQueue implementation in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class BlockingQueue implements Queue { | |
private java.util.Queue queue = new java.util.LinkedList(); | |
/** | |
* Make a blocking Dequeue call so that we'll only return when the queue has | |
* something on it, otherwise we'll wait until something is put on it. | |
* | |
* @returns This will return null if the thread wait() call is interrupted. | |
*/ | |
public synchronized Object dequeue() { | |
Object msg = null; | |
while (queue.isEmpty()) { | |
try { | |
wait(); | |
} catch (InterruptedException e) { | |
// Error return the client a null item | |
return msg; | |
} | |
} | |
msg = queue.remove(); | |
return msg; | |
} | |
/** | |
* Enqueue will add an object to this queue, and will notify any waiting | |
* threads that there is an object available. | |
*/ | |
public synchronized void enqueue(Object o) { | |
queue.add(o); | |
// Wake up anyone waiting for something to be put on the queue. | |
notifyAll(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Consumer implements Runnable { | |
// This will be assigned in the constructor | |
private Queue queue = null; | |
public void process(Object msg) { | |
try { | |
//process message non-trivially (IE: it takes awhile). | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
public void run() { | |
while(true) { | |
doStuff(); | |
} | |
} | |
public void doStuff() { | |
Object msg = queue.dequeue(); | |
process(msg); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Producer implements Runnable { | |
// This will be assigned in the constructor | |
private Queue queue = null; | |
public void run() { | |
// Binds to socket, reads messages in | |
// packages message calls doSomething() | |
// doSomething(Object msg); | |
} | |
public void doSomething(Object msg) { | |
queue.enqueue(msg); | |
} | |
} |
/**
 * A fixed-capacity FIFO buffer with blocking {@code add} and {@code poll}.
 *
 * <p>All state is guarded by a single private monitor. Producers wait on it
 * while the buffer is full and consumers wait on it while the buffer is empty.
 *
 * @param <T> element type
 */
public class BoundedList<T> {
    private final int capacity;
    private final Queue<T> list;
    private final Object lock = new Object();

    /**
     * Creates an empty buffer that holds at most {@code capacity} elements.
     *
     * @param capacity maximum number of buffered elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     *         (zero would make {@code add} block forever, a negative value
     *         would make the buffer unbounded)
     */
    public BoundedList(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        this.list = new ArrayDeque<>(capacity);
    }

    /**
     * Removes and returns the oldest element, blocking while the buffer is
     * empty.
     *
     * @return the oldest buffered element
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    T poll() throws InterruptedException {
        synchronized (lock) {
            while (list.isEmpty()) {
                lock.wait();
            }
            T e = list.poll();
            // Producers and consumers share this one monitor, so notify()
            // could wake another consumer instead of a blocked producer and
            // the wakeup would be lost. notifyAll() lets every waiter re-check
            // its own condition.
            lock.notifyAll();
            return e;
        }
    }

    /**
     * Appends an element, blocking while the buffer is full.
     *
     * @param e the element to append
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    void add(T e) throws InterruptedException {
        synchronized (lock) {
            while (list.size() >= capacity) {
                lock.wait();
            }
            list.add(e);
            // See poll(): notifyAll() avoids waking only a thread of the same
            // kind (another producer) and stranding the consumers.
            lock.notifyAll();
        }
    }
}
Will this work?
@shankyty No, unfortunately that won't work, because it can result in the lost wakeup problem. Check out this stack overflow article: https://stackoverflow.com/a/3186336/7602403
One solution is to replace your calls to lock.notify() with lock.notifyAll()
@chrislzm It depends on the situation: notifyAll() can cause thread contention, e.g. if many threads are reading data from the queue and only a few threads are writing data to it.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@vikranthpatoju is right. In the enqueue method it is enough to call notify() instead of notifyAll(). The while (isEmpty) check is not safe: if two consumers simultaneously read the same element in the queue, you will have a race condition.