Created
September 24, 2015 08:47
-
-
Save herecydev/9c9ffe601299def7641c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.ServiceBus.Messaging; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace QueueClientRepro | |
{ | |
/// <summary>
/// Console entry point for the queue repro: builds 100 batches of 100 messages,
/// registers a receiving <see cref="Handler"/>, sends every batch, then reports
/// how many messages were sent and how many were received.
/// </summary>
class Program
{
    /// <summary>
    /// Runs the send/receive round trip and writes the counts to the console.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var client = new Client();
        var batches = new List<List<BrokeredMessage>>();
        var sendCount = 0;
        for (int i = 0; i < 100; i++)
        {
            var messages = new List<BrokeredMessage>();
            for (int j = 0; j < 100; j++)
            {
                sendCount++;
                // Every message of batch i carries Id = i (the batch index, not a per-message id).
                var poco = new POCO { Id = i };
                messages.Add(new BrokeredMessage(poco));
            }
            batches.Add(messages);
        }

        // The receiver is registered before sending starts, so the callback may run
        // concurrently with the send loop below; hence the Interlocked increment.
        var receiveCount = 0;
        var handler = new Handler((bm) =>
        {
            Interlocked.Increment(ref receiveCount);
            if (bm.DeliveryCount > 1)
            {
                Console.WriteLine("Message with Id {0} has a delivery count {1}", bm.MessageId, bm.DeliveryCount);
            }
        });

        foreach (var batch in batches)
        {
            try
            {
                client.Send(batch).Wait();
            }
            finally
            {
                // BrokeredMessage is IDisposable; release each one once its batch has been handed off.
                foreach (var message in batch)
                {
                    message.Dispose();
                }
            }
        }

        // Report the sent total only after every batch has actually been sent.
        Console.WriteLine("Sent {0} messages", sendCount);

        // Fixed grace period for in-flight deliveries before the count is sampled.
        Task.Delay(2000).Wait();

        // The counter is written from callback threads; read it with a matching memory barrier.
        Console.WriteLine("Received {0} messages", Volatile.Read(ref receiveCount));
        handler.Close();
    }
}
/// <summary>
/// Minimal payload object that is wrapped in a <c>BrokeredMessage</c> as the message body.
/// </summary>
public class POCO
{
    /// <summary>
    /// Integer identifier carried by the payload.
    /// </summary>
    public int Id
    {
        get;
        set;
    }
}
/// <summary>
/// Receives messages through its own <see cref="Client"/>, passes each one to a
/// caller-supplied callback, completes it, and tracks the completion tasks so that
/// <see cref="Close"/> can wait for the ones still outstanding.
/// </summary>
internal class Handler
{
    // Keyed by the Task reference itself. Task.Id is not guaranteed to be unique, so
    // keying on it could make TryAdd silently drop a completion that Close() must wait for.
    private readonly ConcurrentDictionary<Task, byte> _pendingTasks = new ConcurrentDictionary<Task, byte>();
    private readonly Action<BrokeredMessage> _callback;

    /// <summary>
    /// Creates a new <see cref="Client"/> and registers this handler as its message callback.
    /// </summary>
    /// <param name="callback">Invoked once for every received message, before the message is completed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public Handler(Action<BrokeredMessage> callback)
    {
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        // Assign the callback before registering so Handle never observes a null field.
        _callback = callback;
        var client = new Client();
        client.OnMessage(Handle);
    }

    /// <summary>
    /// Runs the user callback for <paramref name="message"/> and then starts completing it.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <returns>The task returned by <c>message.CompleteAsync()</c>.</returns>
    private Task Handle(BrokeredMessage message)
    {
        _callback(message);
        var completion = message.CompleteAsync();
        _pendingTasks.TryAdd(completion, 0);

        // Forget completions that succeeded so the set only holds work that is still
        // outstanding. Faulted or cancelled completions stay tracked, so Close() still
        // surfaces their failure exactly as it would have before.
        completion.ContinueWith(
            finished =>
            {
                byte ignored;
                _pendingTasks.TryRemove(finished, out ignored);
            },
            TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

        return completion;
    }

    /// <summary>
    /// Blocks until every tracked completion task has finished.
    /// </summary>
    /// <exception cref="AggregateException">One or more tracked completion tasks faulted or were cancelled.</exception>
    public void Close()
    {
        Task.WaitAll(_pendingTasks.Keys.ToArray());
    }
}
/// <summary>
/// Thin wrapper around a <c>QueueClient</c> bound to the "test" queue: makes sure the
/// queue exists, then exposes batch sending and callback-based receiving.
/// </summary>
internal class Client
{
    private const string ConnectionString = "someconnectionstring";
    private const string QueueName = "test";
    private const int MaxConcurrentCalls = 200;
    private const int PrefetchCount = 100;
    private readonly QueueClient _queueClient;

    /// <summary>
    /// Creates the queue when it is missing and opens a queue client with the configured prefetch count.
    /// </summary>
    public Client()
    {
        EnsureQueueExists();

        var queueClient = QueueClient.CreateFromConnectionString(ConnectionString, QueueName);
        queueClient.PrefetchCount = PrefetchCount;
        _queueClient = queueClient;
    }

    /// <summary>
    /// Creates the queue with a five-minute lock duration and a five-minute default
    /// message time-to-live, unless a queue with that name already exists.
    /// </summary>
    private static void EnsureQueueExists()
    {
        var namespaceManager = Microsoft.ServiceBus.NamespaceManager.CreateFromConnectionString(ConnectionString);
        if (namespaceManager.QueueExists(QueueName))
        {
            return;
        }

        var description = new QueueDescription(QueueName)
        {
            LockDuration = TimeSpan.FromMinutes(5),
            DefaultMessageTimeToLive = TimeSpan.FromMinutes(5)
        };
        namespaceManager.CreateQueue(description);
    }

    /// <summary>
    /// Sends the given messages to the queue as a single batch.
    /// </summary>
    /// <param name="brokeredMessages">The messages to send.</param>
    /// <returns>The task returned by <c>QueueClient.SendBatchAsync</c>.</returns>
    public Task Send(IEnumerable<BrokeredMessage> brokeredMessages)
    {
        return _queueClient.SendBatchAsync(brokeredMessages);
    }

    /// <summary>
    /// Registers an asynchronous callback that is invoked for received messages,
    /// allowing up to <c>MaxConcurrentCalls</c> concurrent invocations.
    /// </summary>
    /// <param name="callBack">The asynchronous handler to run for each message.</param>
    public void OnMessage(Func<BrokeredMessage, Task> callBack)
    {
        // NOTE(review): only MaxConcurrentCalls is set, so AutoComplete keeps its library
        // default. Handler also calls CompleteAsync() on each message itself; confirm
        // that the two do not both try to complete the same message.
        var options = new OnMessageOptions();
        options.MaxConcurrentCalls = MaxConcurrentCalls;
        _queueClient.OnMessageAsync(callBack, options);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment