Created
June 3, 2014 04:21
-
-
Save meisinger/ba6a122cb068e64efb0f to your computer and use it in GitHub Desktop.
Factory class that creates a connection to RabbitMQ, along with a consumer (subscriber) and a publisher.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using RabbitMQ.Client; | |
using RabbitMQ.Client.Exceptions; | |
/// <summary>
/// Owns a single RabbitMQ connection and hands out publishers and subscribers,
/// each of which wraps its own channel created on that connection.
/// </summary>
public class MessageQueueFactory : IMessageQueueFactory
{
    private readonly ConnectionFactory factory;
    private IConnection connection;

    /// <summary>
    /// Gets a value indicating whether a connection has been created and is still open.
    /// </summary>
    public bool IsOpen
    {
        get { return (connection != null && connection.IsOpen); }
    }

    /// <summary>
    /// Configures a connection factory for <paramref name="host"/> using the client's
    /// default user, password, virtual host, protocol and port. No connection is opened
    /// until <see cref="OpenConnection"/> is called.
    /// </summary>
    /// <param name="host">Host name of the RabbitMQ broker.</param>
    public MessageQueueFactory(string host)
    {
        factory = new ConnectionFactory
        {
            HostName = host,
            UserName = ConnectionFactory.DefaultUser,
            Password = ConnectionFactory.DefaultPass,
            VirtualHost = ConnectionFactory.DefaultVHost,
            Protocol = Protocols.DefaultProtocol,
            Port = AmqpTcpEndpoint.UseDefaultPort
        };
    }

    /// <summary>
    /// Closes the connection if it is open and releases it. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        ReleaseConnection();
    }

    /// <summary>
    /// Opens the connection to the configured host. Does nothing when a connection is
    /// already open; a previously closed connection is released before a new one is created.
    /// </summary>
    /// <exception cref="IOException">The broker could not be reached.</exception>
    public void OpenConnection()
    {
        // Re-opening while already connected would orphan the live connection
        // (and every channel created on it), so treat the call as idempotent.
        if (IsOpen)
            return;

        // Drop any stale, already-closed connection before replacing it.
        ReleaseConnection();

        try
        {
            connection = factory.CreateConnection();
        }
        catch (BrokerUnreachableException ex)
        {
            throw new IOException(
                string.Format("Unable to open connection to RabbitMQ Host \"{0}\".",
                    factory.HostName), ex);
        }
    }

    /// <summary>
    /// Creates a publisher backed by a new channel on the open connection.
    /// </summary>
    /// <exception cref="IOException">The connection has not been opened or has been closed.</exception>
    public IMessageQueuePublisher CreatePublisher()
    {
        return new MessageQueuePublisher(CreateChannel());
    }

    /// <summary>
    /// Creates a subscriber backed by a new channel on the open connection.
    /// </summary>
    /// <exception cref="IOException">The connection has not been opened or has been closed.</exception>
    public IMessageQueueSubscriber CreateSubscriber()
    {
        return new MessageQueueSubscriber(CreateChannel());
    }

    // Verifies the connection is usable and opens a new channel on it.
    private IModel CreateChannel()
    {
        if (connection == null || !connection.IsOpen)
            throw new IOException("RabbitMQ connection is not open or has been closed.");

        return connection.CreateModel();
    }

    // Closes (when still open) and disposes the current connection, then clears the field
    // so that repeated calls are no-ops.
    private void ReleaseConnection()
    {
        var current = connection;
        connection = null;
        if (current == null)
            return;

        try
        {
            if (current.IsOpen)
                current.Close();
        }
        finally
        {
            // Dispose even if Close throws so the connection object is always released.
            current.Dispose();
        }
    }
}
/// <summary>
/// Consumes messages from a queue over a dedicated channel, acknowledging or rejecting
/// each one according to the result of a caller-supplied handler.
/// </summary>
public class MessageQueueSubscriber : IMessageQueueSubscriber
{
    private readonly IModel channel;

    /// <summary>
    /// Wraps <paramref name="channel"/>; the subscriber disposes it in <see cref="Dispose"/>.
    /// </summary>
    public MessageQueueSubscriber(IModel channel)
    {
        this.channel = channel;
    }

    /// <summary>
    /// Disposes the underlying channel if it is still open.
    /// </summary>
    public void Dispose()
    {
        if (channel.IsOpen)
            channel.Dispose();
    }

    /// <summary>
    /// Consumes from <paramref name="queue"/> with a prefetch count of 1.
    /// </summary>
    public void Consume(string queue, Func<IQueueMessage, bool> func, CancellationToken token)
    {
        Consume(queue, 1, func, token);
    }

    /// <summary>
    /// Blocks the calling thread, passing each message from <paramref name="queue"/> to
    /// <paramref name="func"/> until the consumer stops running, the stream ends, or
    /// <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="queue">Name of the queue to consume from.</param>
    /// <param name="prefetchCount">
    /// Prefetch count passed to <c>BasicQos</c>; 0 skips the <c>BasicQos</c> call entirely.
    /// </param>
    /// <param name="func">
    /// Handler invoked per message. Returning <c>true</c> acks the message; returning
    /// <c>false</c> rejects it without requeue.
    /// </param>
    /// <param name="token">Token checked before each dequeue and passed to the dequeue call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public void Consume(string queue, ushort prefetchCount, Func<IQueueMessage, bool> func, CancellationToken token)
    {
        // Fail before subscribing: a null handler would otherwise only blow up after a
        // message had been dequeued, leaving that delivery unacknowledged.
        if (func == null)
            throw new ArgumentNullException("func");

        var consumer = new CooperativeConsumer(channel);
        if (prefetchCount != 0)
            channel.BasicQos(0, prefetchCount, false);

        // Manual acknowledgement mode (noAck = false); keep the tag so the
        // subscription can be cancelled when this method exits.
        var consumerTag = channel.BasicConsume(queue, false, consumer);
        try
        {
            while (consumer.IsRunning)
            {
                if (token.IsCancellationRequested)
                    break;

                try
                {
                    var item = consumer.Queue.Dequeue(token);
                    if (item == null)
                        continue;

                    var handled = func(item);
                    if (handled)
                        channel.BasicAck(item.MessageId, false);
                    else
                        channel.BasicReject(item.MessageId, false);
                }
                catch (EndOfStreamException)
                {
                    break;
                }
            }
        }
        finally
        {
            // Without this the consumer stays registered on the channel after we stop
            // reading, so the broker keeps delivering messages nobody will process.
            // Only cancel while the consumer still reports itself as running and the
            // channel is open; otherwise the subscription is presumably already gone.
            if (channel.IsOpen && consumer.IsRunning)
                channel.BasicCancel(consumerTag);
        }
    }
}
/// <summary>
/// Publishes text or JSON-serialized messages to an exchange over a dedicated channel.
/// </summary>
public class MessageQueuePublisher : IMessageQueuePublisher
{
    private readonly IModel channel;

    /// <summary>
    /// Wraps <paramref name="channel"/>; the publisher closes it in <see cref="Dispose"/>.
    /// </summary>
    public MessageQueuePublisher(IModel channel)
    {
        this.channel = channel;
    }

    /// <summary>
    /// Closes the underlying channel if it is still open.
    /// </summary>
    public void Dispose()
    {
        if (!channel.IsOpen)
            return;

        channel.Close();
    }

    /// <summary>
    /// Publishes a text message to <paramref name="exchangeName"/> with an empty routing key.
    /// </summary>
    public void Publish(string exchangeName, string message, IQueueMessageProperties properties)
    {
        Publish(exchangeName, string.Empty, message, properties);
    }

    /// <summary>
    /// Publishes a text message, encoded as UTF-8, to <paramref name="exchangeName"/>
    /// using <paramref name="exchangeRoute"/> as the routing key.
    /// </summary>
    public void Publish(string exchangeName, string exchangeRoute, string message, IQueueMessageProperties properties)
    {
        // Properties are converted before the body is encoded, as in the original ordering.
        var amqpProperties = QueueMessage.ConvertProperties(channel, properties);
        var payload = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchangeName, exchangeRoute, false, false, amqpProperties, payload);
    }

    /// <summary>
    /// Publishes an object to <paramref name="exchangeName"/> with an empty routing key.
    /// </summary>
    public void Publish<T>(string exchangeName, T message, IQueueMessageProperties properties)
        where T : class
    {
        Publish(exchangeName, string.Empty, message, properties);
    }

    /// <summary>
    /// Publishes an object, serialized to bytes by <c>JsonSerializer</c>, to
    /// <paramref name="exchangeName"/> using <paramref name="exchangeRoute"/> as the routing key.
    /// </summary>
    public void Publish<T>(string exchangeName, string exchangeRoute, T message, IQueueMessageProperties properties)
        where T : class
    {
        var amqpProperties = QueueMessage.ConvertProperties(channel, properties);
        var payload = JsonSerializer.SerializeToBytes(message);

        channel.BasicPublish(exchangeName, exchangeRoute, false, false, amqpProperties, payload);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment