Skip to content

Instantly share code, notes, and snippets.

@rofr
Created October 18, 2013 12:13
Show Gist options
  • Save rofr/7040657 to your computer and use it in GitHub Desktop.
Save rofr/7040657 to your computer and use it in GitHub Desktop.
This is an ActiveMq pub/sub wrapper. Instances are running within multiple IIS worker processes on multiple machines. I'm not receiving messages on all clients. Can you spot any obvious errors using the NMS API? For ex connection, session, producer and consumer lifecycle, lost connection issues etc. The same topic and brokerUri is used on all se…
public class ActiveMqTopicClient : IDisposable
{
public delegate void MessageReceivedHandler(IMessage message);
public event MessageReceivedHandler MessageReceived;
readonly IConnection _connection;
readonly ISession _session;
readonly IMessageProducer _producer;
readonly IMessageConsumer _consumer;
bool _isDisposed = false;
public ActiveMqTopicClient(string topic, string brokerUri, bool ignoreLocalMessages = true, string selector = null)
{
_connection = new ConnectionFactory(brokerUri).CreateConnection();
_connection.Start();
_session = _connection.CreateSession();
IDestination channel = new ActiveMQTopic(topic);
_producer = _session.CreateProducer(channel);
_consumer = _session.CreateConsumer(channel, selector, ignoreLocalMessages);
_consumer.Listener += OnMessageReceived;
}
private void OnMessageReceived(IMessage message)
{
if (MessageReceived != null)
{
try
{
MessageReceived.Invoke(message);
}
catch (Exception ex)
{
ConcreteTypeFactory.Create<IExceptionLogBusiness>().LogServiceSideException(ex);
}
}
}
private void ThrowIfDisposed()
{
if (_isDisposed) throw new ObjectDisposedException(GetType().FullName);
}
public void SendObjectMessage(object @object)
{
lock (this)
{
var message = _session.CreateObjectMessage(@object);
Send(message);
}
}
public void Send(IMessage message)
{
lock (this)
{
ThrowIfDisposed();
try
{
_producer.Send(message);
}
catch (Exception ex)
{
ConcreteTypeFactory.Create<IExceptionLogBusiness>().LogServiceSideException(ex);
}
}
}
public void Dispose()
{
lock (this)
{
_producer.Dispose();
_consumer.Dispose();
_session.Dispose();
_connection.Dispose();
_isDisposed = true;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment