MSMQ using Rx - Code snippet for MSMQ receive timeout problem
using System;
using System.Messaging;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text; // Needed only if the commented-out Send loop below is enabled.

namespace MSMQPubSubRx
{
    class Program
    {
        public static MessageQueue msmqQueue;
        public static string queueMessageString = "Test String to queue in the MSMQ";

        static void Main(string[] args)
        {
            msmqQueue = new MessageQueue("FormatName:DIRECT=OS:msmqserver\\private$\\myqueuename");
            msmqQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(byte[]) });

            // Part of code to run for queueing of messages in message queue.
            //for (int i = 0; ; i++)
            //{
            //    msmqQueue.Send(Encoding.ASCII.GetBytes(queueMessageString));
            //}

            // Part of code to run for dequeuing of message from message queue
            StartQueueMessageReceivingProcess();
            Console.ReadLine();
        }

        /// <summary>
        /// Start receiving client messages using Rx and Observable.
        /// </summary>
        static void StartQueueMessageReceivingProcess()
        {
            // Repeat() resubscribes each time a single receive attempt completes,
            // so every completion (message or timeout) starts a new BeginReceive.
            var incomingMessage = StartDequeueing()
                .Repeat()
                //.TakeWhile(x => x != null)
                .SubscribeOn(Scheduler.TaskPool)
                .Subscribe(onNext: messageObject =>
                {
                    if (messageObject != null)
                    {
                        QueueMessageRecievedHandler(messageObject);
                    }
                },
                onError: exceptionMessage =>
                {
                    QueueMessageReceivingExceptionOccurredHandler(exceptionMessage);
                });
        }

        /// <summary>
        /// Wraps one BeginReceive/EndReceive pair as an observable that yields
        /// at most one message body and then completes.
        /// </summary>
        static IObservable<object> StartDequeueing()
        {
            return Observable.Create<object>(observer =>
            {
                var anonyFunction = Observable
                    .FromAsyncPattern<Message>
                    (
                        (callbackObject, stateObject) => msmqQueue.BeginReceive(TimeSpan.FromMilliseconds(10), stateObject, callbackObject),
                        asyncResultActionObject => msmqQueue.EndReceive(asyncResultActionObject)
                    )()
                    .Select(queueMessage =>
                    {
                        return queueMessage.Body;
                    });

                // A receive timeout surfaces as an exception from EndReceive.
                // Every exception is turned into an empty sequence here, so the
                // onError handler of the subscriber is effectively never reached.
                return anonyFunction.Catch<object, Exception>(exception =>
                {
                    return Observable.Empty<object>();
                })
                .Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
            })
            // Outer handlers: swallow anything that escapes the inner Catch.
            .Catch<object, MessageQueueException>(exception => { return Observable.Empty<object>(); })
            .Catch<object, Exception>(exceptionGeneric => { return Observable.Empty<object>(); });
        }

        static void QueueMessageRecievedHandler(object queueMessage)
        {
            Console.WriteLine("Message Received from Queue. Congrats !!");
        }

        static void QueueMessageReceivingExceptionOccurredHandler(Exception exceptionObject)
        {
            Console.WriteLine("Oops. Rx is not happy to play with MSMQ, please help Bartde !!");
        }
    }
}
@mross002

Nice implementation, especially around the error handling. Did you ever get this resolved? I think you just need an onCompleted handler that calls BeginReceive again so that Rx "listens" again. You may also want to consider BeginPeek instead, especially if you are using a transactional queue. I'm working on a similar project using Rx and MSMQ; if you've completed this code, I would love to see an example of the finished product. Thanks!
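
A minimal sketch of the BeginPeek suggestion, not the gist author's final code. It assumes the same System.Messaging and Rx versions as the listing above; the `PeekThenReceive` class and `Listen` method are hypothetical names. It waits for a message with BeginPeek, removes it with a synchronous Receive, treats only the IOTimeout error as "nothing arrived yet", and lets Repeat start the next peek. For a transactional queue, the Receive call would need one of the overloads that takes a transaction.

```csharp
using System;
using System.Messaging;
using System.Reactive.Linq;

static class PeekThenReceive
{
    // Hypothetical helper: emits message bodies until the subscription is disposed.
    // Assumes queue.Formatter has already been set, as in the listing above.
    public static IObservable<object> Listen(MessageQueue queue, TimeSpan timeout)
    {
        // Wrap BeginPeek/EndPeek; each call to peek() starts one asynchronous peek.
        var peek = Observable.FromAsyncPattern<Message>(
            (callback, state) => queue.BeginPeek(timeout, state, callback),
            asyncResult => queue.EndPeek(asyncResult));

        return Observable
            // Defer so that every resubscription by Repeat issues a fresh BeginPeek.
            .Defer(() => peek())
            // A message is available: take it off the queue. TimeSpan.Zero makes
            // Receive fail with IOTimeout if another consumer removed it first.
            .Select(_ => queue.Receive(TimeSpan.Zero).Body)
            // Only a timeout is treated as an empty result; other errors still
            // reach the subscriber's onError handler.
            .Catch<object, MessageQueueException>(ex =>
                ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout
                    ? Observable.Empty<object>()
                    : Observable.Throw<object>(ex))
            // Completion (message delivered or timeout) starts the next peek.
            .Repeat();
    }
}
```

It could be subscribed in the same way as the original, for example `PeekThenReceive.Listen(msmqQueue, TimeSpan.FromSeconds(5)).Subscribe(onNext, onError)`. Because non-timeout errors are rethrown rather than swallowed, a missing or unreachable queue ends the sequence instead of spinning in a tight retry loop.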
