Skip to content

Instantly share code, notes, and snippets.

@pathikrit
Last active October 28, 2018 05:06
Show Gist options
  • Save pathikrit/1433ad98d51745342de6e9d787624a0d to your computer and use it in GitHub Desktop.
Save pathikrit/1433ad98d51745342de6e9d787624a0d to your computer and use it in GitHub Desktop.
Single synchronous Producer/Consumer in Scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
/**
* Rick's implementation of ghetto back-pressure algo
* Implement this trait and pass it off to ProducerConsumer.Runner to run it
*
* @tparam R Type of result to be crunched
* @tparam S State to iterate on
*/
trait ProducerConsumer[R, S] {
/**
* Return next (result, state); None when done producing
*/
def produce(state: S): Option[(R, S)]
/**
* Consume current state
*/
def consume(a: R): Unit
}
object ProducerConsumer {
def await(fs: Future[Unit]*)(implicit ec: ExecutionContext): Future[Unit] = Future.reduce(fs){case _ =>}
/**
* Use this to actually run a task
* @param maxQueueLength maximum amount of items kept in memory
*/
class Runner[A, S](task: ProducerConsumer[A, S], maxQueueLength: Int) {
private[this] val queue = new ArrayBlockingQueue[Option[(A, S)]](maxQueueLength)
def run(start: S)(implicit ec: ExecutionContext): Future[Unit] = {
val f1 = Future {
Iterator.iterate(task.produce(start))(_.flatMap({case (_, s) => task.produce(s)}))
.takeWhile(_.isDefined)
.foreach(queue.put)
}
val f2 = Future {
Iterator.continually(queue.take())
.takeWhile(_.isDefined)
.flatten
.foreach({case (res, _) => task.consume(res)})
}
await(f1, f2)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment