Created
March 11, 2011 12:07
-
-
Save viktorklang/865809 to your computer and use it in GitHub Desktop.
Just a draft
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.util | |
import java.util.concurrent.locks.ReentrantLock | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.{ TimeUnit, BlockingQueue } | |
import java.util.{ AbstractQueue, Queue, Collection, Iterator } | |
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { | |
backing match { | |
case null => throw new IllegalArgumentException("Backing Queue may not be null") | |
case b: BlockingQueue[_] => | |
require(maxCapacity > 0) | |
require(b.size() == 0) | |
require(b.remainingCapacity >= maxCapacity) | |
case b: Queue[_] => | |
require(b.size() == 0) | |
require(maxCapacity > 0) | |
} | |
protected val lock = new ReentrantLock(true) | |
private val notEmpty = lock.newCondition() | |
private val notFull = lock.newCondition() | |
def put(e: E): Unit = { //Blocks until not full | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
while (backing.size() == maxCapacity) | |
notFull.await() | |
require(backing.offer(e)) | |
notEmpty.signal() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def take(): E = { //Blocks until not empty | |
lock.lockInterruptibly() | |
try { | |
while (backing.size() == 0) | |
notEmpty.await() | |
val e = backing.poll() | |
require(e ne null) | |
notFull.signal() | |
e | |
} finally { | |
lock.unlock() | |
} | |
} | |
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
if (backing.size() == maxCapacity) false | |
else { | |
require(backing.offer(e)) //Should never fail | |
notEmpty.signal() | |
true | |
} | |
} finally { | |
lock.unlock() | |
} | |
} | |
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail | |
if (e eq null) throw new NullPointerException | |
var nanos = unit.toNanos(timeout) | |
lock.lockInterruptibly() | |
try { | |
while(backing.size() == maxCapacity) { | |
if (nanos <= 0) | |
return false | |
else | |
nanos = notFull.awaitNanos(nanos) | |
} | |
require(backing.offer(e)) //Should never fail | |
notEmpty.signal() | |
true | |
} finally { | |
lock.unlock() | |
} | |
} | |
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail | |
var nanos = unit.toNanos(timeout) | |
lock.lockInterruptibly() | |
try { | |
var result: E = null.asInstanceOf[E] | |
var hasResult = false | |
while(!hasResult) { | |
hasResult = backing.poll() match { | |
case null if nanos <= 0 => | |
result = null.asInstanceOf[E] | |
true | |
case null => | |
try { | |
nanos = notEmpty.awaitNanos(nanos) | |
} catch { | |
case ie: InterruptedException => | |
notEmpty.signal() | |
throw ie | |
} | |
false | |
case e => | |
notFull.signal() | |
result = e | |
true | |
} | |
} | |
result | |
} finally { | |
lock.unlock() | |
} | |
} | |
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null | |
lock.lock() | |
try { | |
backing.poll() match { | |
case null => null.asInstanceOf[E] | |
case e => | |
notFull.signal() | |
e | |
} | |
} finally { | |
lock.unlock | |
} | |
} | |
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
if (backing remove e) { | |
notFull.signal() | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def contains(e: AnyRef): Boolean = { | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
backing contains e | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def clear(): Unit = { | |
lock.lock() | |
try { | |
backing.clear | |
} finally { | |
lock.unlock() | |
} | |
} | |
def remainingCapacity(): Int = { | |
lock.lock() | |
try { | |
maxCapacity - backing.size() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def size(): Int = { | |
lock.lock() | |
try { | |
backing.size() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def peek(): E = { | |
lock.lock() | |
try { | |
backing.peek() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) | |
def drainTo(c: Collection[_ >: E], maxElements: Int): Int = { | |
if (c eq null) throw new NullPointerException | |
if (c eq this) throw new IllegalArgumentException | |
if (maxElements <= 0) 0 | |
else { | |
lock.lock() | |
try { | |
var n = 0 | |
var e: E = null.asInstanceOf[E] | |
while(n < maxElements) { | |
backing.poll() match { | |
case null => return n | |
case e => | |
c add e | |
n += 1 | |
} | |
} | |
n | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
override def containsAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
backing containsAll c | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def removeAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
if (backing.removeAll(c)) { | |
val sz = backing.size() | |
if (sz < maxCapacity) notFull.signal() | |
if (sz > 0) notEmpty.signal() //FIXME needed? | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def retainAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
if (backing.retainAll(c)) { | |
val sz = backing.size() | |
if (sz < maxCapacity) notFull.signal() //FIXME needed? | |
if (sz > 0) notEmpty.signal() | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
def iterator(): Iterator[E] = { | |
lock.lock | |
try { | |
val elements = backing.toArray | |
new Iterator[E] { | |
var at = 0 | |
var last = -1 | |
def hasNext(): Boolean = at < elements.length | |
def next(): E = { | |
if (at >= elements.length) throw new NoSuchElementException | |
last = at | |
at += 1 | |
elements(last).asInstanceOf[E] | |
} | |
def remove(): Unit = { | |
if (last < 0) throw new IllegalStateException | |
val target = elements(last) | |
last = -1 //To avoid 2 subsequent removes without a next in between | |
lock.lock() | |
try { | |
val i = backing.iterator() | |
while(i.hasNext) { | |
if (i.next eq target) { | |
i.remove() | |
notFull.signal() | |
return () | |
} | |
} | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
} finally { | |
lock.unlock | |
} | |
} | |
override def toArray(): Array[AnyRef] = { | |
lock.lock() | |
try { | |
backing.toArray | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def isEmpty(): Boolean = { | |
lock.lock() | |
try { | |
backing.isEmpty() | |
} finally { | |
lock.unlock() | |
} | |
} | |
//FIXME Implement toArray[T] => Array[T] | |
} |
Hi Victor,
have you run tests against a closure using version or is this a general aversion? The JVM is particularly good at handling short lived objects after all.
BTW, my latest version has the following syntax:
protected def empty = isEmpty
protected def full = backing.size == maxCapacity
//Blocks until not full
override def put(e: E) {
notNull(e)
when(notFull) {
notEmpty.signalAndReturn(require(backing offer e))
}
}
override def take = when(notEmpty) {
notFull.signalAndReturn(notNull(backing.poll()))
}
//Tries to do it immediately, if fail return false
def offer(e: E): Boolean = {
notNull(e)
lock(notEmpty.signalIf(!full && backing.offer(e)))
}
//Tries to remove the head of the queue immediately, if fail, return null
override def poll(): E = lock(notFull.signalIfNotNull(() => backing.poll()))
Also, not a generic test but certainly adaptable:
Problem is that then you need to copy paste a lot of code, and sooner or later, that has some other deps, and then you need to pull in even more stuff.
I'd say that there's a market for a self-contained jsr166-test.jar :-)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Also, do you really want a general purpose implementation? There's a lot of work that could be avoided by for instance making the iterator not support element removal.