Created
September 27, 2011 17:17
-
-
Save acken/1245668 to your computer and use it in GitHub Desktop.
Rabbit dequeuer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Callback invoked with each message body decoded as a UTF-8 string.
// Returning true acknowledges the message; returning false (or throwing)
// leaves it unacknowledged so the broker is asked to deliver it again.
private Func<string, bool> _itemHandler;
// Connection, channel and consumer are created by Start().
private IConnection _connection;
private IModel _channel;
private QueueingBasicConsumer _consumer;
// volatile: Start() blocks inside the listen() loop, so Stop() is necessarily
// called from a different thread; the loop must observe that write.
private volatile bool _stop = false;
/// <summary>
/// Creates a dequeuer that passes each received message body to
/// <paramref name="itemHandler"/>.
/// </summary>
/// <param name="itemHandler">
/// Callback receiving the message body as a string; its boolean result
/// reports whether the message was handled.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="itemHandler"/> is null.
/// </exception>
public Dequeuer(Func<string, bool> itemHandler)
{
    // Fail fast: handleItem catches every exception raised while invoking the
    // handler, so a null handler would otherwise fail silently on each message.
    if (itemHandler == null)
    {
        throw new ArgumentNullException("itemHandler");
    }

    _itemHandler = itemHandler;
}
/// <summary>
/// Connects to the broker at 127.0.0.1, declares and binds the queue,
/// registers the consumer and then blocks in the receive loop until
/// <c>Stop()</c> has been requested. The channel and the connection are
/// closed before this method returns, including when it exits by exception.
/// </summary>
public void Start()
{
    ConnectionFactory cf = new ConnectionFactory();
    cf.Address = "127.0.0.1";
    _connection = cf.CreateConnection();
    try
    {
        _channel = _connection.CreateModel();
        try
        {
            // Topology: direct exchange "gluon_exchange" routed to
            // "SladrehankQueue" with routing key "gluon".
            _channel.ExchangeDeclare("gluon_exchange", ExchangeType.Direct);
            _channel.QueueDeclare("SladrehankQueue");
            _channel.QueueBind("SladrehankQueue", "gluon_exchange", "gluon", false, null);

            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume("SladrehankQueue", null, _consumer);

            // Blocks here until the stop flag is set.
            listen();
        }
        finally
        {
            // Runs even if setup or the receive loop throws, so the channel is not leaked.
            _channel.Close(Constants.ReplySuccess, "Closing the channel");
        }
    }
    finally
    {
        // Runs even if creating or closing the channel throws, so the connection is not leaked.
        _connection.Close(Constants.ReplySuccess, "Closing the connection");
    }
}
/// <summary>
/// Requests that the receive loop end by raising the stop flag that the
/// loop checks before taking each message.
/// </summary>
public void Stop()
{
    this._stop = true;
}
/// <summary>
/// Receive loop: takes deliveries from the consumer's queue and hands each
/// one to <c>handleItem</c> until the stop flag is set.
/// </summary>
private void listen()
{
    // Upper bound on how long a stop request can go unnoticed while the queue is idle.
    const int pollIntervalMilliseconds = 500;

    while (!_stop)
    {
        object delivery;

        // Timed dequeue instead of the blocking overload: with no traffic the
        // blocking call never returns, so the stop flag would never be re-checked.
        if (!_consumer.Queue.Dequeue(pollIntervalMilliseconds, out delivery))
        {
            continue;
        }

        handleItem((BasicDeliverEventArgs)delivery);
    }
}
/// <summary>
/// Decodes the delivery body as UTF-8, passes it to the item handler, then
/// acknowledges the delivery when the handler returns true or rejects it with
/// requeue when the handler returns false or throws.
/// </summary>
/// <param name="e">The delivery taken from the consumer's queue.</param>
private void handleItem(BasicDeliverEventArgs e)
{
    bool handled;
    try
    {
        handled = _itemHandler.Invoke(Encoding.UTF8.GetString(e.Body));
    }
    catch (Exception ex)
    {
        // A failing handler must not end the receive loop; report and treat as unhandled.
        handled = false;
        Console.WriteLine("Failed while handling message");
        Console.WriteLine(ex.ToString());
    }

    if (handled)
    {
        _channel.BasicAck(e.DeliveryTag, false);
    }
    else
    {
        // Requeue only this delivery. BasicRecover(true) would requeue every
        // unacknowledged delivery on the channel, including ones already
        // buffered locally by the consumer, causing duplicates and stale acks.
        _channel.BasicReject(e.DeliveryTag, true);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment