Skip to content

Instantly share code, notes, and snippets.

@andreasohlund
Last active August 29, 2015 14:27
Show Gist options
  • Select an option

  • Save andreasohlund/c9c0d51ff3ce0190bbbf to your computer and use it in GitHub Desktop.

Select an option

Save andreasohlund/c9c0d51ff3ce0190bbbf to your computer and use it in GitHub Desktop.
namespace MsmqAsyncSpike
{
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Spike of an MSMQ message pump: asynchronously waits (via peek) until the
/// private queue <c>.\private$\asyncspike</c> has a message, then hands the
/// transactional receive and the handling of that message to a worker task.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Opens the queue and blocks the main thread on the pump,
    /// which only ends by throwing (e.g. when the queue cannot be accessed).
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        using (var queue = new MessageQueue(@".\private$\asyncspike"))
        {
            // GetAwaiter().GetResult() rethrows the pump's original exception
            // rather than wrapping it in an AggregateException.
            StartMessagPump(queue)
                .GetAwaiter()
                .GetResult();
        }
    }

    /// <summary>
    /// Runs the pump loop forever: wait for a message to become available,
    /// then dispatch a worker to receive and process it.
    /// </summary>
    /// <param name="queue">The queue to pump messages from.</param>
    /// <returns>A task that never completes successfully; it faults if peeking fails.</returns>
    static async Task StartMessagPump(MessageQueue queue)
    {
        while (true)
        {
            // The peek is only used as an asynchronous "message available" signal;
            // the peeked copy is not needed, so release it straight away.
            var peeked = await Task.Factory
                .FromAsync<Message>(queue.BeginPeek(), queue.EndPeek)
                .ConfigureAwait(false);
            peeked.Dispose();

            DispatchProcessing(queue);
        }
    }

    /// <summary>
    /// Starts <see cref="ProcessMessage"/> on a worker without waiting for it,
    /// and reports any failure instead of letting it go unobserved.
    /// </summary>
    /// <param name="queue">The queue the worker should receive from.</param>
    static void DispatchProcessing(MessageQueue queue)
    {
        //todo: add our custom scheduler here so we can control: 1. Threads, 2. Throughput (if requested by user)
        Task.Factory
            .StartNew(
                () => ProcessMessage(queue),
                CancellationToken.None,
                TaskCreationOptions.DenyChildAttach,
                TaskScheduler.Default)
            // StartNew with an async delegate yields Task<Task>; unwrap it so the
            // continuation below sees the outcome of the processing itself.
            .Unwrap()
            .ContinueWith(
                t => Console.Error.WriteLine(t.Exception),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
    }

    /// <summary>
    /// Receives one message inside a native MSMQ transaction, runs it through the
    /// handler pipeline and commits. Returns without doing anything when no
    /// message is available any more.
    /// </summary>
    /// <param name="queue">The queue to receive from.</param>
    /// <returns>A task that completes when the message has been handled and committed.</returns>
    static async Task ProcessMessage(MessageQueue queue)
    {
        // Stand-in for the real handler pipeline: simulates 10 seconds of blocking work.
        // NOTE(review): kept synchronous so the transaction below is begun and committed
        // on the same thread; confirm whether MessageQueueTransaction tolerates thread hops
        // before switching this to Task.Delay.
        Func<Message, Task> pipe = m =>
        {
            Thread.Sleep(10000);
            Console.WriteLine(m.Id);
            return Task.FromResult(0);
        };

        using (var tx = new MessageQueueTransaction()) //use a native TX in this sample
        {
            tx.Begin();

            Message message;
            try
            {
                //there is no async version of receive http://stackoverflow.com/questions/16100773/messagequeue-and-async-await
                // Zero timeout: the pump may peek the same message more than once before a
                // worker has received it, so surplus workers must give up instead of
                // blocking forever with an open transaction.
                message = queue.Receive(TimeSpan.Zero, tx);
            }
            catch (MessageQueueException ex)
            {
                if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                {
                    throw;
                }

                // Another worker already took the message; nothing to do.
                tx.Abort();
                return;
            }

            using (message)
            {
                try
                {
                    await pipe(message).ConfigureAwait(false);
                }
                catch
                {
                    // Roll back explicitly so the message returns to the queue.
                    tx.Abort();
                    throw;
                }
            }

            tx.Commit();
        }
    }
}
}
@andreasohlund
Copy link
Copy Markdown
Author

Thanks @distantcam will look into that

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment