Created
August 31, 2015 22:20
-
-
Save dbousamra/82b4e2ba47467a6af267 to your computer and use it in GitHub Desktop.
MQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mq.ironmq | |
| import java.util | |
| import java.util.concurrent.{ExecutorService, Executors} | |
| import com.typesafe.scalalogging.LazyLogging | |
| import io.iron.ironmq.{Messages, Client} | |
| import mq._ | |
| import scala.util.Try | |
| import scalaz._ | |
| import Scalaz._ | |
| import scalaz.concurrent._ | |
| import scalaz.stream._ | |
| import scala.collection.JavaConverters._ | |
| import scalaz.stream.async.mutable.Queue | |
| import scala.concurrent.duration._ | |
/**
 * [[Consumer]] implementation backed by an IronMQ queue.
 *
 * @param client    IronMQ client used to look up the queue
 * @param queueName name of the queue to consume from
 * @param preFetch  not read anywhere in this class; kept so existing call sites still compile
 * @param executor  executor service required by [[Consumer]]
 */
case class IronMqConsumer(client: Client, queueName: QueueName, preFetch: Int = 2000)(implicit val executor: ExecutorService) extends Consumer with LazyLogging {

  type MessageContainer = IronMqMessage

  // Number of messages requested from the queue per reserve call.
  private val MaxBatchSize = 100

  private val queue = client.queue(queueName.value)

  /** Reserves up to `MaxBatchSize` messages and wraps each one in an `IronMqMessage`. */
  def reserve: Seq[MessageContainer] =
    queue.reserve(MaxBatchSize).getMessages.map(IronMqMessage(_)).toList

  /**
   * Deletes the given messages from the queue with a single batch call.
   * An empty batch is a no-op, so no request is sent for it.
   */
  def ack(messages: Seq[MessageContainer]): Unit =
    if (messages.nonEmpty) {
      val underlying = messages.map(_.value).toList.asJava
      queue.deleteMessages(new Messages(new util.ArrayList(underlying)))
    }

  /** Returns the size reported by the queue's info call. */
  def count: Int =
    queue.getInfoAboutQueue.getSize
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mq | |
| import java.util.concurrent.{Executors, ExecutorService} | |
| import com.typesafe.scalalogging.LazyLogging | |
| import scalaz._ | |
| import scalaz.concurrent.Task | |
| import scalaz.stream._ | |
| import scala.concurrent.duration._ | |
/** Wrapper around the name of a queue, held as a plain string in `value`. */
case class QueueName(value: String)
/** A message that can be rendered as a string payload. */
trait Message {

  /** This message's payload as a string. */
  def asPayload: String
}
/**
 * Closed set of responses: [[Ack]], [[Nack]] or [[Requeue]].
 * The trait is sealed, so matches over it are checked for exhaustiveness.
 */
sealed trait AckResponse

case class Ack() extends AckResponse

case class Nack() extends AckResponse

case class Requeue() extends AckResponse
/** Sending side of a queue. */
trait Publisher {

  /**
   * Queues messages.
   *
   * @param messages the messages to queue
   */
  def send(messages: Seq[Message]): Unit
}
/**
 * Receiving side of a queue. Implementations supply `reserve` and `ack`;
 * the streaming helpers and `consume` are built on top of those two methods.
 */
trait Consumer extends LazyLogging {

  /** Implicit executor service in scope for the `Task { ... }` blocks below. */
  implicit val executor: ExecutorService

  /**
   * Represents our message context. Generally some sort of MessageID and the Message itself.
   */
  type MessageContainer

  /**
   * Acknowledges a batch of messages.
   */
  def ack(messageContainer: Seq[MessageContainer]): Unit

  /**
   * Reserves messages off the queue.
   */
  def reserve: Seq[MessageContainer]

  /**
   * A stream of MessageContainers defined in terms of the reserve method:
   * `reserve` is evaluated repeatedly and each returned batch is emitted element by element.
   */
  def reserveStream: Process[Task, MessageContainer] = {
    Process.repeatEval {
      Task {
        val reserved = this.reserve
        logger.debug(s"Reserved ${reserved.size} messages")
        reserved
      }
    }.flatMap(messages => Process.emitAll(messages))
  }

  /**
   * A stream of acknowledgement side effects (streams of type Unit are called Sinks).
   * This sink takes in a Seq of MessageContainers and acknowledges them in batch.
   */
  def ackSink: Sink[Task, Seq[MessageContainer]] = {
    sink.lift { messages: Seq[MessageContainer] =>
      Task {
        this.ack(messages)
        logger.debug(s"Acked ${messages.length} messages")
      }
    }
  }

  /**
   * Consumes messages from the queue. Takes in a work function that must respond with an
   * AckResponse. Runs the pipeline to completion on the calling thread via `run.run`.
   *
   * Only messages for which `work` returns `Ack()` are forwarded to the ack sink; any other
   * response is filtered out by the `collect` below and nothing further is done with it.
   *
   * @param work function applied to every reserved message
   */
  def consume(work: (MessageContainer) => AckResponse): Unit = {
    val queueCapacity = 1000 // bound of both intermediate queues
    val ackBatchSize = 100   // limit passed to dequeueBatch for each ack batch
    val maxOpen = 4          // maxOpen argument handed to mergeN

    val reserveQueue = async.boundedQueue[MessageContainer](queueCapacity)
    val ackQueue = async.boundedQueue[MessageContainer](queueCapacity)

    // Stage 1: reserved messages are pushed into the bounded reserve queue.
    val reserveEnqueueProcess = reserveStream.to(reserveQueue.enqueue)

    // Stage 2: each dequeued message is handed to `work`, keeping the message next to its response.
    val processed = reserveQueue.dequeue.evalMap { message =>
      Task.delay {
        logger.debug("In process")
        (work(message), message)
      }
    }

    // Stage 3: acknowledged messages go through the ack queue and are acked in batches.
    val toAck = processed.collect { case (Ack(), message) => message }
    val ackEnqueueProcess = toAck.to(ackQueue.enqueue)
    val ackDequeueProcess = ackQueue.dequeueBatch(ackBatchSize).to(ackSink)

    val mergedProcess = (reserveEnqueueProcess) merge (ackEnqueueProcess merge ackDequeueProcess)
    // The same merged pipeline is supplied repeatedly to mergeN; every copy shares the two queues.
    val parallelMerged = scalaz.stream.merge.mergeN(maxOpen)(Process.constant(mergedProcess))
    parallelMerged.run.run
  }
}
/** Factory for the [[Publisher]] and [[Consumer]] of a named queue. */
trait Mq {

  /**
   * @param queueName queue the publisher is requested for
   * @param ec        implicit executor service
   */
  def getPublisher(queueName: QueueName)(implicit ec: ExecutorService): Publisher

  /**
   * @param queueName queue the consumer is requested for
   * @param ec        implicit executor service
   */
  def getConsumer(queueName: QueueName)(implicit ec: ExecutorService): Consumer
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mq.rabbitmq | |
| import java.util.concurrent.ExecutorService | |
| import com.rabbitmq.client.{QueueingConsumer, Connection} | |
| import com.typesafe.scalalogging.LazyLogging | |
| import mq.{QueueName, AckResponse, Consumer} | |
| import rabbitmq.RabbitMqMessage | |
/**
 * [[Consumer]] implementation backed by a RabbitMQ queue.
 *
 * Constructing an instance opens a channel on `conn`, declares the queue and starts
 * consuming from it straight away.
 *
 * @param conn      connection on which the channel is created
 * @param queueName name of the queue to declare and consume from
 * @param preFetch  value passed to `basicQos` for the channel
 * @param executor  executor service required by [[Consumer]]
 */
case class RabbitMqConsumer(conn: Connection, queueName: QueueName, preFetch: Int = 2000)(implicit val executor: ExecutorService) extends Consumer with LazyLogging {

  /** A message paired with its delivery tag. */
  type MessageContainer = (RabbitMqMessage, Long)

  val channel = newChannel(queueName)
  val consumer = new QueueingConsumer(channel)
  channel.basicConsume(queueName.value, /* autoAck = */ false, consumer)

  /** Creates a channel, applies the prefetch setting and declares the queue on it. */
  private def newChannel(queueName: QueueName) = {
    val channel = conn.createChannel()
    channel.basicQos(preFetch, /* global = */ true)
    channel.queueDeclare(queueName.value, /* durable = */ true, /* exclusive = */ false, /* autoDelete = */ false, null)
    channel.confirmSelect() // publisher acks
    channel
  }

  /** Acknowledges every message individually by its delivery tag. */
  def ack(messages: Seq[MessageContainer]): Unit =
    messages.foreach { case (_, deliveryTag) =>
      channel.basicAck(deliveryTag, /* multiple = */ false)
    }

  /**
   * Asks the consumer for the next delivery with a timeout argument of 1000.
   * Returns a one-element list, or an empty list when `nextDelivery` returns null.
   */
  override def reserve: List[MessageContainer] =
    Option(consumer.nextDelivery(1000)).map { delivery =>
      // Decode with an explicit charset so the result does not depend on the JVM default.
      // NOTE(review): assumes publishers encode payloads as UTF-8 — confirm against the publisher.
      val payload = new String(delivery.getBody, java.nio.charset.StandardCharsets.UTF_8)
      (RabbitMqMessage(payload), delivery.getEnvelope.getDeliveryTag)
    }.toList
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment