Skip to content

Instantly share code, notes, and snippets.

@dancrowley303
Created May 24, 2018 08:25
Show Gist options
  • Save dancrowley303/e8ad4445c5b969e71f4568161540d0d5 to your computer and use it in GitHub Desktop.
Save dancrowley303/e8ad4445c5b969e71f4568161540d0d5 to your computer and use it in GitHub Desktop.
azure-servicebus-various-client-handling-strategies
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace bussub
{
class Program
{
private static ConcurrentStack<long> tokens = new ConcurrentStack<long>();
private const string ServiceBusConnectionString = "<connstringhere>";
private const string TopicName = "<topicname>";
private const string SubscriptionName = "<subname>";
private static ISubscriptionClient _subscriptionClient;
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
private static async Task MainAsync()
{
_subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
Console.WriteLine("=======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("=======================================================");
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await _subscriptionClient.CloseAsync();
}
private static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
if (tokens.Contains(message.SystemProperties.SequenceNumber))
{
Console.WriteLine($"Already handled {message.SystemProperties.SequenceNumber}; abandoning");
await _subscriptionClient.AbandonAsync(message.SystemProperties.LockToken);
}
else
{
tokens.Push(message.SystemProperties.SequenceNumber);
if (message.SystemProperties.SequenceNumber % 2 == 0)
{
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
else
{
if (message.SystemProperties.SequenceNumber % 7 == 0)
{
Console.WriteLine($"Sending {message.SystemProperties.SequenceNumber} straight to DLQ");
await _subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken, "7 goes to heaven");
}
else if (message.SystemProperties.SequenceNumber % 3 == 0)
{
Console.WriteLine($"Ignoring {message.SystemProperties.SequenceNumber}");
//if you don't abandon the message, it won't be rebroadcast until after the lock time expiry (default 30 s)
await Task.CompletedTask;
}
else
{
Console.WriteLine($"Abandoning {message.SystemProperties.SequenceNumber}");
await _subscriptionClient.AbandonAsync(message.SystemProperties.LockToken);
}
}
}
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment