Skip to content

Instantly share code, notes, and snippets.

@agilejon
Created November 12, 2013 04:53
Show Gist options
  • Save agilejon/7425663 to your computer and use it in GitHub Desktop.
Save agilejon/7425663 to your computer and use it in GitHub Desktop.
public class Consumer {
private IModel _model;
private QueueingBasicConsumer _queue;
public Consumer(IConnection conn, string queueName) {
_model = conn.CreateModel();
_model.BasicQos(0, 50, false); // setting a per-channel prefetch can help with your overall throughput. See
// http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
// for more info.
_queue = new QueueingBasicConsumer(model);
_model.BasicConsume(queueName, false, _queue);
}
// this is obviously simplified from what you'd want to do. I'll leave it up to you to decide how to integrate
public void ProcessMessage<T>(TimeSpan timeout, Action<T> handler) {
object result;
if (!_queue.Queue.Dequeue(timeout, out result)) {
return;
}
BasicDeliverEventArgs msg = (BasicDeliverEventArgs)result;
T deserialized = Deserialize<T>(msg.Body);
handler(deserialized);
// let RabbitMQ know that the message was processed.
// if handler() throws an exception the message will not be ACK'd and can be re-delivered to another consumer
_model.BasicAck(msg.DeliveryTag, false);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment