Created
August 26, 2015 07:55
-
-
Save dbousamra/f0a2e11c734de41a1c50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| object Extras { | |
| implicit class ConcurrentProcess[O](val process: Process[Task, O]) { | |
| def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = { | |
| val actions = process.zipWith(f)((data, f) => f(data)) | |
| val nestedActions = actions.map(Process.eval) | |
| scalaz.stream.merge.mergeN(concurrencyLevel)(nestedActions) | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mq.ironmq | |
| import java.util | |
| import java.util.concurrent.{ExecutorService, Executors} | |
| import com.typesafe.scalalogging.LazyLogging | |
| import io.iron.ironmq.{Client, Messages} | |
| import mq._ | |
| import scalaz._ | |
| import Scalaz._ | |
| import scalaz.concurrent._ | |
| import scalaz.stream._ | |
| import scala.collection.JavaConverters._ | |
| import mq.concurrency.Extras._ | |
| case class IronMqConsumer(client: Client, queueName: QueueName)(implicit val ec: ExecutorService) extends Consumer with LazyLogging { | |
| val MAX_BATCH_SIZE = 100 | |
| val queue = client.queue(queueName.value) | |
| def consume(f: MessageContainer => AckResponse) = { | |
| val reservedMessages = reserve(MAX_BATCH_SIZE) | |
| val responses = reservedMessages.map { message => (f(message), message) } | |
| val toAck = responses.collect { case (Ack(), message) => message } | |
| val toNack = responses.collect { case (Nack(), message) => message } | |
| val toRequeue = responses.collect { case (Requeue(), message) => message } | |
| val acked = toAck.chunk(MAX_BATCH_SIZE).concurrently(4)(ack) | |
| val nacked = toNack.chunk(MAX_BATCH_SIZE).concurrently(4)(nack) | |
| (acked ++ nacked).run.run | |
| } | |
| private def reserve(count: Int) = { | |
| Process.repeatEval { | |
| Task { queue.reserve(count).getMessages.map(message => IronMqMessageContainer(IronMqMessage(message))) } | |
| }.flatMap(messages => Process.emitAll(messages)) | |
| } | |
| private def ack: Sink[Task, Vector[IronMqMessageContainer]] = Process.repeatEval { | |
| Task { messages: Vector[IronMqMessageContainer] => | |
| Task { | |
| val ironMessages = new Messages(new util.ArrayList(messages.map(_.message.value).toList.asJava)) | |
| queue.deleteMessages(ironMessages) | |
| logger.debug(s"Acked ${ironMessages.getSize} messages") | |
| } | |
| } | |
| } | |
| private def nack: Sink[Task, Vector[IronMqMessageContainer]] = Process.repeatEval { | |
| Task { messages: Vector[IronMqMessageContainer] => | |
| Task { | |
| logger.debug(s"Nacked ${messages.length} messages") | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment