Created
October 18, 2013 12:13
-
-
Save rofr/7040657 to your computer and use it in GitHub Desktop.
This is an ActiveMq pub/sub wrapper. Instances are running within multiple IIS worker processes on multiple machines. I'm not receiving messages on all clients. Can you spot any obvious errors using the NMS API? For ex connection, session, producer and consumer lifecycle, lost connection issues etc. The same topic and brokerUri is used on all se…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ActiveMqTopicClient : IDisposable | |
{ | |
public delegate void MessageReceivedHandler(IMessage message); | |
public event MessageReceivedHandler MessageReceived; | |
readonly IConnection _connection; | |
readonly ISession _session; | |
readonly IMessageProducer _producer; | |
readonly IMessageConsumer _consumer; | |
bool _isDisposed = false; | |
public ActiveMqTopicClient(string topic, string brokerUri, bool ignoreLocalMessages = true, string selector = null) | |
{ | |
_connection = new ConnectionFactory(brokerUri).CreateConnection(); | |
_connection.Start(); | |
_session = _connection.CreateSession(); | |
IDestination channel = new ActiveMQTopic(topic); | |
_producer = _session.CreateProducer(channel); | |
_consumer = _session.CreateConsumer(channel, selector, ignoreLocalMessages); | |
_consumer.Listener += OnMessageReceived; | |
} | |
private void OnMessageReceived(IMessage message) | |
{ | |
if (MessageReceived != null) | |
{ | |
try | |
{ | |
MessageReceived.Invoke(message); | |
} | |
catch (Exception ex) | |
{ | |
ConcreteTypeFactory.Create<IExceptionLogBusiness>().LogServiceSideException(ex); | |
} | |
} | |
} | |
private void ThrowIfDisposed() | |
{ | |
if (_isDisposed) throw new ObjectDisposedException(GetType().FullName); | |
} | |
public void SendObjectMessage(object @object) | |
{ | |
lock (this) | |
{ | |
var message = _session.CreateObjectMessage(@object); | |
Send(message); | |
} | |
} | |
public void Send(IMessage message) | |
{ | |
lock (this) | |
{ | |
ThrowIfDisposed(); | |
try | |
{ | |
_producer.Send(message); | |
} | |
catch (Exception ex) | |
{ | |
ConcreteTypeFactory.Create<IExceptionLogBusiness>().LogServiceSideException(ex); | |
} | |
} | |
} | |
public void Dispose() | |
{ | |
lock (this) | |
{ | |
_producer.Dispose(); | |
_consumer.Dispose(); | |
_session.Dispose(); | |
_connection.Dispose(); | |
_isDisposed = true; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment