Skip to content

Instantly share code, notes, and snippets.

@agilejon
Last active December 23, 2015 05:19
Show Gist options
  • Save agilejon/6586492 to your computer and use it in GitHub Desktop.
Save agilejon/6586492 to your computer and use it in GitHub Desktop.
private void OnAppStartup()
{
ConnectionFactory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "myapp",
Password = "password",
RequestedHeartbeat = 4, // heartbeat is what lets your code detect problems communicating with
// the RabbitMQ server. A heartbeat of 4 will cause the server to send
// a heartbeat 'ping' message every 4 seconds. If the client has not
// received a ping in 8 seconds (heartbeat*2) then it will consider the
// connection to be bad and throw a ConnectionShutdown event.
};
var SingletonConnection = ConnectionFactory.CreateConnection();
SingletonConnection.ConnectionShutdown += SingletonConnection_ConnectionShutdown; // handle disconnects
}
// else where in your code that actually talks to RabbitMQ
public class Publisher {
private IModel _model;
public Publisher(string queueName) {
_model = conn.CreateModel();
// it's safe to declare queues that already exist as long as their properties are identical
// this runs on initialization every time - that will ensure all of your queues exist on app startup
model.QueueDeclare(queueName, true, false, false, null);
// binds the given queue to the routingKey. We use the same string as both the queue name and routing key
model.QueueBind(queueName, "amq.direct", queueName);
}
// serialize your message body however you want. JSON is nice because it's makes debugging a bit easier.
public void PublishMessage(string routingKey, byte[] body)
{
var model = _model;
var props = model.CreateBasicProperties();
// this is important, it tells the queue to make the message delivery 'durable'
props.DeliveryMode = 2;
if (props.Headers == null)
{
props.Headers = new Dictionary<string, object>();
}
model.BasicPublish("amq.direct", routingKey, props, body);
}
}
public class Consumer {
private IModel _model;
private QueueingBasicConsumer _queue;
public Consumer(string queueName) {
_model = conn.CreateModel();
_model.BasicQos(0, 50, false); // setting a per-channel prefetch can help with your overall throughput. See
// http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
// for more info.
_queue = new QueueingBasicConsumer(model);
_model.BasicConsume(queueName, false, _queue);
}
// this is obviously simplified from what you'd want to do. I'll leave it up to you to decide how to integrate
public void ProcessMessage<T>(TimeSpan timeout, Action<T> handler) {
object result;
if (!_queue.Queue.Dequeue(timeout, out result)) {
return;
}
BasicDeliverEventArgs msg = (BasicDeliverEventArgs)result;
T deserialized = Deserialize<T>(msg.Body);
handler(deserialized);
// let RabbitMQ know that the message was processed.
// if handler() throws an exception the message will not be ACK'd and can be re-delivered to another consumer
_model.BasicAck(msg.DeliveryTag, false);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment