Skip to content

Instantly share code, notes, and snippets.

@tugberkugurlu
Created February 19, 2017 14:01
Show Gist options
  • Save tugberkugurlu/7d31977648cb4e1e56e17d928d6430b0 to your computer and use it in GitHub Desktop.
Save tugberkugurlu/7d31977648cb4e1e56e17d928d6430b0 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Foo.Messaging.Abstractions;
namespace Foo.Messaging.RabbitMQ.Consumer
{
/// <summary>
/// Creates <see cref="ISubscription{TMessage}"/> instances for named queues.
/// </summary>
public interface ISubscriptionCreator
{
    /// <summary>
    /// Creates a subscription for the queue identified by <paramref name="queueName"/>.
    /// </summary>
    /// <typeparam name="TMessage">Type argument of the messages exposed by the subscription.</typeparam>
    /// <param name="queueName">Name of the queue to subscribe to.</param>
    /// <returns>The subscription for the queue.</returns>
    ISubscription<TMessage> CreateSubscription<TMessage>(string queueName);
}
/// <summary>
/// A disposable subscription that exposes its messages as a sequence.
/// </summary>
/// <typeparam name="TMessage">Type argument of the exposed messages.</typeparam>
public interface ISubscription<TMessage> : IDisposable
{
    /// <summary>
    /// Gets the sequence of messages made available by this subscription.
    /// </summary>
    IEnumerable<IMessage<TMessage>> Messages { get; }
}
/// <summary>
/// <see cref="ISubscription{TMessage}"/> implementation that pairs a message sequence with
/// the disposable released when the subscription is disposed.
/// </summary>
/// <typeparam name="TMessage">Type argument of the exposed messages.</typeparam>
internal class Subscription<TMessage> : ISubscription<TMessage>
{
    private readonly IEnumerable<IMessage<TMessage>> _messages;
    private readonly IDisposable _disposable;

    /// <summary>
    /// Creates the subscription.
    /// </summary>
    /// <param name="messages">Sequence returned from <see cref="Messages"/>.</param>
    /// <param name="disposable">Disposed by <see cref="Dispose"/>; may be <c>null</c>.</param>
    public Subscription(IEnumerable<IMessage<TMessage>> messages, IDisposable disposable)
    {
        _messages = messages;
        _disposable = disposable;
    }

    /// <summary>
    /// Gets the message sequence supplied at construction.
    /// </summary>
    public IEnumerable<IMessage<TMessage>> Messages => _messages;

    /// <summary>
    /// Disposes the underlying disposable, if one was supplied.
    /// </summary>
    public void Dispose()
    {
        if (_disposable != null)
        {
            _disposable.Dispose();
        }
    }
}
internal class RabbitMqQueueSubscriptionCreator : ISubscriptionCreator
{
private readonly IConnection _rabbitConnection;
private readonly ILogger<RabbitMqQueueSubscriptionCreator> _logger;
private readonly int _concurrencyLevel;
/// <summary>
/// Creates a subscription creator whose concurrency level is twice
/// <see cref="Environment.ProcessorCount"/>, delegating to the three-argument constructor.
/// </summary>
/// <param name="rabbitConnection">Connection used to create channels.</param>
/// <param name="logger">Logger used by this instance.</param>
public RabbitMqQueueSubscriptionCreator(
    IConnection rabbitConnection,
    ILogger<RabbitMqQueueSubscriptionCreator> logger)
    : this(rabbitConnection, logger, 2 * Environment.ProcessorCount)
{
}
/// <summary>
/// Creates a subscription creator with an explicit concurrency level.
/// </summary>
/// <param name="rabbitConnection">Connection used to create channels.</param>
/// <param name="logger">Logger used by this instance.</param>
/// <param name="concurrencyLevel">Number of consumers created per subscription; must be at least 1.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="rabbitConnection"/> or <paramref name="logger"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="concurrencyLevel"/> is less than 1.</exception>
public RabbitMqQueueSubscriptionCreator(
    IConnection rabbitConnection,
    ILogger<RabbitMqQueueSubscriptionCreator> logger,
    int concurrencyLevel)
{
    // Validation order is significant: it decides which exception wins when several arguments are bad.
    if (rabbitConnection == null)
    {
        throw new ArgumentNullException(nameof(rabbitConnection));
    }

    if (logger == null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    if (concurrencyLevel <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel), "Cannot be less than '1'");
    }

    _concurrencyLevel = concurrencyLevel;
    _logger = logger;
    _rabbitConnection = rabbitConnection;
}
/// <summary>
/// Validates <paramref name="queueName"/>, logs the request and creates the subscription.
/// </summary>
/// <typeparam name="TMessage">Type argument of the messages exposed by the subscription.</typeparam>
/// <param name="queueName">Name of the queue to subscribe to.</param>
/// <returns>The subscription for the queue.</returns>
/// <exception cref="ArgumentNullException"><paramref name="queueName"/> is <c>null</c>.</exception>
public ISubscription<TMessage> CreateSubscription<TMessage>(string queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException(nameof(queueName));
    }

    _logger.LogInformation(
        "Creating a subscription for queue {queueName} with concurrency level of {concurrencyLevel}",
        queueName,
        _concurrencyLevel);

    var subscription = CreateSubscriptionImpl<TMessage>(queueName);
    return subscription;
}
/// <summary>
/// Creates <c>_concurrencyLevel</c> consumers for <paramref name="queueName"/> that all feed a single
/// blocking pipeline, and wraps the pipeline in a subscription.
/// </summary>
/// <remarks>
/// The pipeline is marked complete for adding when either the subscription is disposed
/// (<c>consumerCts</c>) or every consumer has shut down (<c>shutdownCts</c>, driven by
/// <c>WatchShutdownAsync</c>). If creating any consumer fails, everything created so far is
/// released before the exception is rethrown.
/// </remarks>
/// <typeparam name="TMessage">Type argument of the messages exposed by the subscription.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <returns>A subscription whose disposal releases the consumers and the pipeline.</returns>
private ISubscription<TMessage> CreateSubscriptionImpl<TMessage>(string queueName)
{
    var messagesPipeline = new BlockingCollection<IMessage<TMessage>>();
    var cancellationTokens = new List<CancellationToken>(_concurrencyLevel);
    var compositeDisposable = new CompositeDisposable();
    var consumerCts = new CancellationTokenSource();
    var shutdownCts = new CancellationTokenSource();

    // Shared by both cancellation sources. A late cancellation can race with disposal of the
    // pipeline; in that case there is nothing left to complete, so the disposal error is ignored.
    Action completeAdding = () =>
    {
        try
        {
            if (!messagesPipeline.IsAddingCompleted)
            {
                messagesPipeline.CompleteAdding();
            }
        }
        catch (ObjectDisposedException)
        {
            // The pipeline has already been disposed together with the subscription.
        }
    };

    compositeDisposable.Add(new DisposableCancellation(consumerCts));
    consumerCts.Token.Register(completeAdding);
    shutdownCts.Token.Register(completeAdding);

    try
    {
        for (var i = 0; i < _concurrencyLevel; i++)
        {
            var consumerInfo = CreateSingleConsumer(queueName, messagesPipeline, consumerCts.Token);
            compositeDisposable.Add(consumerInfo.Disposable);
            cancellationTokens.Add(consumerInfo.CancellationToken);
        }
    }
    catch
    {
        // Do not leak the channels opened so far, the cancellation sources or the pipeline.
        try
        {
            ((IDisposable)compositeDisposable).Dispose();
        }
        finally
        {
            shutdownCts.Dispose();
            messagesPipeline.Dispose();
        }

        throw;
    }

    // Fire-and-forget: the watcher cancels and disposes shutdownCts on its own.
    WatchShutdownAsync<TMessage>(cancellationTokens, shutdownCts, consumerCts.Token);

    // Added last, after the cancellation and the channels.
    compositeDisposable.Add(messagesPipeline);
    return new Subscription<TMessage>(messagesPipeline.GetConsumingEnumerable(), compositeDisposable);
}
/// <summary>
/// Opens a channel on the connection, attaches an eventing consumer that pushes every received
/// message into <paramref name="messagesPipeline"/>, and starts consuming <paramref name="queueName"/>.
/// </summary>
/// <typeparam name="TMessage">Type argument of the messages placed in the pipeline.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <param name="messagesPipeline">Collection that received messages are added to.</param>
/// <param name="cancellationToken">Token observed while adding a message to the pipeline.</param>
/// <returns>
/// The disposable that closes the channel, together with a token that is cancelled when the
/// consumer is shut down.
/// </returns>
private ConsumerInfo CreateSingleConsumer<TMessage>(string queueName, BlockingCollection<IMessage<TMessage>> messagesPipeline, CancellationToken cancellationToken)
{
    var channel = _rabbitConnection.CreateModel();
    var cts = new CancellationTokenSource();

    try
    {
        var consumer = new EventingBasicConsumer(channel);

        // Per consumer limit. See http://www.rabbitmq.com/consumer-prefetch.html
        // and http://stackoverflow.com/a/8179850/463785
        // Also see this for fetching more than one: http://stackoverflow.com/a/32592077/463785
        channel.BasicQos(0, 1, false);

        consumer.Received += (_, ea) =>
        {
            _logger.LogDebug("Recieved {messageId}", ea.BasicProperties.MessageId);
            try
            {
                var rabbitMessage = new RabbitMessage<TMessage>(channel, ea);
                messagesPipeline.Add(rabbitMessage, cancellationToken);
                _logger.LogDebug("Dispatched {messageId} for handling", rabbitMessage.Id);
            }
            catch (Exception ex)
            {
                _logger.LogError(0, ex, "Error while adding the message to the pipeline");

                // The message could not be handed over, so ask for it to be requeued. The channel
                // may already be closed at this point; a failing nack must not escape the handler.
                try
                {
                    channel.BasicNack(ea.DeliveryTag, false, true);
                }
                catch (Exception nackEx)
                {
                    _logger.LogError(0, nackEx, "Error while nacking the message that could not be added to the pipeline");
                }
            }
        };

        consumer.Shutdown += (_, ea) =>
        {
            _logger.LogWarning("Consumer has been shut down because of {cause} with {replyCode} for {replyText}",
                ea.Cause,
                ea.ReplyCode,
                ea.ReplyText);

            try
            {
                cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The source was already disposed (shutdown handled before); nothing left to signal.
            }
            finally
            {
                cts.Dispose();
            }
        };

        channel.BasicConsume(queueName, noAck: false, consumer: consumer);
        return new ConsumerInfo(new DisposableChannel(channel), cts.Token);
    }
    catch
    {
        // Setup failed: release the channel first (in case disposing it raises the shutdown
        // handler, which still needs the token source), then the token source itself.
        channel.Dispose();
        cts.Dispose();
        throw;
    }
}
/// <summary>
/// Watches all provided cancellation tokens and, once every one of them has been cancelled,
/// triggers <c>cts.Cancel()</c> and disposes <paramref name="cts"/>.
/// </summary>
/// <remarks>
/// If <paramref name="cancellationToken"/> is cancelled while watching, the watch stops right away:
/// <paramref name="cts"/> is then disposed without being cancelled, so no watcher keeps waiting
/// forever on tokens that may never be cancelled. If <paramref name="cancellationToken"/> is already
/// cancelled when the work is scheduled, the returned task is cancelled and nothing runs.
/// </remarks>
/// <typeparam name="TMessage">Not used by the method body.</typeparam>
/// <param name="cancellationTokens">Tokens to watch.</param>
/// <param name="cts">Source that is cancelled once all watched tokens are cancelled; always disposed when the watch ends.</param>
/// <param name="cancellationToken">Token that aborts the watch.</param>
/// <returns>A task that completes when the watch has ended.</returns>
private static Task WatchShutdownAsync<TMessage>(IEnumerable<CancellationToken> cancellationTokens,
    CancellationTokenSource cts, CancellationToken cancellationToken)
{
    return Task.Run(async () =>
    {
        var cancellationWatchers = cancellationTokens.Select(async token =>
        {
            if (token.IsCancellationRequested)
            {
                return;
            }

            // Wake up on either the watched token or the abort token.
            using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken))
            {
                try
                {
                    await Task.Delay(Timeout.Infinite, linkedCts.Token);
                }
                catch (OperationCanceledException)
                {
                    // Expected: an infinite delay can only end through cancellation.
                }
            }
        });

        await Task.WhenAll(cancellationWatchers);

        try
        {
            // When the watch was aborted there is nothing to signal.
            if (!cancellationToken.IsCancellationRequested)
            {
                cts.Cancel();
            }
        }
        finally
        {
            // Dispose even if a callback registered on the token throws from Cancel().
            cts.Dispose();
        }
    }, cancellationToken);
}
/// <summary>
/// Adapts a <see cref="CancellationTokenSource"/> to <see cref="IDisposable"/>: disposing
/// cancels the source and then disposes it.
/// </summary>
private class DisposableCancellation : IDisposable
{
    private readonly CancellationTokenSource _cts;

    // 0 = not disposed, 1 = disposed; makes Dispose safe to call more than once.
    private int _disposed;

    /// <summary>
    /// Wraps <paramref name="cts"/>.
    /// </summary>
    /// <param name="cts">Source to cancel and dispose.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cts"/> is <c>null</c>.</exception>
    public DisposableCancellation(CancellationTokenSource cts)
    {
        if (cts == null)
        {
            throw new ArgumentNullException(nameof(cts));
        }

        _cts = cts;
    }

    /// <summary>
    /// Cancels and disposes the wrapped source. Calls after the first one do nothing.
    /// </summary>
    public void Dispose()
    {
        // Cancel() throws ObjectDisposedException on a disposed source, so only run once.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            _cts.Cancel();
        }
        finally
        {
            // Release the source even when a registered callback throws from Cancel().
            _cts.Dispose();
        }
    }
}
/// <summary>
/// Adapts a channel to <see cref="IDisposable"/>: disposing closes the channel and then disposes it.
/// </summary>
private class DisposableChannel : IDisposable
{
    private readonly IModel _channel;

    // 0 = not disposed, 1 = disposed; makes Dispose safe to call more than once.
    private int _disposed;

    /// <summary>
    /// Wraps <paramref name="channel"/>.
    /// </summary>
    /// <param name="channel">Channel to close and dispose.</param>
    public DisposableChannel(IModel channel)
    {
        _channel = channel;
    }

    /// <summary>
    /// Closes the channel if it is still open and disposes it. Calls after the first one do nothing.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            // Skip the close when the channel is already closed.
            if (_channel.IsOpen)
            {
                _channel.Close();
            }
        }
        finally
        {
            // Always release the channel, even when closing it fails.
            _channel.Dispose();
        }
    }
}
/// <summary>
/// Pairs the disposable of a consumer with the cancellation token associated with it.
/// </summary>
private class ConsumerInfo
{
    private readonly IDisposable _disposable;
    private readonly CancellationToken _cancellationToken;

    /// <summary>
    /// Stores the given values unchanged.
    /// </summary>
    /// <param name="disposable">Value returned from <see cref="Disposable"/>.</param>
    /// <param name="cancellationToken">Value returned from <see cref="CancellationToken"/>.</param>
    public ConsumerInfo(IDisposable disposable, CancellationToken cancellationToken)
    {
        _cancellationToken = cancellationToken;
        _disposable = disposable;
    }

    /// <summary>
    /// Gets the disposable supplied at construction.
    /// </summary>
    public IDisposable Disposable => _disposable;

    /// <summary>
    /// Gets the cancellation token supplied at construction.
    /// </summary>
    public CancellationToken CancellationToken => _cancellationToken;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment