Created
August 10, 2015 22:47
-
-
Save trane/2c40c0262ccd19171410 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.zipkin.collector | |
import com.twitter.conversions.time._ | |
import com.twitter.finagle.stats.InMemoryStatsReceiver | |
import com.twitter.util.{Await, Future, FuturePool} | |
import java.util.concurrent.CountDownLatch | |
import org.scalatest._ | |
/**
 * Tests the BlockingItemQueue to make sure that it can store and consume elements even when adding
 * more elements than what its initial capacity is.
 *
 * The worker handed to the queue is parked on a latch, which lets the test observe the queue while
 * nothing is being processed and then release the worker to drain it.
 */
class BlockingItemQueueTest extends FunSuite {
  val Item = ()
  // Held closed until the test wants the workers to start completing items.
  val latch = new CountDownLatch(1)

  /**
   * Adds `items` copies of `Item` to `queue`.
   *
   * @param queue the queue under test
   * @param items how many elements to add
   * @return a Future that is `true` only if every individual add completed successfully
   */
  def fill(queue: BlockingItemQueue[Unit, Unit], items: Int): Future[Boolean] = {
    // Lift each outcome into a Try so that a single failed add yields `false` instead of
    // failing the collected Future.
    val results = (0 until items) map { _ =>
      queue.add(Item) transform { e => Future.value(e) }
    }
    Future.collect(results).map(_.forall(_.isReturn))
  }

  test("Sleeps on a max queue and waits for the worker to drain") {
    // Upper bound for the blocking waits below, so a stuck queue fails the test instead of
    // hanging the whole run.
    val timeout = 5.seconds
    val stats: InMemoryStatsReceiver = new InMemoryStatsReceiver()
    val queueSize = 10
    val queue = new BlockingItemQueue[Unit, Unit](queueSize, 1, fallingBehindWorker, 100.millis,
      100.millis, stats)

    // fill the queue
    assert(Await.result(fill(queue, queueSize), timeout))

    // validate that the queue is at capacity
    // NOTE(review): assumes "queueFull" is incremented when the queue reaches capacity rather
    // than only when an add is turned away -- confirm against BlockingItemQueue.
    assert(stats.counter("queueFull").apply() >= 1)

    // ensure that if we add to the queue and timeout, that we fail
    intercept[Exception] {
      Await.result(queue.add(Item), 1.millisecond)
    }

    // we haven't processed an item yet; use the same counter name as the final assertion, since
    // InMemoryStatsReceiver returns a fresh zero counter for an unknown name and the check would
    // otherwise pass vacuously
    assert(stats.counter("successes").apply() == 0)

    // allow a Future item to be added once we unleash the workers
    queue.add(Item)

    // unleash the workers
    latch.countDown()
    Await.ready(queue.close(), timeout)

    // ensure the queue is drained and that we have processed 11 items
    assert(queue.size() == 0)
    assert(stats.counter("successes").apply() == queueSize + 1)
  }

  /**
   * Worker that does not finish an item until the latch is released.
   *
   * The wait runs on a FuturePool thread: `Future { ... }` from Twitter util evaluates its body
   * on the calling thread, which would park the caller on the latch instead of returning a
   * pending Future.
   *
   * @param param the item to process
   * @return a Future that completes with `param` once the latch has been counted down
   */
  def fallingBehindWorker(param: Unit): Future[Unit] =
    FuturePool.unboundedPool { latch.await(); param }
}
Also on line 40, it blocks as well, not allowing execution of line 42
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Line 34: queueFull doesn't get set, because the counter is not incremented unless you go over capacity.
I tried adding queue.add to trigger the counter, but it blocks, because it's a synchronous Twitter Future, not a FutureTask. It seems like it needs to be executed inside of an executor (maybe Future.unbounded..).