-
-
Save viktorklang/865809 to your computer and use it in GitHub Desktop.
| package akka.util | |
| import java.util.concurrent.locks.ReentrantLock | |
| import java.util.concurrent.atomic.AtomicInteger | |
| import java.util.concurrent.{ TimeUnit, BlockingQueue } | |
| import java.util.{ AbstractQueue, Queue, Collection, Iterator } | |
/**
 * A `BlockingQueue` that enforces an upper bound (`maxCapacity`) on top of an arbitrary
 * backing `java.util.Queue`.
 *
 * All access to the backing queue is serialized through a single fair `ReentrantLock`;
 * two conditions derived from that lock are used to park producers (`notFull`) and
 * consumers (`notEmpty`). The implementation deliberately uses plain `while` loops and
 * no closures so that no extra objects are allocated on the hot paths.
 *
 * Construction fails if the backing queue is `null` (IllegalArgumentException), is not
 * empty, if `maxCapacity` is not positive, or - for a backing `BlockingQueue` - if its
 * own remaining capacity is smaller than `maxCapacity`.
 *
 * @param maxCapacity the maximum number of elements this queue will hold
 * @param backing     the (initially empty) queue that actually stores the elements;
 *                    it must not be touched by anyone else once handed over
 * @tparam E the element type; `null` elements are rejected
 */
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {

  backing match {
    case null => throw new IllegalArgumentException("Backing Queue may not be null")
    case b: BlockingQueue[_] =>
      require(maxCapacity > 0)
      require(b.size() == 0)
      require(b.remainingCapacity() >= maxCapacity)
    case b: Queue[_] =>
      require(b.size() == 0)
      require(maxCapacity > 0)
  }

  // Fair lock: threads acquire it in arrival order.
  protected val lock = new ReentrantLock(true)

  // Signalled whenever an element has been inserted.
  private val notEmpty = lock.newCondition()
  // Signalled whenever at least one slot has been freed.
  private val notFull = lock.newCondition()

  /**
   * Inserts `e`, blocking until a slot is free.
   *
   * The lock is acquired interruptibly so that a thread can be interrupted both while
   * queued for the lock and while waiting for space.
   *
   * @throws NullPointerException if `e` is null
   * @throws InterruptedException if interrupted while waiting
   */
  def put(e: E): Unit = {
    if (e eq null) throw new NullPointerException
    lock.lockInterruptibly()
    try {
      while (backing.size() >= maxCapacity)
        notFull.await()
      require(backing.offer(e)) // Should never fail: we hold the lock and there is room
      notEmpty.signal()
    } finally lock.unlock()
  }

  /**
   * Removes and returns the head of the queue, blocking until an element is available.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  def take(): E = {
    lock.lockInterruptibly()
    try {
      while (backing.size() == 0)
        notEmpty.await()
      val head = backing.poll()
      require(head ne null)
      notFull.signal()
      head
    } finally lock.unlock()
  }

  /**
   * Inserts `e` if a slot is free right now.
   *
   * @return `true` if the element was inserted, `false` if the queue is full
   * @throws NullPointerException if `e` is null
   */
  def offer(e: E): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      if (backing.size() >= maxCapacity) false
      else {
        require(backing.offer(e)) // Should never fail
        notEmpty.signal()
        true
      }
    } finally lock.unlock()
  }

  /**
   * Inserts `e`, waiting up to the given timeout for a slot to become free.
   *
   * @return `true` if the element was inserted, `false` if the timeout elapsed first
   * @throws NullPointerException if `e` is null
   * @throws InterruptedException if interrupted while waiting
   */
  def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
    if (e eq null) throw new NullPointerException
    var remaining = unit.toNanos(timeout)
    lock.lockInterruptibly()
    try {
      // awaitNanos returns the time still left, so spurious wake-ups do not extend the wait.
      while (backing.size() >= maxCapacity && remaining > 0)
        remaining = notFull.awaitNanos(remaining)

      if (backing.size() >= maxCapacity) false // timed out while still full
      else {
        require(backing.offer(e)) // Should never fail
        notEmpty.signal()
        true
      }
    } finally lock.unlock()
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given timeout for an
   * element to become available.
   *
   * @return the head element, or `null` if the timeout elapsed first
   * @throws InterruptedException if interrupted while waiting
   */
  def poll(timeout: Long, unit: TimeUnit): E = {
    var remaining = unit.toNanos(timeout)
    lock.lockInterruptibly()
    try {
      var head: E = backing.poll()
      while ((head eq null) && remaining > 0) {
        try {
          remaining = notEmpty.awaitNanos(remaining)
        } catch {
          case ie: InterruptedException =>
            // Pass a possibly consumed signal on to another waiting consumer.
            notEmpty.signal()
            throw ie
        }
        head = backing.poll()
      }
      if (head ne null) notFull.signal()
      head
    } finally lock.unlock()
  }

  /**
   * Removes and returns the head of the queue if one is available right now.
   *
   * @return the head element, or `null` if the queue is empty
   */
  def poll(): E = {
    lock.lock()
    try {
      val head = backing.poll()
      if (head ne null) notFull.signal()
      head
    } finally lock.unlock()
  }

  /**
   * Removes a single occurrence of `e` from the queue, if present.
   *
   * @return `true` if an element was removed
   * @throws NullPointerException if `e` is null
   */
  override def remove(e: AnyRef): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      val removed = backing.remove(e)
      if (removed) notFull.signal()
      removed
    } finally lock.unlock()
  }

  /**
   * @return `true` if the queue currently contains `e`
   * @throws NullPointerException if `e` is null
   */
  override def contains(e: AnyRef): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      backing.contains(e)
    } finally lock.unlock()
  }

  /**
   * Removes every element. All producers waiting for space are woken up, since the
   * whole capacity becomes available at once.
   */
  override def clear(): Unit = {
    lock.lock()
    try {
      backing.clear()
      notFull.signalAll()
    } finally lock.unlock()
  }

  /** @return the number of additional elements that can currently be inserted */
  def remainingCapacity(): Int = {
    lock.lock()
    try {
      maxCapacity - backing.size()
    } finally lock.unlock()
  }

  /** @return the number of elements currently held */
  def size(): Int = {
    lock.lock()
    try {
      backing.size()
    } finally lock.unlock()
  }

  /** @return the head of the queue without removing it, or `null` if the queue is empty */
  def peek(): E = {
    lock.lock()
    try {
      backing.peek()
    } finally lock.unlock()
  }

  /** Moves all currently available elements into `c`; see the two-argument overload. */
  def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)

  /**
   * Moves at most `maxElements` elements from this queue into `c`.
   *
   * @return the number of elements transferred
   * @throws NullPointerException     if `c` is null
   * @throws IllegalArgumentException if `c` is this queue or its backing queue
   */
  def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
    if (c eq null) throw new NullPointerException
    if (c eq this) throw new IllegalArgumentException
    // Draining into the backing queue would just re-insert every polled element.
    if (c eq backing) throw new IllegalArgumentException
    if (maxElements <= 0) 0
    else {
      lock.lock()
      try {
        var n = 0
        var freed = false
        try {
          var more = true
          while (more && n < maxElements) {
            val e = backing.poll()
            if (e eq null) more = false
            else {
              freed = true
              c.add(e)
              n += 1
            }
          }
        } finally {
          // Several slots may have been freed, so wake every waiting producer - also
          // when `c.add` failed after elements had already been taken out.
          if (freed) notFull.signalAll()
        }
        n
      } finally lock.unlock()
    }
  }

  /** @return `true` if every element of `c` is currently contained in the queue */
  override def containsAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      backing.containsAll(c)
    } finally lock.unlock()
  }

  /**
   * Removes every element that is also contained in `c`.
   *
   * @return `true` if the queue changed as a result
   */
  override def removeAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.removeAll(c)) {
        val sz = backing.size()
        // More than one slot may have been freed: wake all waiting producers.
        if (sz < maxCapacity) notFull.signalAll()
        // A removal cannot make the queue non-empty; this wake-up is purely defensive
        // and harmless because waiters re-check their condition.
        if (sz > 0) notEmpty.signal()
        true
      } else false
    } finally lock.unlock()
  }

  /**
   * Retains only the elements that are also contained in `c`.
   *
   * @return `true` if the queue changed as a result
   */
  override def retainAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.retainAll(c)) {
        val sz = backing.size()
        // More than one slot may have been freed: wake all waiting producers.
        if (sz < maxCapacity) notFull.signalAll()
        // Defensive only, see removeAll.
        if (sz > 0) notEmpty.signal()
        true
      } else false
    } finally lock.unlock()
  }

  /**
   * Returns an iterator over a snapshot of the queue taken at the time of the call.
   * Later insertions are not visible to the iterator. `remove()` removes the most
   * recently returned element from the live queue (matched by reference) if it is
   * still present.
   */
  def iterator(): Iterator[E] = {
    lock.lock()
    try {
      val elements = backing.toArray()
      new Iterator[E] {
        private var cursor = 0
        private var lastReturned = -1

        def hasNext(): Boolean = cursor < elements.length

        def next(): E = {
          if (cursor >= elements.length) throw new NoSuchElementException
          lastReturned = cursor
          cursor += 1
          elements(lastReturned).asInstanceOf[E]
        }

        // `override` is required where java.util.Iterator.remove is a default method.
        override def remove(): Unit = {
          if (lastReturned < 0) throw new IllegalStateException
          val target = elements(lastReturned)
          lastReturned = -1 // To avoid 2 subsequent removes without a next in between
          lock.lock()
          try {
            val i = backing.iterator()
            var removed = false
            while (!removed && i.hasNext) {
              if (i.next() eq target) {
                i.remove()
                notFull.signal()
                removed = true
              }
            }
          } finally lock.unlock()
        }
      }
    } finally lock.unlock()
  }

  /** @return a snapshot of the current elements */
  override def toArray(): Array[AnyRef] = {
    lock.lock()
    try {
      backing.toArray()
    } finally lock.unlock()
  }

  /** @return `true` if the queue currently holds no elements */
  override def isEmpty(): Boolean = {
    lock.lock()
    try {
      backing.isEmpty()
    } finally lock.unlock()
  }

  //FIXME Implement toArray[T] => Array[T]
}
Hey Jed,
No closures allowed, will be used in a highly concurrent environment, so allocations when not needed must be avoided.
I've run a few tests, but I'd really love for there to be a jsr166-test.jar so you could do something simple like:
class MyBlockingQueueImplTest extends AbstractBlockingQueueTest {
def createBounded(bounds: Int): BlockingQueue = new MyBlockingQueue(bounds)
def createUnbounded(): BlockingQueue = new MyBlockingQueue()
}
And voilà!
Also, do you really want a general purpose implementation? There's a lot of work that could be avoided by for instance making the iterator not support element removal.
Hi Victor,
have you run tests against a closure-using version, or is this a general aversion? The JVM is particularly good at handling short-lived objects, after all.
BTW, my latest version has the following syntax:
protected def empty = isEmpty
protected def full = backing.size == maxCapacity
//Blocks until not full
override def put(e: E) {
notNull(e)
when(notFull) {
notEmpty.signalAndReturn(require(backing offer e))
}
}
override def take = when(notEmpty) {
notFull.signalAndReturn(notNull(backing.poll()))
}
//Tries to do it immediately, if fail return false
def offer(e: E): Boolean = {
notNull(e)
lock(notEmpty.signalIf(!full && backing.offer(e)))
}
//Tries to remove the head of the queue immediately, if fail, return null
override def poll(): E = lock(notFull.signalIfNotNull(() => backing.poll()))
Also, not a generic test but certainly adaptable:
Problem is that then you need to copy paste a lot of code, and sooner or later, that has some other deps, and then you need to pull in even more stuff.
I'd say that there's a market for a self-contained jsr166-test.jar :-)
Victor,
This is a bit large and hairy for a detailed review, particularly without inline comments. One thing I did notice is that ABQ implements a resignal if a condition.await() is interrupted. In theory this might prevent a missed signal, but then, LBQ doesn't implement this so I am not entirely sure if it is necessary.
I explored a way to make it a bit cleaner, basically extracting the bounds checking into a trait that allows cleaner calling syntax:
This is as far as I have gotten so far, I am still working out the timeout support: https://bitbucket.org/jwesleysmith/atlassian-util-scala/src/tip/src/main/scala/com/atlassian/util/scala/concurrent/Bounded.scala