Created
December 21, 2011 12:18
-
-
Save jsinh/1505827 to your computer and use it in GitHub Desktop.
MSMQ using Rx - Code snippet for MSMQ receive timeout problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Messaging; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
namespace MSMQPubSubRx
{
    /// <summary>
    /// Console sample that receives messages from an MSMQ queue through an Rx pipeline.
    /// </summary>
    class Program
    {
        public static MessageQueue msmqQueue;
        public static string queueMessageString = "Test String to queue in the MSMQ";

        /// <summary>
        /// How long a single BeginReceive call waits for a message before it times out.
        /// </summary>
        private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromMilliseconds(10);

        /// <summary>
        /// Handle of the running receive pipeline; kept so it can be torn down on exit.
        /// </summary>
        private static IDisposable receiveSubscription;

        /// <summary>
        /// Opens the queue, starts the receive pipeline and blocks until ENTER is pressed.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static void Main(string[] args)
        {
            msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
            msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });

            try
            {
                // Part of code to run for queueing of messages in message queue.
                // (Requires "using System.Text;" for Encoding when enabled.)
                //for (int i = 0; ; i++)
                //{
                //    msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
                //}

                // Part of code to run for dequeuing of message from message queue
                StartQueueMessageReceivingProcess();
                Console.ReadLine();
            }
            finally
            {
                // Stop resubscribing first, then release the queue handle.
                if (receiveSubscription != null)
                {
                    receiveSubscription.Dispose();
                }

                msmqQueue.Dispose();
            }
        }

        /// <summary>
        /// Start receiving client messages using Rx &amp; Observable.
        /// </summary>
        /// <remarks>
        /// Each subscription performs one receive attempt; Repeat() resubscribes after
        /// every completion, so the queue is polled until the subscription is disposed
        /// or a non-timeout error reaches the onError handler.
        /// </remarks>
        static void StartQueueMessageReceivingProcess()
        {
            receiveSubscription = StartDequeueing()
                .Repeat()
                .SubscribeOn(Scheduler.TaskPool)
                .Subscribe(
                    onNext: messageObject =>
                    {
                        if (messageObject != null)
                        {
                            QueueMessageRecievedHandler(messageObject);
                        }
                    },
                    onError: exceptionMessage =>
                    {
                        QueueMessageReceivingExceptionOccurredHandler(exceptionMessage);
                    });
        }

        /// <summary>
        /// Builds an observable that performs a single asynchronous receive per subscription.
        /// </summary>
        /// <returns>
        /// A sequence that yields the body of one received message and completes, or
        /// completes empty when the receive timed out. Any other failure is surfaced
        /// through OnError instead of being swallowed.
        /// </returns>
        static IObservable<object> StartDequeueing()
        {
            var receiveOnce = Observable.FromAsyncPattern<Message>(
                (callbackObject, stateObject) => msmqQueue.BeginReceive(ReceiveTimeout, stateObject, callbackObject),
                asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject));

            // Invoking the FromAsyncPattern delegate starts the receive immediately, so it
            // is deferred: every (re)subscription issues a fresh BeginReceive.
            return Observable
                .Defer(receiveOnce)
                .Select(queueMessage => queueMessage.Body)
                .Catch<object, MessageQueueException>(exception =>
                    // An elapsed timeout only means "no message yet"; complete quietly so
                    // Repeat() polls again. Everything else is a real error and must reach
                    // the subscriber rather than spin forever in a silent retry loop.
                    exception.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout
                        ? Observable.Empty<object>()
                        : Observable.Throw<object>(exception));
        }

        /// <summary>
        /// Called for every non-null message body taken from the queue.
        /// </summary>
        /// <param name="queueMessage">Deserialized body of the received message.</param>
        static void QueueMessageRecievedHandler(object queueMessage)
        {
            Console.WriteLine("Message Received from Queue. Congrats !!");
        }

        /// <summary>
        /// Called once when the receive pipeline terminates with an error.
        /// </summary>
        /// <param name="exceptionObject">The error that ended the pipeline.</param>
        static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
        {
            Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");

            // Report the cause as well; the banner alone is not actionable.
            if (exceptionObject != null)
            {
                Console.WriteLine(exceptionObject);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Nice implementation, especially around the error handling. Did you ever get this resolved? I think you just need an onCompletion to BeginReceive again so the Rx "listens" again. You may also want to consider BeginPeek instead, especially if using a transactional queue. I'm working on creating a similar project using Rx and MSMQ, if you've completed this code I would love to see an example of the finished product. Thanks!