Skip to content

Instantly share code, notes, and snippets.

@herecydev
Created September 24, 2015 08:47
Show Gist options
  • Save herecydev/9c9ffe601299def7641c to your computer and use it in GitHub Desktop.
Save herecydev/9c9ffe601299def7641c to your computer and use it in GitHub Desktop.
using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace QueueClientRepro
{
class Program
{
static void Main(string[] args)
{
var client = new Client();
var batches = new List<List<BrokeredMessage>>();
var sendCount = 0;
for (int i = 0; i < 100; i++)
{
var messages = new List<BrokeredMessage>();
for (int j = 0; j < 100; j++)
{
sendCount++;
var poco = new POCO { Id = i };
messages.Add(new BrokeredMessage(poco));
}
batches.Add(messages);
}
Console.WriteLine("Sent {0} messages", sendCount);
var receiveCount = 0;
var handler = new Handler((bm) =>
{
Interlocked.Increment(ref receiveCount);
if (bm.DeliveryCount > 1)
Console.WriteLine("Message with Id {0} has a delivery count {1}", bm.MessageId, bm.DeliveryCount);
});
foreach (var batch in batches)
{
client.Send(batch).Wait();
}
Task.Delay(2000).Wait();
Console.WriteLine("Received {0} messages", receiveCount);
handler.Close();
}
}
public class POCO
{
public int Id { get; set; }
}
internal class Handler
{
private readonly ConcurrentDictionary<int, Task> _pendingTasks = new ConcurrentDictionary<int, Task>();
private readonly Action<BrokeredMessage> _callback;
public Handler(Action<BrokeredMessage> callback)
{
var client = new Client();
_callback = callback;
client.OnMessage(Handle);
}
private Task Handle(BrokeredMessage message)
{
_callback(message);
var task = message.CompleteAsync();
_pendingTasks.TryAdd(task.Id, task);
return task;
}
public void Close()
{
Task.WaitAll(_pendingTasks.Values.ToArray());
}
}
internal class Client
{
private const int MaxConcurrentCalls = 200;
private const int PrefetchCount = 100;
private readonly QueueClient _queueClient;
public Client()
{
var manager = Microsoft.ServiceBus.NamespaceManager.CreateFromConnectionString("someconnectionstring");
if (!manager.QueueExists("test"))
{
var desc = new QueueDescription("test");
desc.LockDuration = TimeSpan.FromMinutes(5);
desc.DefaultMessageTimeToLive = TimeSpan.FromMinutes(5);
manager.CreateQueue(desc);
}
_queueClient = QueueClient.CreateFromConnectionString("someconnectionstring", "test");
_queueClient.PrefetchCount = PrefetchCount;
}
public Task Send(IEnumerable<BrokeredMessage> brokeredMessages)
{
return _queueClient.SendBatchAsync(brokeredMessages);
}
public void OnMessage(Func<BrokeredMessage, Task> callBack)
{
_queueClient.OnMessageAsync(callBack, new OnMessageOptions { MaxConcurrentCalls = MaxConcurrentCalls });
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment