Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jaked/886105 to your computer and use it in GitHub Desktop.
Save jaked/886105 to your computer and use it in GitHub Desktop.
import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit}
/*
Abstracts away the common pattern of producing items into a queue that are
consumed concurrently by a pool of workers.
*/
class ConcurrentBlockingQueueConsumer[T](queue: BlockingQueue[T], producer: Iterator[T], concurrencyLevel: Int) {
lazy val pool = Executors.newFixedThreadPool(concurrencyLevel)
def run()(consumer: (T) => Unit) {
val producerThread = ProducerThread(producer)
producerThread.start()
(1 to concurrencyLevel).foreach { _ =>
pool.submit(Consumer(consumer))
}
producerThread.join()
stopLatch.await()
pool.shutdown()
pool.awaitTermination(500, TimeUnit.MILLISECONDS)
}
private def ProducerThread(producer: Iterator[T]) = {
new Thread(new Runnable {
def run() {
for (item <- producer) {
queue.put(Some(item))
}
for (_ <- 1 to concurrencyLevel) {
queue.put(None)
}
}
})
}
private def Consumer(consumer: (T) => Unit) = {
new Runnable {
def run(): Unit {
queue.take() match {
case Some(t) => consumer(t); run()
case None => ()
}
}
}
}
}
object ConcurrentBlockingQueueConsumer {
def apply[T](queue: ArrayBlockingQueue[T], producer: Iterator[T])(consumer: (T) => Unit) {
val concurrencyLevel = queue.size + queue.remainingCapacity()
ConcurrentBlockingQueueConsumer(queue, producer, concurrencyLevel)(consumer)
}
def apply[T](queue: BlockingQueue[T], producer: Iterator[T], concurrencyLevel: Int)(consumer: (T) => Unit) {
new ConcurrentBlockingQueueConsumer(queue, producer, concurrencyLevel).run()(consumer)
}
}
object ConcurrentBlockingQueueConsumerExample {
def main(args: Array[String]) {
val queue = new ArrayBlockingQueue[String](4)
val producer = io.Source.fromInputStream(System.in).getLines
ConcurrentBlockingQueueConsumer[String](queue, producer) { item =>
println(item.stripLineEnd)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment