Skip to content

Instantly share code, notes, and snippets.

@dbousamra
Created August 26, 2015 07:55
Show Gist options
  • Select an option

  • Save dbousamra/f0a2e11c734de41a1c50 to your computer and use it in GitHub Desktop.

Select an option

Save dbousamra/f0a2e11c734de41a1c50 to your computer and use it in GitHub Desktop.
object Extras {
implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
val actions = process.zipWith(f)((data, f) => f(data))
val nestedActions = actions.map(Process.eval)
scalaz.stream.merge.mergeN(concurrencyLevel)(nestedActions)
}
}
}
package mq.ironmq
import java.util
import java.util.concurrent.{ExecutorService, Executors}
import com.typesafe.scalalogging.LazyLogging
import io.iron.ironmq.{Client, Messages}
import mq._
import scalaz._
import Scalaz._
import scalaz.concurrent._
import scalaz.stream._
import scala.collection.JavaConverters._
import mq.concurrency.Extras._
case class IronMqConsumer(client: Client, queueName: QueueName)(implicit val ec: ExecutorService) extends Consumer with LazyLogging {
val MAX_BATCH_SIZE = 100
val queue = client.queue(queueName.value)
def consume(f: MessageContainer => AckResponse) = {
val reservedMessages = reserve(MAX_BATCH_SIZE)
val responses = reservedMessages.map { message => (f(message), message) }
val toAck = responses.collect { case (Ack(), message) => message }
val toNack = responses.collect { case (Nack(), message) => message }
val toRequeue = responses.collect { case (Requeue(), message) => message }
val acked = toAck.chunk(MAX_BATCH_SIZE).concurrently(4)(ack)
val nacked = toNack.chunk(MAX_BATCH_SIZE).concurrently(4)(nack)
(acked ++ nacked).run.run
}
private def reserve(count: Int) = {
Process.repeatEval {
Task { queue.reserve(count).getMessages.map(message => IronMqMessageContainer(IronMqMessage(message))) }
}.flatMap(messages => Process.emitAll(messages))
}
private def ack: Sink[Task, Vector[IronMqMessageContainer]] = Process.repeatEval {
Task { messages: Vector[IronMqMessageContainer] =>
Task {
val ironMessages = new Messages(new util.ArrayList(messages.map(_.message.value).toList.asJava))
queue.deleteMessages(ironMessages)
logger.debug(s"Acked ${ironMessages.getSize} messages")
}
}
}
private def nack: Sink[Task, Vector[IronMqMessageContainer]] = Process.repeatEval {
Task { messages: Vector[IronMqMessageContainer] =>
Task {
logger.debug(s"Nacked ${messages.length} messages")
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment