Skip to content

Instantly share code, notes, and snippets.

@jchadwick
Last active January 11, 2019 15:03
Show Gist options
  • Save jchadwick/3158499 to your computer and use it in GitHub Desktop.
Save jchadwick/3158499 to your computer and use it in GitHub Desktop.
A helper class that provides delayed processing of a message via MSMQ
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Messaging;
using NLog;
/// <summary>
/// A facade over Microsoft's MSMQ. Subclasses enqueue work with <see cref="Send"/> and
/// handle each received message in <see cref="ProcessMessage"/>; the result they return
/// decides whether the message is consumed, re-queued, or rolled back onto the queue.
/// </summary>
public abstract class QueuedService : IDisposable
{
    private static readonly Logger Log = LogManager.GetCurrentClassLogger();

    private readonly MessageQueue _queue;
    private readonly string _messageQueueName;
    private bool _initialized;

    // True between Start() and Stop(). Read from the peek callback thread, hence volatile.
    private volatile bool _listening;

    /// <summary>
    /// NOTE(review): this flag is not consulted anywhere in this class; presumably it is
    /// meant for subclasses to honour inside <see cref="ProcessMessage"/> -- confirm.
    /// </summary>
    public bool DiscardInvalidMessages { get; set; }

    /// <summary>The underlying queue returned by <see cref="CreateQueue"/>.</summary>
    protected MessageQueue MessageQueue
    {
        get { return _queue; }
    }

    /// <summary>The queue path this service was constructed with.</summary>
    public string MessageQueueName
    {
        get { return _messageQueueName; }
    }

    /// <summary>The message body types passed to the constructor (never null).</summary>
    protected List<Type> MessageTypes { get; private set; }

    /// <summary>
    /// Creates the service and its underlying <see cref="System.Messaging.MessageQueue"/> object.
    /// </summary>
    /// <param name="messageQueueName">Path of the queue; must not be null or whitespace.</param>
    /// <param name="messageTypes">Body types handed to the queue's formatter.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageQueueName"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="messageQueueName"/> is empty or whitespace.</exception>
    protected QueuedService(string messageQueueName, params Type[] messageTypes)
    {
        if (messageQueueName == null)
            throw new ArgumentNullException("messageQueueName");
        if (string.IsNullOrWhiteSpace(messageQueueName))
            throw new ArgumentOutOfRangeException("messageQueueName");

        _messageQueueName = messageQueueName;

        // Populate before CreateQueue runs so an override can already read the property.
        MessageTypes = new List<Type>(messageTypes ?? new Type[0]);

        _queue = CreateQueue(messageTypes);
    }

    /// <summary>
    /// Builds the queue object for <see cref="MessageQueueName"/> with an XML formatter
    /// for the given types. Called from the constructor, so overrides must not rely on
    /// state initialised by the derived class's own constructor.
    /// </summary>
    /// <param name="messageTypes">Body types the XML formatter should recognise.</param>
    /// <returns>The queue object used for all sends and receives.</returns>
    protected virtual MessageQueue CreateQueue(Type[] messageTypes)
    {
        return new MessageQueue(MessageQueueName)
        {
            MessageReadPropertyFilter = { AppSpecific = true },
            Formatter = new XmlMessageFormatter(messageTypes),
        };
    }

    /// <summary>
    /// Sends <paramref name="request"/> as a recoverable message inside its own transaction.
    /// A failure is traced and the transaction aborted; the exception is not rethrown.
    /// </summary>
    /// <param name="request">Object to use as the message body.</param>
    protected void Send(object request)
    {
        EnsureInitialized();

        var message = CreateMessage(request);

        using (var trans = new MessageQueueTransaction())
        {
            trans.Begin();
            try
            {
                _queue.Send(message, trans);
                trans.Commit();
            }
            catch (Exception ex)
            {
                Trace.TraceError("Error sending message: " + ex);
                trans.Abort();
            }
        }
    }

    /// <summary>
    /// Ensures the queue exists and begins listening for messages.
    /// Calling it again while already listening is a no-op.
    /// </summary>
    public void Start()
    {
        EnsureInitialized();

        // A second subscription would run the handler twice per peek and double the
        // number of outstanding peeks each round.
        if (_listening) return;

        _listening = true;
        _queue.PeekCompleted += HandleMessagePeek;
        _queue.BeginPeek();
    }

    /// <summary>
    /// Stops listening: detaches the peek handler and prevents a handler that is
    /// currently running from starting another peek.
    /// </summary>
    public void Stop()
    {
        _listening = false;

        if (_queue != null)
            _queue.PeekCompleted -= HandleMessagePeek;
    }

    /// <summary>
    /// Creates the transactional queue if it does not exist yet. Runs its check only
    /// until it has succeeded once.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Logged and rethrown when the existence check or the creation fails.
    /// </exception>
    protected void EnsureInitialized()
    {
        if (_initialized) return;

        try
        {
            if (!MessageQueue.Exists(MessageQueueName))
                MessageQueue.Create(MessageQueueName, true);
        }
        catch (InvalidOperationException ex)
        {
            Log.ErrorException("Failed to create MSMQ Message Queue -- check to ensure MSMQ is installed, that the queue has been created, and/or that the application user has the correct permissions to create a queue", ex);
            throw;
        }

        _initialized = true;
    }

    /// <summary>
    /// Peek-completed callback: processes one message, then starts the next peek.
    /// </summary>
    private void HandleMessagePeek(object sender, PeekCompletedEventArgs args)
    {
        _queue.EndPeek(args.AsyncResult);

        try
        {
            ReceiveAndProcess();
        }
        finally
        {
            // Re-arm on every exit path (including the early return for a null message)
            // so the listener cannot stall, but not once Stop()/Dispose() has been called.
            if (_listening)
                _queue.BeginPeek();
        }
    }

    /// <summary>
    /// Receives one message in a transaction, passes its body to
    /// <see cref="ProcessMessage"/> and commits or aborts according to the result.
    /// </summary>
    private void ReceiveAndProcess()
    {
        using (var trans = new MessageQueueTransaction())
        {
            trans.Begin();

            MessageProcessingResult processingResult;
            try
            {
                var msmqMessage = _queue.Receive(trans);
                if (msmqMessage == null)
                {
                    Log.Warn("Null message in queue -- aborting...");
                    trans.Commit();
                    return;
                }

                msmqMessage.Formatter = _queue.Formatter;
                object message = msmqMessage.Body;
                processingResult = ProcessMessage(message);

                // Re-queue inside the SAME transaction as the receive. If this send
                // fails we land in the catch below and abort, which returns the original
                // message to the queue instead of committing its removal and losing it.
                if (processingResult == MessageProcessingResult.Retry && message != null)
                    _queue.Send(CreateMessage(message), trans);
            }
            catch (Exception ex)
            {
                Trace.TraceError("Error queuing message: " + ex);
                processingResult = MessageProcessingResult.Failed;
            }

            switch (processingResult)
            {
                case MessageProcessingResult.Retry:
                case MessageProcessingResult.Success:
                    trans.Commit();
                    break;
                default:
                    // Failed (or an unknown value): roll the receive back.
                    trans.Abort();
                    break;
            }
        }
    }

    /// <summary>Wraps a body object in a recoverable MSMQ message.</summary>
    private static Message CreateMessage(object body)
    {
        return new Message
        {
            Body = body,
            Recoverable = true,
        };
    }

    /// <summary>
    /// Handles the body of one received message.
    /// </summary>
    /// <param name="message">The deserialised message body.</param>
    /// <returns>
    /// <c>Success</c> to consume the message, <c>Retry</c> to consume it and put a copy
    /// back on the queue, <c>Failed</c> to roll the receive back.
    /// </returns>
    protected abstract MessageProcessingResult ProcessMessage(dynamic message);

    /// <summary>
    /// Stops listening and releases the underlying queue object.
    /// </summary>
    public virtual void Dispose()
    {
        Stop();

        // This class created the queue object, so it is responsible for releasing it.
        if (_queue != null)
            _queue.Dispose();
    }

    /// <summary>Outcome reported by <see cref="ProcessMessage"/>.</summary>
    protected enum MessageProcessingResult
    {
        /// <summary>Processing failed; the receive transaction is aborted.</summary>
        Failed,
        /// <summary>The message is consumed and an equivalent message is sent to the queue again.</summary>
        Retry,
        /// <summary>The message was handled; the receive transaction is committed.</summary>
        Success,
    }
}
@rlightner
Copy link

Any examples on how you're using this?

@kiquenet
Copy link

Any full source code sample (real application using it) about it ?

@smartmeter
Copy link

If a queue has multiple messages, how can I search and find it by destination userId, whom it was intended for?

@ContactChetanLad
Copy link

Can I please have an implementation? I tried using the following but am not able to pass back the MessageProcessingResult
/*********************************************************/
// Attempted concrete QueuedService from the question above.
// NOTE(review): this does not compile as written. QueuedService declares
// `protected abstract MessageProcessingResult ProcessMessage(dynamic message)`, and a C#
// override cannot change accessibility, so `public override` is rejected. The return
// type is also a protected nested enum, which a public member cannot expose.
public class ProcessMsmqDetails : QueuedService
{

    // Forwards the queue name and message body types to the QueuedService constructor.
    public ProcessMsmqDetails(string messageQueueName, params Type[] messageTypes) : base(messageQueueName, messageTypes)
    {
        //Constructor             
    }

    // NOTE(review): QueuedService calls ProcessMessage for each message it RECEIVES, so
    // calling Send(message) here puts every received message back on the queue again.
    // Presumably the intent was to enqueue from the calling code and handle the message
    // here -- confirm with the author.
    public override MessageProcessingResult ProcessMessage(dynamic message)
    {

        //throw new NotImplementedException();
        Send(message);
        return MessageProcessingResult.Success; //TODO : need to check respective status & send it back
    }
}

/*********************************************************/
// Create the service for batches of PnMessagesDetails. The formatter must be given the
// closed generic type that is actually sent below; a bare `List` is not a valid type.
ProcessMsmqDetails processMsmqDetails = new ProcessMsmqDetails(queueName, typeof(List<PnMessagesDetails>));

            processMsmqDetails.Start(); //START MSMQ process

            //Loop thru the set of Data & Insert into MSMQ
            // NOTE(review): with `<=`, a Count that is an exact multiple of batchSize
            // produces one extra, empty final batch -- confirm whether that is intended.
            for (pageNumber = 0; pageNumber <= (sendPNMsgDataModel.PnMessagesDetailsList.Count / batchSize); pageNumber++)
            {
                //Get Set of records
                var sendPNMsgDataModelBatchData = sendPNMsgDataModel.PnMessagesDetailsList.Skip(pageNumber * batchSize).Take(batchSize);
                List<PnMessagesDetails> sendPNMsgDataModelBatchDataList = sendPNMsgDataModelBatchData.ToList();

                //send to MSMQ
                // NOTE(review): ProcessMessage is declared `protected` on QueuedService and
                // MessageProcessingResult is a protected nested enum, so neither is
                // accessible from calling code like this.
                MessageProcessingResult messageProcessingResult = processMsmqDetails.ProcessMessage(sendPNMsgDataModelBatchDataList);

                //ProcessMsmqDetails ProcessMsmqDetails = new ProcessMsmqDetails(queueName, typeof(List<PnMessagesDetails>));
            }
            //sendPNDataModelResult = processBatchData(sendPNMsgDataModel);

            processMsmqDetails.Stop();  //STOP MSMQ process

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment