Last active
August 28, 2015 01:51
-
-
Save dbousamra/a873c43282ff363e5f47 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mq.ironmq | |
| import java.util | |
| import java.util.concurrent.{ExecutorService, Executors} | |
| import com.typesafe.scalalogging.LazyLogging | |
| import io.iron.ironmq.{Client, Messages} | |
| import mq._ | |
| import scalaz._ | |
| import Scalaz._ | |
| import scalaz.concurrent._ | |
| import scalaz.stream._ | |
| import scala.collection.JavaConverters._ | |
| import mq.concurrency.Extras._ | |
| import scalaz.stream.async.mutable.Queue | |
| import scala.concurrent.duration._ | |
| case class IronMqConsumer(client: Client, queueName: QueueName, preFetch: Int = 2000)(implicit val ec: ExecutorService) extends Consumer with LazyLogging { | |
| val MAX_BATCH_SIZE = 100 | |
| val INTERNAL_QUEUE_SIZE = preFetch | |
| val queue = client.queue(queueName.value) | |
| private def log[A]: Sink[Task, A] = { | |
| Process.constant { a: A => | |
| Task.now(logger.debug(a.toString)) | |
| } | |
| } | |
| private def reserve(count: Int): Process[Task, IronMqMessageContainer] = { | |
| val messages = Process.repeatEval { | |
| Task { queue.reserve(count).getMessages.map(message => IronMqMessageContainer(IronMqMessage(message))) } | |
| } | |
| messages.flatMap(messages => Process.emitAll(messages)) | |
| } | |
| private def ack: Sink[Task, Vector[IronMqMessageContainer]] = { | |
| Process.repeatEval { | |
| Task { messages: Vector[IronMqMessageContainer] => | |
| Task { | |
| val ironMessages = new Messages(new util.ArrayList(messages.map(_.message.value).toList.asJava)) | |
| queue.deleteMessages(ironMessages) | |
| logger.debug(s"Acked ${ironMessages.getSize} messages") | |
| } | |
| } | |
| } | |
| } | |
| def consume(f: MessageContainer => AckResponse) = { | |
| /* | |
| * Internal queue used to buffer messages from IronMQ | |
| */ | |
| val queue: Queue[IronMqMessageContainer] = async.boundedQueue[IronMqMessageContainer](INTERNAL_QUEUE_SIZE) | |
| /* | |
| * Fetch messages from IronMQ in bulk, and push them into the queue.enqueue sink. | |
| */ | |
| val reserved = reserve(MAX_BATCH_SIZE).to(queue.enqueue) | |
| val concReserved = scalaz.stream.merge.mergeN(10)(Process.constant(reserved)) | |
| /* | |
| * Dequeue messages and create a stream of processed messages. | |
| */ | |
| val processed: Process[Task, (AckResponse, IronMqMessageContainer)] = queue.dequeue.evalMap { message => Task { (f(message), message) } } | |
| /* | |
| * The messages that were acked. | |
| */ | |
| val toAck = processed.collect { case (Ack(), message) => message } | |
| /* | |
| * Batch the processed messages into groups and pipe them into the ack sink. | |
| */ | |
| val acked = toAck.chunk(MAX_BATCH_SIZE).to(ack) | |
| val concAcked = scalaz.stream.merge.mergeN(10)(Process.constant(acked)) | |
| val loggedAcks = queue.size.discrete.zip(time.every(500 milliseconds)).filter(_._2).map(_._1).to(log) | |
| concReserved.run.runAsync(_ => ()) | |
| concAcked.run.runAsync(_ => ()) | |
| loggedAcks.run.run | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment