Skip to content

Instantly share code, notes, and snippets.

@dbousamra
Last active August 28, 2015 01:51
Show Gist options
  • Select an option

  • Save dbousamra/a873c43282ff363e5f47 to your computer and use it in GitHub Desktop.

Select an option

Save dbousamra/a873c43282ff363e5f47 to your computer and use it in GitHub Desktop.
package mq.ironmq
import java.util
import java.util.concurrent.{ExecutorService, Executors}
import com.typesafe.scalalogging.LazyLogging
import io.iron.ironmq.{Client, Messages}
import mq._
import scalaz._
import Scalaz._
import scalaz.concurrent._
import scalaz.stream._
import scala.collection.JavaConverters._
import mq.concurrency.Extras._
import scalaz.stream.async.mutable.Queue
import scala.concurrent.duration._
/**
 * Pull-based consumer for a single IronMQ queue.
 *
 * Messages are reserved from IronMQ in batches, buffered in an internal
 * bounded queue, handed one-by-one to a caller-supplied handler, and —
 * when the handler answers `Ack()` — deleted from IronMQ in batches.
 *
 * @param client    IronMQ client used for all remote queue operations
 * @param queueName name of the remote IronMQ queue to consume from
 * @param preFetch  capacity of the internal buffer queue (default 2000)
 * @param ec        executor backing the scalaz `Task`s created here (implicit)
 */
case class IronMqConsumer(client: Client, queueName: QueueName, preFetch: Int = 2000)(implicit val ec: ExecutorService) extends Consumer with LazyLogging {
// Upper bound on messages per IronMQ API call, for both reserve and delete.
val MAX_BATCH_SIZE = 100
// Capacity of the internal buffer between the fetchers and the handler.
val INTERNAL_QUEUE_SIZE = preFetch
// Handle to the remote IronMQ queue; NOT the internal buffer queue used in consume().
val queue = client.queue(queueName.value)
/** Sink that debug-logs each element's toString. `Task.now` — the logging side effect runs eagerly, not when the Task is run. */
private def log[A]: Sink[Task, A] = {
Process.constant { a: A =>
Task.now(logger.debug(a.toString))
}
}
/**
 * Infinite stream of individual reserved messages: each evaluation reserves
 * up to `count` messages from IronMQ, and each batch is flattened into
 * single `IronMqMessageContainer` elements.
 */
private def reserve(count: Int): Process[Task, IronMqMessageContainer] = {
val messages = Process.repeatEval {
// NOTE(review): `.map` is called directly on the result of getMessages with no
// `.asScala` despite JavaConverters being imported — presumably getMessages
// returns an Array (which supports map via Predef); confirm against the
// IronMQ client version in use.
Task { queue.reserve(count).getMessages.map(message => IronMqMessageContainer(IronMqMessage(message))) }
}
// Flatten each reserved batch into a stream of individual messages.
messages.flatMap(messages => Process.emitAll(messages))
}
/**
 * Sink that deletes (acks) a batch of messages from IronMQ in one call.
 * A Sink is a stream of functions, so repeatEval yields the delete
 * function once per incoming batch.
 */
private def ack: Sink[Task, Vector[IronMqMessageContainer]] = {
Process.repeatEval {
// Outer Task produces the per-batch function; inner Task performs the delete.
Task { messages: Vector[IronMqMessageContainer] =>
Task {
// Unwrap the containers back to the raw IronMQ messages for the bulk-delete API.
val ironMessages = new Messages(new util.ArrayList(messages.map(_.message.value).toList.asJava))
queue.deleteMessages(ironMessages)
logger.debug(s"Acked ${ironMessages.getSize} messages")
}
}
}
}
/**
 * Run the consumer: feed every received message to `f` and delete those
 * for which `f` returns `Ack()`.
 *
 * Wiring: 10 concurrent reserve streams -> internal bounded queue -> `f`
 * -> (acked only) batched -> 10 concurrent ack sinks. The reserve and ack
 * pipelines are started fire-and-forget; the final queue-size logger is run
 * synchronously, so this method blocks its caller indefinitely.
 */
def consume(f: MessageContainer => AckResponse) = {
/*
 * Internal queue used to buffer messages from IronMQ
 */
val queue: Queue[IronMqMessageContainer] = async.boundedQueue[IronMqMessageContainer](INTERNAL_QUEUE_SIZE)
/*
 * Fetch messages from IronMQ in bulk, and push them into the queue.enqueue sink.
 */
val reserved = reserve(MAX_BATCH_SIZE).to(queue.enqueue)
// Run 10 copies of the reserve pipeline concurrently (mergeN caps concurrency at 10).
val concReserved = scalaz.stream.merge.mergeN(10)(Process.constant(reserved))
/*
 * Dequeue messages and create a stream of processed messages.
 */
val processed: Process[Task, (AckResponse, IronMqMessageContainer)] = queue.dequeue.evalMap { message => Task { (f(message), message) } }
/*
 * The messages that were acked. Non-Ack responses are dropped here and
 * their messages are never deleted, so IronMQ will redeliver them after
 * the reservation times out.
 */
val toAck = processed.collect { case (Ack(), message) => message }
/*
 * Batch the processed messages into groups and pipe them into the ack sink.
 */
val acked = toAck.chunk(MAX_BATCH_SIZE).to(ack)
// 10 concurrent copies of the ack pipeline, mirroring the reserve side.
val concAcked = scalaz.stream.merge.mergeN(10)(Process.constant(acked))
// Sample the internal queue depth every 500ms and debug-log it.
val loggedAcks = queue.size.discrete.zip(time.every(500 milliseconds)).filter(_._2).map(_._1).to(log)
// NOTE(review): runAsync(_ => ()) discards the completion/failure callback —
// any exception in the reserve or ack pipelines is silently swallowed.
concReserved.run.runAsync(_ => ())
concAcked.run.runAsync(_ => ())
// Blocking run of the logger stream keeps consume() (and the JVM) alive;
// queue.size.discrete never terminates, so this never returns normally.
loggedAcks.run.run
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment