Created
March 11, 2011 12:07
-
-
Save viktorklang/865809 to your computer and use it in GitHub Desktop.
Just a draft
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.util | |
import java.util.concurrent.locks.ReentrantLock | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.{ TimeUnit, BlockingQueue } | |
import java.util.{ AbstractQueue, Queue, Collection, Iterator } | |
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { | |
backing match { | |
case null => throw new IllegalArgumentException("Backing Queue may not be null") | |
case b: BlockingQueue[_] => | |
require(maxCapacity > 0) | |
require(b.size() == 0) | |
require(b.remainingCapacity >= maxCapacity) | |
case b: Queue[_] => | |
require(b.size() == 0) | |
require(maxCapacity > 0) | |
} | |
protected val lock = new ReentrantLock(true) | |
private val notEmpty = lock.newCondition() | |
private val notFull = lock.newCondition() | |
def put(e: E): Unit = { //Blocks until not full | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
while (backing.size() == maxCapacity) | |
notFull.await() | |
require(backing.offer(e)) | |
notEmpty.signal() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def take(): E = { //Blocks until not empty | |
lock.lockInterruptibly() | |
try { | |
while (backing.size() == 0) | |
notEmpty.await() | |
val e = backing.poll() | |
require(e ne null) | |
notFull.signal() | |
e | |
} finally { | |
lock.unlock() | |
} | |
} | |
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
if (backing.size() == maxCapacity) false | |
else { | |
require(backing.offer(e)) //Should never fail | |
notEmpty.signal() | |
true | |
} | |
} finally { | |
lock.unlock() | |
} | |
} | |
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail | |
if (e eq null) throw new NullPointerException | |
var nanos = unit.toNanos(timeout) | |
lock.lockInterruptibly() | |
try { | |
while(backing.size() == maxCapacity) { | |
if (nanos <= 0) | |
return false | |
else | |
nanos = notFull.awaitNanos(nanos) | |
} | |
require(backing.offer(e)) //Should never fail | |
notEmpty.signal() | |
true | |
} finally { | |
lock.unlock() | |
} | |
} | |
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail | |
var nanos = unit.toNanos(timeout) | |
lock.lockInterruptibly() | |
try { | |
var result: E = null.asInstanceOf[E] | |
var hasResult = false | |
while(!hasResult) { | |
hasResult = backing.poll() match { | |
case null if nanos <= 0 => | |
result = null.asInstanceOf[E] | |
true | |
case null => | |
try { | |
nanos = notEmpty.awaitNanos(nanos) | |
} catch { | |
case ie: InterruptedException => | |
notEmpty.signal() | |
throw ie | |
} | |
false | |
case e => | |
notFull.signal() | |
result = e | |
true | |
} | |
} | |
result | |
} finally { | |
lock.unlock() | |
} | |
} | |
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null | |
lock.lock() | |
try { | |
backing.poll() match { | |
case null => null.asInstanceOf[E] | |
case e => | |
notFull.signal() | |
e | |
} | |
} finally { | |
lock.unlock | |
} | |
} | |
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
if (backing remove e) { | |
notFull.signal() | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def contains(e: AnyRef): Boolean = { | |
if (e eq null) throw new NullPointerException | |
lock.lock() | |
try { | |
backing contains e | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def clear(): Unit = { | |
lock.lock() | |
try { | |
backing.clear | |
} finally { | |
lock.unlock() | |
} | |
} | |
def remainingCapacity(): Int = { | |
lock.lock() | |
try { | |
maxCapacity - backing.size() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def size(): Int = { | |
lock.lock() | |
try { | |
backing.size() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def peek(): E = { | |
lock.lock() | |
try { | |
backing.peek() | |
} finally { | |
lock.unlock() | |
} | |
} | |
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) | |
def drainTo(c: Collection[_ >: E], maxElements: Int): Int = { | |
if (c eq null) throw new NullPointerException | |
if (c eq this) throw new IllegalArgumentException | |
if (maxElements <= 0) 0 | |
else { | |
lock.lock() | |
try { | |
var n = 0 | |
var e: E = null.asInstanceOf[E] | |
while(n < maxElements) { | |
backing.poll() match { | |
case null => return n | |
case e => | |
c add e | |
n += 1 | |
} | |
} | |
n | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
override def containsAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
backing containsAll c | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def removeAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
if (backing.removeAll(c)) { | |
val sz = backing.size() | |
if (sz < maxCapacity) notFull.signal() | |
if (sz > 0) notEmpty.signal() //FIXME needed? | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def retainAll(c: Collection[_]): Boolean = { | |
lock.lock() | |
try { | |
if (backing.retainAll(c)) { | |
val sz = backing.size() | |
if (sz < maxCapacity) notFull.signal() //FIXME needed? | |
if (sz > 0) notEmpty.signal() | |
true | |
} else false | |
} finally { | |
lock.unlock() | |
} | |
} | |
def iterator(): Iterator[E] = { | |
lock.lock | |
try { | |
val elements = backing.toArray | |
new Iterator[E] { | |
var at = 0 | |
var last = -1 | |
def hasNext(): Boolean = at < elements.length | |
def next(): E = { | |
if (at >= elements.length) throw new NoSuchElementException | |
last = at | |
at += 1 | |
elements(last).asInstanceOf[E] | |
} | |
def remove(): Unit = { | |
if (last < 0) throw new IllegalStateException | |
val target = elements(last) | |
last = -1 //To avoid 2 subsequent removes without a next in between | |
lock.lock() | |
try { | |
val i = backing.iterator() | |
while(i.hasNext) { | |
if (i.next eq target) { | |
i.remove() | |
notFull.signal() | |
return () | |
} | |
} | |
} finally { | |
lock.unlock() | |
} | |
} | |
} | |
} finally { | |
lock.unlock | |
} | |
} | |
override def toArray(): Array[AnyRef] = { | |
lock.lock() | |
try { | |
backing.toArray | |
} finally { | |
lock.unlock() | |
} | |
} | |
override def isEmpty(): Boolean = { | |
lock.lock() | |
try { | |
backing.isEmpty() | |
} finally { | |
lock.unlock() | |
} | |
} | |
//FIXME Implement toArray[T] => Array[T] | |
} |
Also, not a generic test but certainly adaptable:
Problem is that then you need to copy paste a lot of code, and sooner or later, that has some other deps, and then you need to pull in even more stuff.
I'd say that there's a market for a self-contained jsr166-test.jar :-)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Victor,
have you run tests against a closure using version or is this a general aversion? The JVM is particularly good at handling short lived objects after all.
BTW, my latest version has the following syntax: