Last active
January 11, 2019 15:03
-
-
Save jchadwick/3158499 to your computer and use it in GitHub Desktop.
A helper class that provides delayed processing of a message via MSMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Messaging; | |
using NLog; | |
/// <summary>
/// A facade over Microsoft's MSMQ: sends objects to, and processes objects
/// received from, a single message queue.
/// </summary>
/// <remarks>
/// The queue is created as a transactional queue on first use if it does not
/// exist. Received messages are handed to <see cref="ProcessMessage"/>, whose
/// result decides whether the receive is committed, rolled back, or the
/// message is re-queued.
/// </remarks>
public abstract class QueuedService : IDisposable
{
    private const string QueueSetupFailedMessage = "Failed to create MSMQ Message Queue -- check to ensure MSMQ is installed, that the queue has been created, and/or that the application user has the correct permissions to create a queue";

    private static readonly Logger Log = LogManager.GetCurrentClassLogger();

    private readonly MessageQueue _queue;
    private readonly string _messageQueueName;
    private bool _initialized;
    private bool _disposed;

    // Written by Start/Stop and read from the peek callback, which MSMQ raises
    // on a different thread than the caller of Start/Stop.
    private volatile bool _listening;

    /// <summary>
    /// When true, a received message whose body cannot be read with the queue's
    /// formatter is removed from the queue instead of being rolled back onto it.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the original code declared this flag but never read it;
    /// the behavior above is the presumed intent -- confirm. Defaults to false,
    /// which keeps the previous behavior (roll back).
    /// </remarks>
    public bool DiscardInvalidMessages { get; set; }

    /// <summary>The underlying queue returned by <see cref="CreateQueue"/>.</summary>
    protected MessageQueue MessageQueue
    {
        get { return _queue; }
    }

    /// <summary>The queue path passed to the constructor.</summary>
    public string MessageQueueName
    {
        get { return _messageQueueName; }
    }

    /// <summary>
    /// The message body types passed to the constructor (empty when none were given).
    /// </summary>
    protected List<Type> MessageTypes { get; private set; }

    /// <summary>
    /// Creates the service and its underlying queue object.
    /// </summary>
    /// <param name="messageQueueName">Path of the queue to use.</param>
    /// <param name="messageTypes">Body types handed to the queue's XML formatter.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageQueueName"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="messageQueueName"/> is empty or whitespace.</exception>
    protected QueuedService(string messageQueueName, params Type[] messageTypes)
    {
        if (messageQueueName == null)
        {
            throw new ArgumentNullException("messageQueueName");
        }

        if (string.IsNullOrWhiteSpace(messageQueueName))
        {
            throw new ArgumentOutOfRangeException("messageQueueName");
        }

        _messageQueueName = messageQueueName;

        // Previously never assigned, so subclasses always observed null here.
        // Set before CreateQueue so an override can already read it.
        MessageTypes = messageTypes == null ? new List<Type>() : new List<Type>(messageTypes);

        // Virtual call from the constructor: overrides must not rely on state
        // initialized by the derived class's own constructor.
        _queue = CreateQueue(messageTypes);
    }

    /// <summary>
    /// Builds the <see cref="System.Messaging.MessageQueue"/> for
    /// <see cref="MessageQueueName"/>, using an XML formatter for the given types.
    /// </summary>
    /// <param name="messageTypes">Body types the XML formatter is constructed with.</param>
    /// <returns>The queue object this service will send to and receive from.</returns>
    protected virtual MessageQueue CreateQueue(Type[] messageTypes)
    {
        return new MessageQueue(MessageQueueName)
        {
            MessageReadPropertyFilter = { AppSpecific = true },
            Formatter = new XmlMessageFormatter(messageTypes),
        };
    }

    /// <summary>
    /// Sends <paramref name="request"/> to the queue as a recoverable message
    /// inside its own transaction.
    /// </summary>
    /// <param name="request">The object to use as the message body.</param>
    /// <remarks>
    /// Best effort: a failure while sending or committing is written to
    /// <see cref="Trace"/> and the transaction is aborted; nothing is thrown
    /// to the caller for those failures.
    /// </remarks>
    protected void Send(object request)
    {
        EnsureInitialized();

        using (var message = CreateMessage(request))
        using (var trans = new MessageQueueTransaction())
        {
            trans.Begin();
            try
            {
                _queue.Send(message, trans);
                trans.Commit();
            }
            catch (Exception ex)
            {
                Trace.TraceError("Error sending message: " + ex);
                AbortIfPending(trans);
            }
        }
    }

    /// <summary>
    /// Starts listening for messages. Calling it again while already listening
    /// does nothing.
    /// </summary>
    public void Start()
    {
        EnsureInitialized();

        if (_listening)
        {
            // A second subscription would run the handler twice per message.
            return;
        }

        _queue.PeekCompleted += HandleMessagePeek;
        _listening = true;
        try
        {
            _queue.BeginPeek();
        }
        catch
        {
            // Do not stay marked as listening if the first peek could not be armed.
            Stop();
            throw;
        }
    }

    /// <summary>
    /// Stops listening: unsubscribes the peek handler so no further messages
    /// are processed by this instance.
    /// </summary>
    public void Stop()
    {
        _listening = false;

        if (_queue != null)
        {
            _queue.PeekCompleted -= HandleMessagePeek;
        }
    }

    /// <summary>
    /// Creates the queue (as a transactional queue) if it does not exist yet.
    /// Runs the check only until it has succeeded once.
    /// </summary>
    /// <exception cref="InvalidOperationException">Logged and rethrown.</exception>
    /// <exception cref="MessageQueueException">Logged and rethrown.</exception>
    protected void EnsureInitialized()
    {
        if (_initialized)
        {
            return;
        }

        try
        {
            if (!MessageQueue.Exists(MessageQueueName))
            {
                MessageQueue.Create(MessageQueueName, true);
            }
        }
        catch (InvalidOperationException ex)
        {
            Log.ErrorException(QueueSetupFailedMessage, ex);
            throw;
        }
        catch (MessageQueueException ex)
        {
            // MessageQueueException does not derive from InvalidOperationException,
            // so MSMQ-level failures previously escaped without the hint above.
            Log.ErrorException(QueueSetupFailedMessage, ex);
            throw;
        }

        _initialized = true;
    }

    /// <summary>
    /// Peek callback: receives the next message inside a transaction, passes
    /// its body to <see cref="ProcessMessage"/>, completes the transaction
    /// according to the result, and arms the next peek while still listening.
    /// </summary>
    private void HandleMessagePeek(object sender, PeekCompletedEventArgs args)
    {
        _queue.EndPeek(args.AsyncResult);

        using (var trans = new MessageQueueTransaction())
        {
            trans.Begin();

            object message = null;
            MessageProcessingResult processingResult;
            try
            {
                using (var msmqMessage = _queue.Receive(trans))
                {
                    if (msmqMessage == null)
                    {
                        Log.Warn("Null message in queue -- aborting...");

                        // Nothing was received; commit the empty transaction. The
                        // original returned here and never armed the next peek.
                        processingResult = MessageProcessingResult.Success;
                    }
                    else
                    {
                        msmqMessage.Formatter = _queue.Formatter;

                        if (TryReadBody(msmqMessage, out message))
                        {
                            processingResult = ProcessMessage(message);
                        }
                        else if (DiscardInvalidMessages)
                        {
                            Log.Warn("Discarding message whose body could not be read");

                            // Committing the receive removes the unreadable message.
                            processingResult = MessageProcessingResult.Success;
                        }
                        else
                        {
                            processingResult = MessageProcessingResult.Failed;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Trace.TraceError("Error processing message: " + ex);
                processingResult = MessageProcessingResult.Failed;
            }

            CompleteTransaction(trans, processingResult, message);
        }

        // NOTE(review): a message that always fails is rolled back and picked up
        // again immediately; there is no delay or poison-message handling here.
        if (_listening)
        {
            _queue.BeginPeek();
        }
    }

    /// <summary>
    /// Handles one message body taken from the queue.
    /// </summary>
    /// <param name="message">The deserialized message body.</param>
    /// <returns>
    /// <c>Success</c> to remove the message, <c>Retry</c> to remove it and queue
    /// a fresh copy, or <c>Failed</c> to roll the receive back.
    /// </returns>
    protected abstract MessageProcessingResult ProcessMessage(dynamic message);

    /// <summary>
    /// Stops listening and releases the underlying queue. Safe to call more than once.
    /// </summary>
    public virtual void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        Stop();

        if (_queue != null)
        {
            // The queue is created by this class, so this class releases it.
            _queue.Dispose();
        }
    }

    /// <summary>Wraps a body object in a recoverable message.</summary>
    private static Message CreateMessage(object body)
    {
        return new Message
        {
            Body = body,
            Recoverable = true,
        };
    }

    /// <summary>Reads the message body; returns false (and traces the error) if that throws.</summary>
    private static bool TryReadBody(Message source, out object body)
    {
        try
        {
            body = source.Body;
            return true;
        }
        catch (Exception ex)
        {
            Trace.TraceError("Error reading message body: " + ex);
            body = null;
            return false;
        }
    }

    /// <summary>Aborts the transaction only while it is still pending.</summary>
    private static void AbortIfPending(MessageQueueTransaction trans)
    {
        if (trans.Status == MessageQueueTransactionStatus.Pending)
        {
            trans.Abort();
        }
    }

    /// <summary>Commits, re-queues-and-commits, or aborts according to the processing result.</summary>
    private void CompleteTransaction(MessageQueueTransaction trans, MessageProcessingResult result, object message)
    {
        try
        {
            switch (result)
            {
                case MessageProcessingResult.Retry:
                    if (message != null)
                    {
                        // Re-queue inside the same transaction as the receive, so the
                        // original is only removed if the copy was actually queued.
                        using (var retry = CreateMessage(message))
                        {
                            _queue.Send(retry, trans);
                        }
                    }
                    trans.Commit();
                    break;
                case MessageProcessingResult.Success:
                    trans.Commit();
                    break;
                default:
                    trans.Abort();
                    break;
            }
        }
        catch (Exception ex)
        {
            Trace.TraceError("Error completing message transaction: " + ex);
            AbortIfPending(trans);
        }
    }

    /// <summary>Outcome reported by <see cref="ProcessMessage"/>.</summary>
    protected enum MessageProcessingResult
    {
        /// <summary>The receive is rolled back.</summary>
        Failed,

        /// <summary>The message is removed and a copy of its body is queued again.</summary>
        Retry,

        /// <summary>The message is removed from the queue.</summary>
        Success,
    }
}
Is there a full source code sample (a real application using it)?
If a queue has multiple messages, how can I search for and find one by the destination userId of the user it was intended for?
Can I please have an implementation? I tried using the following but was not able to pass back the MessageProcessingResult.
/*********************************************************/
/// <summary>
/// Queues batches of <c>PnMessagesDetails</c> and consumes them again via
/// <see cref="ProcessMessage"/>.
/// </summary>
public class ProcessMsmqDetails : QueuedService
{
    public ProcessMsmqDetails(string messageQueueName, params Type[] messageTypes)
        : base(messageQueueName, messageTypes)
    {
        //Constructor
    }

    /// <summary>
    /// Public entry point for producers. <c>Send</c> and <c>ProcessMessage</c>
    /// are protected on the base class, so outside code cannot call them directly.
    /// </summary>
    /// <param name="batch">The object to put on the queue as one message.</param>
    public void Enqueue(object batch)
    {
        Send(batch);
    }

    /// <summary>
    /// Called by the base class for each message taken from the queue.
    /// </summary>
    /// <remarks>
    /// Must be <c>protected</c>: an override cannot change the access modifier
    /// of the base member (compiler error CS0507). It must not call <c>Send</c>
    /// for the same message, because every consumed message would be put
    /// straight back and the queue would never drain.
    /// </remarks>
    protected override MessageProcessingResult ProcessMessage(dynamic message)
    {
        //TODO : need to check respective status & send it back
        return MessageProcessingResult.Success;
    }
}
/*********************************************************/
// The formatter needs the closed generic type; typeof(List) does not name a type.
ProcessMsmqDetails processMsmqDetails = new ProcessMsmqDetails(queueName, typeof(List<PnMessagesDetails>));
processMsmqDetails.Start(); //START MSMQ process
//Loop thru the set of Data & Insert into MSMQ
// Bound on items rather than "<= Count / batchSize", which sent one extra,
// empty batch whenever Count was a multiple of batchSize (including zero).
for (pageNumber = 0; pageNumber * batchSize < sendPNMsgDataModel.PnMessagesDetailsList.Count; pageNumber++)
{
    //Get Set of records
    List<PnMessagesDetails> sendPNMsgDataModelBatchDataList = sendPNMsgDataModel.PnMessagesDetailsList
        .Skip(pageNumber * batchSize)
        .Take(batchSize)
        .ToList();
    //send to MSMQ
    // MessageProcessingResult is a protected nested enum and is only visible
    // inside QueuedService subclasses, so no result is read back here.
    processMsmqDetails.Enqueue(sendPNMsgDataModelBatchDataList);
}
//sendPNDataModelResult = processBatchData(sendPNMsgDataModel);
// NOTE(review): Stop() unsubscribes the handler right away, so batches still
// on the queue at this point are not consumed by this instance -- confirm intent.
processMsmqDetails.Stop(); //STOP MSMQ process
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Any examples on how you're using this?