Created
April 20, 2012 19:11
-
-
Save jchadwick/2431125 to your computer and use it in GitHub Desktop.
Facade for Cross-Queue MSMQ communication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Thin value wrapper around the integer code that identifies what kind of
/// message is travelling over a queue. Converts implicitly to and from
/// <see cref="int"/>, so any integer can be used as a command.
/// </summary>
public struct ControlCommand
{
    /// <summary>The command code (0) used for ordinary, non-control messages.</summary>
    public static readonly ControlCommand Message = 0;

    private readonly int _value;

    /// <summary>Wraps the given integer code.</summary>
    /// <param name="value">The raw command code.</param>
    public ControlCommand(int value)
    {
        _value = value;
    }

    /// <summary>Wraps an integer code as a <see cref="ControlCommand"/>.</summary>
    public static implicit operator ControlCommand(int value)
    {
        return new ControlCommand(value);
    }

    /// <summary>Unwraps the integer code of a <see cref="ControlCommand"/>.</summary>
    public static implicit operator int(ControlCommand value)
    {
        return value._value;
    }

    /// <summary>Two commands are equal when they wrap the same integer code.</summary>
    public override bool Equals(object obj)
    {
        return obj is ControlCommand && ((ControlCommand)obj)._value == _value;
    }

    /// <summary>Hash code derived from the wrapped integer code.</summary>
    public override int GetHashCode()
    {
        return _value;
    }

    /// <summary>
    /// Returns the numeric command code. Without this override the default
    /// struct ToString() yields the type name, which makes log lines that
    /// format a command useless.
    /// </summary>
    public override string ToString()
    {
        return _value.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Messaging; | |
using System.Text; | |
using Newtonsoft.Json; | |
/// <summary>
/// An <see cref="IMessageFormatter"/> that stores message bodies as JSON text
/// (serialized with Json.NET) in the message's body stream.
/// </summary>
public class JsonMessageFormatter : IMessageFormatter
{
    // SECURITY NOTE(review): TypeNameHandling.All writes .NET type names into the
    // payload and lets the payload select the type that is instantiated on read.
    // That is only safe when every producer writing to the queue is trusted;
    // consider supplying settings with a restrictive binder otherwise.
    private static readonly JsonSerializerSettings DefaultSerializerSettings =
        new JsonSerializerSettings {
            TypeNameHandling = TypeNameHandling.All
        };

    private readonly JsonSerializerSettings _serializerSettings;

    /// <summary>Text encoding used to turn JSON into bytes and back.</summary>
    public Encoding Encoding { get; set; }

    /// <summary>Creates a formatter using the default serializer settings.</summary>
    /// <param name="encoding">Body text encoding; UTF-8 when null.</param>
    public JsonMessageFormatter(Encoding encoding = null)
        : this(encoding, null)
    {
    }

    /// <summary>Creates a formatter with explicit serializer settings.</summary>
    /// <param name="encoding">Body text encoding; UTF-8 when null.</param>
    /// <param name="serializerSettings">Json.NET settings; the defaults when null.</param>
    internal JsonMessageFormatter(Encoding encoding, JsonSerializerSettings serializerSettings = null)
    {
        Encoding = encoding ?? Encoding.UTF8;
        _serializerSettings = serializerSettings ?? DefaultSerializerSettings;
    }

    /// <summary>
    /// Reports whether the message has a readable, non-empty body stream.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public bool CanRead(Message message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        var stream = message.BodyStream;
        if (stream == null || !stream.CanRead)
            return false;

        // Length is only defined for seekable streams; a readable stream whose
        // length cannot be queried is treated as potentially non-empty.
        return !stream.CanSeek || stream.Length > 0;
    }

    /// <summary>Creates a formatter with the same encoding and serializer settings.</summary>
    public object Clone()
    {
        return new JsonMessageFormatter(Encoding, _serializerSettings);
    }

    /// <summary>
    /// Deserializes the JSON body of <paramref name="message"/>.
    /// </summary>
    /// <returns>The deserialized object, or null when the message has no readable body.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public object Read(Message message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        if (!CanRead(message))
            return null;

        var body = message.BodyStream;

        // Always read the complete body, even if the stream was consumed before.
        if (body.CanSeek)
            body.Position = 0;

        // The reader is intentionally NOT disposed: disposing a StreamReader
        // closes the underlying stream, and this stream belongs to the message.
        // Closing it would leave the message without a usable body if
        // deserialization fails and the message has to be re-read or forwarded
        // (for example to an error queue).
        var reader = new StreamReader(body, Encoding);
        var json = reader.ReadToEnd();

        // Leave the stream rewound for whoever touches the message next.
        if (body.CanSeek)
            body.Position = 0;

        Trace.WriteLine(string.Format("Message {0} serialized body:\r\n{1}", message.Id, json));

        return JsonConvert.DeserializeObject(json, _serializerSettings);
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> to JSON and installs it as the body
    /// stream of <paramref name="message"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public void Write(Message message, object obj)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        if (obj == null)
            throw new ArgumentNullException("obj");

        string json = JsonConvert.SerializeObject(obj, Formatting.None, _serializerSettings);

        Trace.WriteLine(string.Format("Message {0} serialized body:\r\n{1}", message.Id, json));

        message.BodyStream = new MemoryStream(Encoding.GetBytes(json));

        //Need to reset the body type, in case the same message
        //is reused by some other formatter.
        message.BodyType = 0;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Messaging; | |
using System.Transactions; | |
/// <summary>
/// A Message Endpoint that uses the MSMQ transport layer
/// </summary>
/// <example>
/// new MsmqMessageEndpoint("msmq://greetings@localhost").Send("Hello, world!");
/// </example>
public class MsmqMessageEndpoint : IDisposable
{
    // Parses "[msmq://]queue[@machine]". Built once instead of on every call.
    private static readonly System.Text.RegularExpressions.Regex EndpointPattern =
        new System.Text.RegularExpressions.Regex(
            "(msmq://)?(?<Queue>([^@])*)(@(?<Machine>.*))?");

    private readonly ICollection<KeyValuePair<ControlCommand, Action<object>>> _messageHandlers;
    private readonly string _messageQueueName;
    private readonly string _errorQueueName;
    private readonly Lazy<MessageQueue> _messageQueue;

    /// <summary>The endpoint URI exactly as passed to the constructor.</summary>
    public string Uri { get; private set; }

    /// <summary>
    /// Prototype formatter; a clone of it is attached to every message that is
    /// sent or received. Defaults to a <see cref="JsonMessageFormatter"/>.
    /// </summary>
    public IMessageFormatter MessageFormatter { get; set; }

    /// <summary>The underlying queue, opened lazily on first access.</summary>
    protected internal MessageQueue MessageQueue
    {
        get { return _messageQueue.Value; }
    }

    /// <summary>
    /// Creates an endpoint for the private queue named by <paramref name="uri"/>
    /// ("[msmq://]queue[@machine]"). An error queue named "&lt;queue&gt;_errors"
    /// is derived from the same path.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public MsmqMessageEndpoint(string uri)
    {
        if (uri == null)
            throw new ArgumentNullException("uri");

        _messageHandlers = new List<KeyValuePair<ControlCommand, Action<object>>>();
        MessageFormatter = new JsonMessageFormatter();
        Uri = uri;
        _messageQueueName = GetEndpointName(uri);
        _errorQueueName = _messageQueueName + "_errors";
        _messageQueue = new Lazy<MessageQueue>(() =>
            new MessageQueue(_messageQueueName)
            {
                // AppSpecific carries the ControlCommand, so it must be read back.
                MessageReadPropertyFilter = { AppSpecific = true }
            });
    }

    /// <summary>
    /// Registers <paramref name="action"/> to run for every received message
    /// whose command equals <paramref name="command"/>.
    /// </summary>
    public void Subscribe(ControlCommand command, Action<object> action)
    {
        _messageHandlers.Add(new KeyValuePair<ControlCommand, Action<object>>(command, action));
    }

    /// <summary>
    /// Ensures the message and error queues exist, then begins listening by
    /// starting an asynchronous peek on the message queue.
    /// </summary>
    public void Start()
    {
        Trace.TraceInformation("Starting listener on MSMQ endpoint {0}...", _messageQueueName);

        CreateTransactionalQueueIfNotExists(_messageQueueName);
        CreateTransactionalQueueIfNotExists(_errorQueueName);

        MessageQueue.PeekCompleted += QueuePeekCompleted;
        MessageQueue.BeginPeek();

        Trace.TraceInformation("Listening on MSMQ endpoint {0}.", _messageQueueName);
    }

    /// <summary>
    /// Detaches the peek handler so no further messages are dispatched.
    /// Does nothing to the queue if it was never opened.
    /// </summary>
    public void Stop()
    {
        if (_messageQueue.IsValueCreated)
        {
            MessageQueue.PeekCompleted -= QueuePeekCompleted;
        }

        Trace.TraceInformation("Stopped listening on MSMQ endpoint {0}.", _messageQueueName);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as an ordinary message
    /// (<see cref="ControlCommand.Message"/>) inside its own MSMQ transaction.
    /// The message label is the simple name of <typeparamref name="TMessage"/>.
    /// </summary>
    /// <param name="message">The payload to serialize into the message body.</param>
    /// <param name="responseEndpoint">Optional endpoint whose queue is set as the response queue.</param>
    /// <param name="timeToLive">Optional limit on how long the message may wait to be received.</param>
    public void Send<TMessage>(TMessage message, MsmqMessageEndpoint responseEndpoint = null, TimeSpan? timeToLive = null)
    {
        var msmqMessage = BuildMsmqMessage(
            ControlCommand.Message,
            typeof(TMessage).Name,
            message,
            responseEndpoint,
            timeToLive
        );

        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            MessageQueue.Send(msmqMessage, transaction);
            transaction.Commit();
        }

        Trace.WriteLine(string.Format("Sent {0} message {1}", typeof(TMessage).Name, msmqMessage.Id));
    }

    /// <summary>
    /// Sends a message tagged with an arbitrary <paramref name="command"/>,
    /// enlisting in an ambient <see cref="TransactionScope"/>.
    /// </summary>
    /// <param name="command">Command code stored in the message's AppSpecific property.</param>
    /// <param name="descriptor">Text stored as the message label.</param>
    /// <param name="message">The payload to serialize into the message body.</param>
    /// <param name="responseEndpoint">Optional endpoint whose queue is set as the response queue.</param>
    /// <param name="timeToLive">Optional limit on how long the message may wait to be received.</param>
    public void SendControlCommand(ControlCommand command, string descriptor, object message, MsmqMessageEndpoint responseEndpoint = null, TimeSpan? timeToLive = null)
    {
        var msmqMessage = BuildMsmqMessage(command, descriptor, message, responseEndpoint, timeToLive);

        using (var transaction = new TransactionScope())
        {
            MessageQueue.Send(msmqMessage, MessageQueueTransactionType.Automatic);
            transaction.Complete();
        }

        // Log the numeric code explicitly; formatting the struct itself relies
        // on its ToString(), which may only produce the type name.
        Trace.WriteLine(string.Format("Sent command: {0}", (int)command));
    }

    /// <summary>
    /// Builds the MSMQ message: command in AppSpecific, descriptor as label,
    /// payload as body, recoverable delivery, and optional response queue and
    /// time-to-be-received.
    /// </summary>
    private Message BuildMsmqMessage(ControlCommand command, string descriptor, object message,
                                     MsmqMessageEndpoint responseEndpoint, TimeSpan? timeToLive)
    {
        MessageQueue responseQueue = null;
        if (responseEndpoint != null)
            responseQueue = responseEndpoint.MessageQueue;

        var msmqMessage = new Message
        {
            AppSpecific = (int) command,
            Body = message,
            Formatter = (IMessageFormatter)MessageFormatter.Clone(),
            Label = descriptor,
            Recoverable = true,
            ResponseQueue = responseQueue,
        };

        if (timeToLive.HasValue)
            msmqMessage.TimeToBeReceived = timeToLive.Value;

        return msmqMessage;
    }

    /// <summary>Creates the named queue as a transactional queue unless it already exists.</summary>
    // NOTE(review): MessageQueue.Exists is documented as unsupported for remote
    // private queues - confirm behavior for "queue@machine" endpoints.
    private static void CreateTransactionalQueueIfNotExists(string queueName)
    {
        if (!MessageQueue.Exists(queueName))
            MessageQueue.Create(queueName, true);
    }

    /// <summary>
    /// Peek callback: receives the next message, dispatches it to the handlers
    /// subscribed for its command, forwards it to the error queue if anything
    /// fails, and then re-arms the asynchronous peek.
    /// </summary>
    private void QueuePeekCompleted(object sender, PeekCompletedEventArgs e)
    {
        var messageQueue = (MessageQueue)sender;
        messageQueue.EndPeek(e.AsyncResult);

        Message message = null;
        try
        {
            message = messageQueue.Receive();
            if (message == null)
                throw new InvalidOperationException("Null message");

            message.Formatter = (IMessageFormatter) MessageFormatter.Clone();

            Trace.WriteLine(string.Format("Received message {0}", message.Id));

            ControlCommand command = message.AppSpecific;

            var handlers = _messageHandlers
                .Where(x => x.Key == command)
                .Select(x => x.Value)
                .Where(x => x != null)
                .ToArray();

            Trace.WriteLine(string.Format("Executing {0} handlers", handlers.Length));

            if (handlers.Length == 0)
                handlers = new Action<object>[] { msg => Trace.TraceWarning("Message {0} has no registered handlers", message.Id) };

            foreach (var handler in handlers)
            {
                // Ordinary messages hand the deserialized body to the handler;
                // control commands hand over the raw MSMQ message.
                if (command == ControlCommand.Message)
                {
                    handler(message.Body);
                }
                else
                {
                    handler(message);
                }
            }
        }
        catch (Exception ex)
        {
            try
            {
                LogError(message, ex);
            }
            catch (Exception forwardingFailure)
            {
                // A failure to reach the error queue must not escape this
                // callback: it would skip the BeginPeek below and end the
                // listener.
                Trace.TraceError("Failed to forward message to error queue {0}: {1}",
                                 _errorQueueName, forwardingFailure);
            }
        }

        messageQueue.Refresh();
        messageQueue.BeginPeek();
    }

    /// <summary>
    /// Converts "[msmq://]queue[@machine]" into the private queue path
    /// "machine\private$\queue". A missing machine or "localhost"
    /// (any casing) maps to ".".
    /// </summary>
    private static string GetEndpointName(string value)
    {
        var match = EndpointPattern.Match(value);

        var queue = match.Groups["Queue"].Value;
        var machine = match.Groups["Machine"].Value;

        if (string.IsNullOrWhiteSpace(machine)
            || string.Equals(machine, "localhost", StringComparison.OrdinalIgnoreCase))
        {
            machine = ".";
        }

        return machine + "\\private$\\" + queue;
    }

    /// <summary>
    /// Traces the exception (when given) and, when a message is available,
    /// sends it to the error queue inside a <see cref="TransactionScope"/>.
    /// </summary>
    private void LogError(Message message, Exception exception = null)
    {
        if (exception != null)
            Trace.TraceError("{0}\r\n{1}", exception.Message, exception.StackTrace);

        if (message == null)
            return;

        using (var scope = new TransactionScope())
        {
            using (var errorQueue = new MessageQueue(_errorQueueName))
            {
                errorQueue.Send(message, MessageQueueTransactionType.Automatic);
            }
            scope.Complete();
        }
    }

    /// <summary>Stops listening and disposes the queue if it was ever opened.</summary>
    public void Dispose()
    {
        Stop();

        if (_messageQueue.IsValueCreated)
            _messageQueue.Value.Dispose();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment