Skip to content

Instantly share code, notes, and snippets.

@jchadwick
Created April 20, 2012 19:11
Show Gist options
  • Save jchadwick/2431125 to your computer and use it in GitHub Desktop.
Save jchadwick/2431125 to your computer and use it in GitHub Desktop.
Facade for Cross-Queue MSMQ communication
/// <summary>
/// Small value type wrapping the integer code that identifies what kind of
/// message is being exchanged. Converts implicitly to and from <see cref="int"/>.
/// </summary>
public struct ControlCommand
{
    /// <summary>The command code 0.</summary>
    public static readonly ControlCommand Message = 0;

    private readonly int _value;

    /// <summary>Wraps the given integer command code.</summary>
    /// <param name="value">The raw command code.</param>
    public ControlCommand(int value)
    {
        _value = value;
    }

    /// <summary>Wraps an integer as a <see cref="ControlCommand"/>.</summary>
    public static implicit operator ControlCommand(int value)
    {
        return new ControlCommand(value);
    }

    /// <summary>Unwraps the integer code of a <see cref="ControlCommand"/>.</summary>
    public static implicit operator int(ControlCommand value)
    {
        return value._value;
    }

    /// <summary>
    /// Returns the numeric command code as text. Without this override the
    /// default implementation yields the type name, which makes log output
    /// that formats a command useless.
    /// </summary>
    public override string ToString()
    {
        return _value.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>Two commands are equal when they wrap the same integer code.</summary>
    public override bool Equals(object obj)
    {
        return obj is ControlCommand && ((ControlCommand) obj)._value == _value;
    }

    /// <summary>Hash code consistent with <see cref="Equals(object)"/>.</summary>
    public override int GetHashCode()
    {
        return _value;
    }
}
using System;
using System.Diagnostics;
using System.IO;
using System.Messaging;
using System.Text;
using Newtonsoft.Json;
/// <summary>
/// An <see cref="IMessageFormatter"/> that writes message bodies as JSON text
/// (via Json.NET) and reads them back from the message's body stream.
/// </summary>
public class JsonMessageFormatter : IMessageFormatter
{
    // SECURITY NOTE(review): TypeNameHandling.All makes Json.NET embed type names
    // when writing and instantiate the named types when reading. If a queue can
    // receive messages from an untrusted sender this is a known deserialization
    // risk. It is left unchanged here because Read() returns object and relies on
    // the embedded type name; consider supplying a restrictive SerializationBinder.
    private static readonly JsonSerializerSettings DefaultSerializerSettings =
        new JsonSerializerSettings {
            TypeNameHandling = TypeNameHandling.All
        };

    private readonly JsonSerializerSettings _serializerSettings;

    /// <summary>Text encoding used to turn JSON into bytes and back.</summary>
    public Encoding Encoding { get; set; }

    /// <summary>Creates a formatter with the default serializer settings.</summary>
    /// <param name="encoding">Text encoding to use; UTF-8 when null.</param>
    public JsonMessageFormatter(Encoding encoding = null)
        : this(encoding, null)
    {
    }

    /// <summary>Creates a formatter with explicit serializer settings.</summary>
    /// <param name="encoding">Text encoding to use; UTF-8 when null.</param>
    /// <param name="serializerSettings">Json.NET settings; the defaults above when null.</param>
    internal JsonMessageFormatter(Encoding encoding, JsonSerializerSettings serializerSettings = null)
    {
        Encoding = encoding ?? Encoding.UTF8;
        _serializerSettings = serializerSettings ?? DefaultSerializerSettings;
    }

    /// <summary>
    /// Reports whether the message has a readable, non-empty body stream.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public bool CanRead(Message message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        var stream = message.BodyStream;
        return stream != null
            && stream.CanRead
            && stream.Length > 0;
    }

    /// <summary>Creates a new formatter with the same encoding and serializer settings.</summary>
    public object Clone()
    {
        return new JsonMessageFormatter(Encoding, _serializerSettings);
    }

    /// <summary>
    /// Deserializes the JSON held in the message's body stream.
    /// </summary>
    /// <returns>The deserialized object, or null when <see cref="CanRead"/> is false.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public object Read(Message message)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        if (!CanRead(message))
            return null;

        var stream = message.BodyStream;

        // Start from the beginning so that reading the same message twice
        // yields the same text instead of an empty string.
        if (stream.CanSeek)
            stream.Position = 0;

        // The reader is intentionally NOT disposed: disposing it would close the
        // body stream, which belongs to the message. A message whose body fails
        // to deserialize must still be re-sendable (e.g. to an error queue).
        var reader = new StreamReader(stream, Encoding);
        var json = reader.ReadToEnd();

        Trace.WriteLine(string.Format("Message {0} serialized body:\r\n{1}", message.Id, json));

        return JsonConvert.DeserializeObject(json, _serializerSettings);
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> to JSON and stores it as the message's body stream.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="message"/> or <paramref name="obj"/> is null.
    /// </exception>
    public void Write(Message message, object obj)
    {
        if (message == null)
            throw new ArgumentNullException("message");

        if (obj == null)
            throw new ArgumentNullException("obj");

        string json = JsonConvert.SerializeObject(obj, Formatting.None, _serializerSettings);

        Trace.WriteLine(string.Format("Message {0} serialized body:\r\n{1}", message.Id, json));

        message.BodyStream = new MemoryStream(Encoding.GetBytes(json));

        //Need to reset the body type, in case the same message
        //is reused by some other formatter.
        message.BodyType = 0;
    }
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Messaging;
using System.Transactions;
/// <summary>
/// A Message Endpoint that uses the MSMQ transport layer
/// </summary>
/// <example>
/// new MsmqMessageEndpoint("msmq://myqueue@localhost").Send("Hello, world!");
/// </example>
public class MsmqMessageEndpoint : IDisposable
{
    // Parses "[msmq://]queue[@machine]". Built once instead of on every call.
    private static readonly System.Text.RegularExpressions.Regex EndpointPattern =
        new System.Text.RegularExpressions.Regex(
            "(msmq://)?(?<Queue>([^@])*)(@(?<Machine>.*))?");

    private readonly ICollection<KeyValuePair<ControlCommand, Action<object>>> _messageHandlers;
    private readonly string _messageQueueName;
    private readonly string _errorQueueName;
    private readonly Lazy<MessageQueue> _messageQueue;

    /// <summary>The endpoint address exactly as passed to the constructor.</summary>
    public string Uri { get; private set; }

    /// <summary>
    /// Formatter cloned onto every outgoing and incoming message.
    /// Defaults to a <see cref="JsonMessageFormatter"/>.
    /// </summary>
    public IMessageFormatter MessageFormatter { get; set; }

    /// <summary>The underlying queue, opened lazily on first access.</summary>
    protected internal MessageQueue MessageQueue
    {
        get { return _messageQueue.Value; }
    }

    /// <summary>
    /// Creates an endpoint for the given address. The queue itself is not
    /// opened until it is first needed.
    /// </summary>
    /// <param name="uri">Address of the form "[msmq://]queue[@machine]".</param>
    /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null.</exception>
    public MsmqMessageEndpoint(string uri)
    {
        if (uri == null)
            throw new ArgumentNullException("uri");

        _messageHandlers = new List<KeyValuePair<ControlCommand, Action<object>>>();
        MessageFormatter = new JsonMessageFormatter();
        Uri = uri;
        _messageQueueName = GetEndpointName(uri);
        _errorQueueName = _messageQueueName + "_errors";
        _messageQueue = new Lazy<MessageQueue>(() =>
            new MessageQueue(_messageQueueName)
            {
                // AppSpecific carries the ControlCommand, so it must be
                // retrieved together with each received message.
                MessageReadPropertyFilter = { AppSpecific = true }
            });
    }

    /// <summary>
    /// Registers a handler to be invoked for every received message whose
    /// command code equals <paramref name="command"/>.
    /// </summary>
    public void Subscribe(ControlCommand command, Action<object> action)
    {
        _messageHandlers.Add(new KeyValuePair<ControlCommand, Action<object>>(command, action));
    }

    /// <summary>
    /// Creates the message queue and the error queue (both transactional) if
    /// they do not exist, then starts peeking asynchronously for messages.
    /// </summary>
    public void Start()
    {
        Trace.TraceInformation("Starting listener on MSMQ endpoint {0}...", _messageQueueName);

        CreateTransactionalQueueIfNotExists(_messageQueueName);
        CreateTransactionalQueueIfNotExists(_errorQueueName);

        MessageQueue.PeekCompleted += QueuePeekCompleted;
        MessageQueue.BeginPeek();

        Trace.TraceInformation("Listening on MSMQ endpoint {0}.", _messageQueueName);
    }

    /// <summary>
    /// Detaches the peek-completed handler if the queue was ever opened.
    /// This only removes the handler; it does not cancel a peek that was
    /// already started.
    /// </summary>
    public void Stop()
    {
        if (_messageQueue.IsValueCreated)
        {
            MessageQueue.PeekCompleted -= QueuePeekCompleted;
        }

        Trace.TraceInformation("Stopped listening on MSMQ endpoint {0}.", _messageQueueName);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as an ordinary message
    /// (<see cref="ControlCommand.Message"/>), labelled with the name of
    /// <typeparamref name="TMessage"/>, inside a <see cref="MessageQueueTransaction"/>.
    /// </summary>
    /// <param name="message">The payload to send.</param>
    /// <param name="responseEndpoint">Optional endpoint whose queue is set as the response queue.</param>
    /// <param name="timeToLive">Optional value for the message's TimeToBeReceived.</param>
    public void Send<TMessage>(TMessage message, MsmqMessageEndpoint responseEndpoint = null, TimeSpan? timeToLive = null)
    {
        var msmqMessage = BuildMsmqMessage(
            ControlCommand.Message,
            typeof(TMessage).Name,
            message,
            responseEndpoint,
            timeToLive
        );

        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            MessageQueue.Send(msmqMessage, transaction);
            transaction.Commit();
        }

        Trace.WriteLine(string.Format("Sent {0} message {1}", typeof(TMessage).Name, msmqMessage.Id));
    }

    /// <summary>
    /// Sends a message carrying an arbitrary command code, inside a
    /// <see cref="TransactionScope"/>.
    /// </summary>
    /// <param name="command">Command code stored in the message's AppSpecific property.</param>
    /// <param name="descriptor">Text used as the message label.</param>
    /// <param name="message">The payload to send.</param>
    /// <param name="responseEndpoint">Optional endpoint whose queue is set as the response queue.</param>
    /// <param name="timeToLive">Optional value for the message's TimeToBeReceived.</param>
    public void SendControlCommand(ControlCommand command, string descriptor, object message, MsmqMessageEndpoint responseEndpoint = null, TimeSpan? timeToLive = null)
    {
        var msmqMessage = BuildMsmqMessage(command, descriptor, message, responseEndpoint, timeToLive);

        using (var transaction = new TransactionScope())
        {
            MessageQueue.Send(msmqMessage, MessageQueueTransactionType.Automatic);
            transaction.Complete();
        }

        // Log the numeric code explicitly so the trace shows which command was sent.
        Trace.WriteLine(string.Format("Sent command: {0}", (int) command));
    }

    /// <summary>Builds the recoverable MSMQ message that wraps a payload.</summary>
    private Message BuildMsmqMessage(ControlCommand command, string descriptor, object message,
                                     MsmqMessageEndpoint responseEndpoint, TimeSpan? timeToLive)
    {
        MessageQueue responseQueue = null;
        if (responseEndpoint != null)
            responseQueue = responseEndpoint.MessageQueue;

        var msmqMessage = new Message
        {
            AppSpecific = (int) command,
            Body = message,
            Formatter = (IMessageFormatter) MessageFormatter.Clone(),
            Label = descriptor,
            Recoverable = true,
            ResponseQueue = responseQueue,
        };

        if (timeToLive.HasValue)
            msmqMessage.TimeToBeReceived = timeToLive.Value;

        return msmqMessage;
    }

    /// <summary>Creates the named queue as a transactional queue unless it already exists.</summary>
    private static void CreateTransactionalQueueIfNotExists(string queueName)
    {
        if (!MessageQueue.Exists(queueName))
            MessageQueue.Create(queueName, true);
    }

    /// <summary>
    /// Peek-completed callback: receives the waiting message, dispatches it to
    /// the subscribed handlers, and then starts the next peek.
    /// </summary>
    private void QueuePeekCompleted(object sender, PeekCompletedEventArgs e)
    {
        var messageQueue = (MessageQueue) sender;
        messageQueue.EndPeek(e.AsyncResult);

        Message message = null;
        try
        {
            message = messageQueue.Receive();

            if (message == null)
                throw new InvalidOperationException("Null message");

            message.Formatter = (IMessageFormatter) MessageFormatter.Clone();

            Trace.WriteLine(string.Format("Received message {0}", message.Id));

            DispatchToHandlers(message);
        }
        catch (Exception ex)
        {
            TryLogError(message, ex);
        }
        finally
        {
            // Always re-arm the peek so one bad message cannot end the listening loop.
            messageQueue.Refresh();
            messageQueue.BeginPeek();
        }
    }

    /// <summary>Invokes every handler subscribed to the message's command code.</summary>
    private void DispatchToHandlers(Message message)
    {
        ControlCommand command = message.AppSpecific;

        var handlers = _messageHandlers
            .Where(x => x.Key == command)
            .Select(x => x.Value)
            .Where(x => x != null)
            .ToArray();

        Trace.WriteLine(string.Format("Executing {0} handlers", handlers.Length));

        if (handlers.Length == 0)
            handlers = new Action<object>[] { msg => Trace.TraceWarning("Message {0} has no registered handlers", message.Id) };

        foreach (var handler in handlers)
        {
            if (command == ControlCommand.Message)
            {
                // Ordinary messages: handlers get the deserialized body.
                handler(message.Body);
            }
            else
            {
                // Other commands: handlers get the raw MSMQ message.
                handler(message);
            }
        }
    }

    /// <summary>
    /// Calls <see cref="LogError"/> and traces, rather than propagates, any
    /// failure of the error logging itself, so the peek callback can continue.
    /// </summary>
    private void TryLogError(Message message, Exception exception)
    {
        try
        {
            LogError(message, exception);
        }
        catch (Exception loggingFailure)
        {
            Trace.TraceError("Failed to forward message to error queue {0}: {1}\r\n{2}",
                _errorQueueName, loggingFailure.Message, loggingFailure.StackTrace);
        }
    }

    /// <summary>
    /// Converts "[msmq://]queue[@machine]" into a private-queue path of the form
    /// "machine\private$\queue". A missing machine or "localhost" (any casing)
    /// becomes ".".
    /// </summary>
    private static string GetEndpointName(string value)
    {
        var match = EndpointPattern.Match(value);

        var queue = match.Groups["Queue"].Value;
        var machine = match.Groups["Machine"].Value;

        if (string.IsNullOrWhiteSpace(machine)
            || string.Equals(machine, "localhost", StringComparison.OrdinalIgnoreCase))
        {
            machine = ".";
        }

        return machine + "\\private$\\" + queue;
    }

    /// <summary>
    /// Traces the exception (if any) and, when a message is available, sends
    /// that message to the error queue inside a <see cref="TransactionScope"/>.
    /// </summary>
    private void LogError(Message message, Exception exception = null)
    {
        if (exception != null)
            Trace.TraceError("{0}\r\n{1}", exception.Message, exception.StackTrace);

        if (message == null)
            return;

        using (var scope = new TransactionScope())
        {
            using (var errorQueue = new MessageQueue(_errorQueueName))
            {
                errorQueue.Send(message, MessageQueueTransactionType.Automatic);
            }

            scope.Complete();
        }
    }

    /// <summary>Stops listening and disposes the queue if it was ever opened.</summary>
    public void Dispose()
    {
        Stop();

        if (_messageQueue != null && _messageQueue.IsValueCreated)
            _messageQueue.Value.Dispose();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment