Created
September 25, 2011 23:32
-
-
Save dougnukem/1241317 to your computer and use it in GitHub Desktop.
Example Threadsafe BlockingQueue implementation in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class BlockingQueue implements Queue { | |
private java.util.Queue queue = new java.util.LinkedList(); | |
/** | |
* Make a blocking Dequeue call so that we'll only return when the queue has | |
* something on it, otherwise we'll wait until something is put on it. | |
* | |
* @returns This will return null if the thread wait() call is interrupted. | |
*/ | |
public synchronized Object dequeue() { | |
Object msg = null; | |
while (queue.isEmpty()) { | |
try { | |
wait(); | |
} catch (InterruptedException e) { | |
// Error return the client a null item | |
return msg; | |
} | |
} | |
msg = queue.remove(); | |
return msg; | |
} | |
/** | |
* Enqueue will add an object to this queue, and will notify any waiting | |
* threads that there is an object available. | |
*/ | |
public synchronized void enqueue(Object o) { | |
queue.add(o); | |
// Wake up anyone waiting for something to be put on the queue. | |
notifyAll(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Consumer implements Runnable { | |
// This will be assigned in the constructor | |
private Queue queue = null; | |
public void process(Object msg) { | |
try { | |
//process message non-trivially (IE: it takes awhile). | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
public void run() { | |
while(true) { | |
doStuff(); | |
} | |
} | |
public void doStuff() { | |
Object msg = queue.dequeue(); | |
process(msg); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Producer implements Runnable { | |
// This will be assigned in the constructor | |
private Queue queue = null; | |
public void run() { | |
// Binds to socket, reads messages in | |
// packages message calls doSomething() | |
// doSomething(Object msg); | |
} | |
public void doSomething(Object msg) { | |
queue.enqueue(msg); | |
} | |
} |
@chrislzm its depends on the situation notifyAll will cause the thread contentation eg if a lot of threads read or get the data from the queue and few of threads writing the data to the queue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@shankyty No, unfortunately that won't work, because it can result in the lost wakeup problem. Check out this stack overflow article: https://stackoverflow.com/a/3186336/7602403
One solution is to replace your calls to lock.notify() with lock.notifyAll()