Skip to content

Instantly share code, notes, and snippets.

@acken
Created September 27, 2011 17:17
Show Gist options
  • Select an option

  • Save acken/1245668 to your computer and use it in GitHub Desktop.

Select an option

Save acken/1245668 to your computer and use it in GitHub Desktop.
Rabbit dequeuer
// Callback invoked with each message body decoded as UTF-8 text; its bool
// result decides whether the delivery is acknowledged.
private Func<string, bool> _itemHandler;
// Broker connection; created in Start() and closed when Start() finishes.
private IConnection _connection;
// Channel opened on _connection in Start(); used for declare/bind/consume and acks.
private IModel _channel;
// Consumer whose queue is drained by listen().
private QueueingBasicConsumer _consumer;
// Stop request flag: set by Stop(), polled by listen(). Start() blocks inside
// listen(), so Stop() is normally called from a different thread; volatile
// guarantees the loop observes the write.
private volatile bool _stop = false;
/// <summary>
/// Creates a dequeuer that passes each received message to <paramref name="itemHandler"/>.
/// </summary>
/// <param name="itemHandler">
/// Callback receiving the message body as a string; returns true when the
/// message was handled and may be acknowledged.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="itemHandler"/> is null.
/// </exception>
public Dequeuer(Func<string, bool> itemHandler)
{
    // Fail fast: a null handler would otherwise only surface as a
    // NullReferenceException on every single delivery.
    if (itemHandler == null)
    {
        throw new ArgumentNullException("itemHandler");
    }

    _itemHandler = itemHandler;
}
/// <summary>
/// Connects to the broker at 127.0.0.1, declares the "gluon_exchange" direct
/// exchange and "SladrehankQueue", binds them with routing key "gluon",
/// registers a consumer and then blocks in the listen loop until it exits.
/// The channel and the connection are closed before this method returns,
/// including when setup or the listen loop throws.
/// </summary>
public void Start()
{
    ConnectionFactory cf = new ConnectionFactory();
    cf.Address = "127.0.0.1";
    _connection = cf.CreateConnection();
    try
    {
        _channel = _connection.CreateModel();
        try
        {
            // NOTE(review): this RPC client is never used after construction;
            // kept unchanged in case constructing it matters - confirm and remove.
            var client = new SimpleRpcClient(_channel, "SladrehankDequeuer");
            client.TimeoutMilliseconds = 5000;

            _channel.ExchangeDeclare("gluon_exchange", ExchangeType.Direct);
            _channel.QueueDeclare("SladrehankQueue");
            _channel.QueueBind("SladrehankQueue", "gluon_exchange", "gluon", false, null);

            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume("SladrehankQueue", null, _consumer);

            // Blocks until a stop is requested or the loop throws.
            listen();
        }
        finally
        {
            // Close the channel first, mirroring the original shutdown order.
            _channel.Close(Constants.ReplySuccess, "Closing the channel");
        }
    }
    finally
    {
        _connection.Close(Constants.ReplySuccess, "Closing the connection");
    }
}
/// <summary>
/// Requests that the listen loop end by raising the stop flag. The flag is
/// only examined by the loop between iterations; this method does not itself
/// close the channel or the connection.
/// </summary>
public void Stop()
{
    this._stop = true;
}
/// <summary>
/// Drains the consumer queue, passing each delivery to handleItem, until the
/// stop flag is set. Dequeues with a timeout rather than blocking forever so
/// that a stop request is noticed even when no further messages arrive.
/// </summary>
private void listen()
{
    // Upper bound on how long a stop request can go unnoticed while idle.
    const int pollTimeoutMilliseconds = 500;

    while (!_stop)
    {
        object delivery;
        if (!_consumer.Queue.Dequeue(pollTimeoutMilliseconds, out delivery))
        {
            // Timed out with nothing queued; loop round to re-check the stop flag.
            continue;
        }

        handleItem((BasicDeliverEventArgs)delivery);
    }
}
/// <summary>
/// Decodes the delivery body as UTF-8, passes it to the item handler and then
/// settles the delivery: acknowledged when the handler returns true, otherwise
/// rejected with requeue so the broker can deliver it again.
/// </summary>
/// <param name="e">The delivery taken from the consumer queue.</param>
private void handleItem(BasicDeliverEventArgs e)
{
    byte[] body = e.Body;
    // Stays false if decoding or the handler throws, because the assignment
    // below never completes in that case.
    var handled = false;
    try
    {
        handled = _itemHandler.Invoke(Encoding.UTF8.GetString(body));
    }
    catch (Exception ex)
    {
        Console.WriteLine("Failed while handling message");
        Console.WriteLine(ex.ToString());
    }
    finally
    {
        if (handled)
        {
            _channel.BasicAck(e.DeliveryTag, false);
        }
        else
        {
            // Reject only this delivery. BasicRecover(true) would ask the broker
            // to redeliver every unacknowledged message on the channel, not just
            // the one that failed.
            _channel.BasicReject(e.DeliveryTag, true);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment