Skip to content

Instantly share code, notes, and snippets.

@dbousamra
Created August 31, 2015 22:20
Show Gist options
  • Select an option

  • Save dbousamra/82b4e2ba47467a6af267 to your computer and use it in GitHub Desktop.

Select an option

Save dbousamra/82b4e2ba47467a6af267 to your computer and use it in GitHub Desktop.
MQ
package mq.ironmq
import java.util
import java.util.concurrent.{ExecutorService, Executors}
import com.typesafe.scalalogging.LazyLogging
import io.iron.ironmq.{Messages, Client}
import mq._
import scala.util.Try
import scalaz._
import Scalaz._
import scalaz.concurrent._
import scalaz.stream._
import scala.collection.JavaConverters._
import scalaz.stream.async.mutable.Queue
import scala.concurrent.duration._
case class IronMqConsumer(client: Client, queueName: QueueName, preFetch: Int = 2000)(implicit val executor: ExecutorService) extends Consumer with LazyLogging {
type MessageContainer = IronMqMessage
private val MAX_BATCH_SIZE = 100
private val queue = client.queue(queueName.value)
def reserve: Seq[MessageContainer] = {
queue.reserve(MAX_BATCH_SIZE).getMessages.map(IronMqMessage).toList
}
def ack(messages: Seq[MessageContainer]): Unit = {
queue.deleteMessages(new Messages(new util.ArrayList(messages.map(_.value).toList.asJava)))
}
def count: Int = {
queue.getInfoAboutQueue.getSize
}
}
package mq
import java.util.concurrent.{Executors, ExecutorService}
import com.typesafe.scalalogging.LazyLogging
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.duration._
case class QueueName(value: String)
trait Message {
def asPayload: String
}
sealed trait AckResponse
case class Ack() extends AckResponse
case class Nack() extends AckResponse
case class Requeue() extends AckResponse
trait Publisher {
/*
* Queues messages
*/
def send(messages: Seq[Message]): Unit
}
trait Consumer extends LazyLogging {
implicit val executor: ExecutorService
/*
* Represents our message context. Generally some sort of MessageID and the Message itself.
*/
type MessageContainer
/*
* Acknowledges a message.
*/
def ack(messageContainer: Seq[MessageContainer]): Unit
/*
* Reserves a message off the queue.
*/
def reserve: Seq[MessageContainer]
/*
* A stream of MessageContainer's defined in terms of the reserve method.
*/
def reserveStream: Process[Task, MessageContainer] = {
Process.repeatEval {
Task {
val reserved = this.reserve
logger.debug(s"Reserved " + reserved.size + " messages")
reserved
}
}.flatMap(messages => Process.emitAll(messages))
}
/*
* A stream of acknowledgement side effects (streams of type Unit are called Sinks)
* This sink takes in a Seq of MessageContainers and acknowledges them in batch
*/
def ackSink: Sink[Task, Seq[MessageContainer]] = {
sink.lift { messages: Seq[MessageContainer] =>
Task {
this.ack(messages)
logger.debug(s"Acked ${messages.length} messages")
}
}
}
/*
* Consume messages from the queue. Takes in a work function that must respond with an AckResponse.
* Operates in parallel and deals with backpressure
*/
def consume(work: (MessageContainer) => AckResponse) = {
val reserveQueue = async.boundedQueue[MessageContainer](1000)
val ackQueue = async.boundedQueue[MessageContainer](1000)
val reserveEnqueueProcess = reserveStream.to(reserveQueue.enqueue)
val processed = reserveQueue.dequeue.evalMap { message =>
Task.delay {
logger.debug("In process")
(work(message), message)
}
}
val toAck = processed.collect { case (Ack(), message) => message }
val ackEnqueueProcess = toAck.to(ackQueue.enqueue)
val ackDequeueProcess = ackQueue.dequeueBatch(100).to(ackSink)
val mergedProcess = (reserveEnqueueProcess) merge (ackEnqueueProcess merge ackDequeueProcess)
val parallelMerged = scalaz.stream.merge.mergeN(4)(Process.constant(mergedProcess))
parallelMerged.run.run
}
}
trait Mq {
def getPublisher(queueName: QueueName)(implicit ec: ExecutorService): Publisher
def getConsumer(queueName: QueueName)(implicit ec: ExecutorService): Consumer
}
package mq.rabbitmq
import java.util.concurrent.ExecutorService
import com.rabbitmq.client.{QueueingConsumer, Connection}
import com.typesafe.scalalogging.LazyLogging
import mq.{QueueName, AckResponse, Consumer}
import rabbitmq.RabbitMqMessage
case class RabbitMqConsumer(conn: Connection, queueName: QueueName, preFetch: Int = 2000)(implicit val executor: ExecutorService) extends Consumer with LazyLogging {
type MessageContainer = (RabbitMqMessage, Long)
val channel = newChannel(queueName)
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queueName.value, false, consumer)
private def newChannel(queueName: QueueName) = {
val channel = conn.createChannel()
channel.basicQos(preFetch, true)
channel.queueDeclare(queueName.value, true, false, false, null)
channel.confirmSelect() // publisher acks
channel
}
def ack(messages: Seq[MessageContainer]): Unit = {
messages.map { message =>
val (_, id) = message
channel.basicAck(id, false)
}
}
override def reserve: List[MessageContainer] = {
Option(consumer.nextDelivery(1000)).map { message =>
(RabbitMqMessage(new String(message.getBody)), message.getEnvelope.getDeliveryTag)
}.toList
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment