Created
February 19, 2017 14:01
-
-
Save tugberkugurlu/7d31977648cb4e1e56e17d928d6430b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Extensions.Logging; | |
using RabbitMQ.Client; | |
using RabbitMQ.Client.Events; | |
using Foo.Messaging.Abstractions; | |
namespace Foo.Messaging.RabbitMQ.Consumer | |
{ | |
/// <summary>
/// Factory for <see cref="ISubscription{TMessage}"/> instances bound to a named queue.
/// </summary>
public interface ISubscriptionCreator
{
    /// <summary>
    /// Creates a subscription whose messages carry a <typeparamref name="TMessage"/> payload.
    /// </summary>
    /// <typeparam name="TMessage">Payload type of the messages exposed by the subscription.</typeparam>
    /// <param name="queueName">Name of the queue to subscribe to.</param>
    /// <returns>A subscription; dispose it when it is no longer needed.</returns>
    ISubscription<TMessage> CreateSubscription<TMessage>(string queueName);
}
/// <summary>
/// A disposable handle over a sequence of messages with a <typeparamref name="TMessage"/> payload.
/// </summary>
/// <typeparam name="TMessage">Payload type of the messages.</typeparam>
public interface ISubscription<TMessage> : IDisposable
{
    /// <summary>
    /// The messages delivered through this subscription.
    /// </summary>
    IEnumerable<IMessage<TMessage>> Messages { get; }
}
/// <summary>
/// Plain <see cref="ISubscription{TMessage}"/> implementation: holds the message sequence it is
/// given and forwards <see cref="Dispose"/> to the supplied disposable.
/// </summary>
/// <typeparam name="TMessage">Payload type of the messages.</typeparam>
internal class Subscription<TMessage> : ISubscription<TMessage>
{
    private readonly IDisposable _disposable;

    /// <param name="messages">Sequence exposed through <see cref="Messages"/>.</param>
    /// <param name="disposable">Disposed together with this subscription; may be null.</param>
    public Subscription(IEnumerable<IMessage<TMessage>> messages, IDisposable disposable)
    {
        _disposable = disposable;
        Messages = messages;
    }

    /// <inheritdoc />
    public IEnumerable<IMessage<TMessage>> Messages { get; }

    /// <summary>
    /// Disposes the wrapped disposable, if one was supplied.
    /// </summary>
    public void Dispose()
    {
        if (_disposable != null)
        {
            _disposable.Dispose();
        }
    }
}
internal class RabbitMqQueueSubscriptionCreator : ISubscriptionCreator | |
{ | |
private readonly IConnection _rabbitConnection;
private readonly ILogger<RabbitMqQueueSubscriptionCreator> _logger;
private readonly int _concurrencyLevel;

/// <summary>
/// Creates an instance whose concurrency level is twice <see cref="Environment.ProcessorCount"/>.
/// </summary>
/// <param name="rabbitConnection">Connection used to open one channel per consumer.</param>
/// <param name="logger">Logger for subscription and consumer events.</param>
public RabbitMqQueueSubscriptionCreator(IConnection rabbitConnection, ILogger<RabbitMqQueueSubscriptionCreator> logger)
    : this(rabbitConnection, logger, Environment.ProcessorCount * 2)
{
}

/// <summary>
/// Creates an instance with an explicit concurrency level.
/// </summary>
/// <param name="rabbitConnection">Connection used to open one channel per consumer.</param>
/// <param name="logger">Logger for subscription and consumer events.</param>
/// <param name="concurrencyLevel">Number of consumers created per subscription; at least 1.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="rabbitConnection"/> or <paramref name="logger"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="concurrencyLevel"/> is below 1.</exception>
public RabbitMqQueueSubscriptionCreator(IConnection rabbitConnection, ILogger<RabbitMqQueueSubscriptionCreator> logger, int concurrencyLevel)
{
    if (rabbitConnection == null)
    {
        throw new ArgumentNullException(nameof(rabbitConnection));
    }

    if (logger == null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    if (concurrencyLevel < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel), "Cannot be less than '1'");
    }

    _rabbitConnection = rabbitConnection;
    _logger = logger;
    _concurrencyLevel = concurrencyLevel;
}
/// <summary>
/// Validates the queue name, logs the request and creates the subscription.
/// </summary>
/// <typeparam name="TMessage">Payload type of the messages exposed by the subscription.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <returns>The newly created subscription.</returns>
/// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
public ISubscription<TMessage> CreateSubscription<TMessage>(string queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException(nameof(queueName));
    }

    _logger.LogInformation(
        "Creating a subscription for queue {queueName} with concurrency level of {concurrencyLevel}",
        queueName,
        _concurrencyLevel);

    return CreateSubscriptionImpl<TMessage>(queueName);
}
/// <summary>
/// Builds a subscription backed by <c>_concurrencyLevel</c> consumers that all feed a single
/// <see cref="BlockingCollection{T}"/>.
/// </summary>
/// <remarks>
/// The pipeline stops accepting messages either when the subscription is disposed
/// (<c>consumerCts</c>) or when every consumer has reported shutdown (<c>shutdownCts</c>).
/// </remarks>
/// <typeparam name="TMessage">Payload type of the messages.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <returns>A subscription that owns the consumers and the pipeline.</returns>
private ISubscription<TMessage> CreateSubscriptionImpl<TMessage>(string queueName)
{
    var messagesPipeline = new BlockingCollection<IMessage<TMessage>>();
    var cancellationTokens = new List<CancellationToken>(_concurrencyLevel);
    var compositeDisposable = new CompositeDisposable();
    var consumerCts = new CancellationTokenSource();
    var shutdownCts = new CancellationTokenSource();

    // Shared by both token sources. A consumer may report shutdown after the subscription
    // (and with it the pipeline) has been disposed; in that case there is nothing left to
    // complete, so the ObjectDisposedException is deliberately ignored.
    Action completeAdding = () =>
    {
        try
        {
            if (!messagesPipeline.IsAddingCompleted)
            {
                messagesPipeline.CompleteAdding();
            }
        }
        catch (ObjectDisposedException)
        {
        }
    };

    compositeDisposable.Add(new DisposableCancellation(consumerCts));
    consumerCts.Token.Register(completeAdding);
    shutdownCts.Token.Register(completeAdding);

    try
    {
        for (var i = 0; i < _concurrencyLevel; i++)
        {
            var consumerInfo = CreateSingleConsumer(queueName, messagesPipeline, consumerCts.Token);
            compositeDisposable.Add(consumerInfo.Disposable);
            cancellationTokens.Add(consumerInfo.CancellationToken);
        }
    }
    catch
    {
        // A consumer could not be created: release the ones that were, so their channels
        // are not leaked. NOTE(review): assumes CompositeDisposable.Dispose() disposes every
        // item added to it - confirm against its implementation.
        compositeDisposable.Dispose();
        shutdownCts.Dispose();
        messagesPipeline.Dispose();
        throw;
    }

    // The watcher runs in the background; observe its failure so it is logged rather than
    // surfacing as an unobserved task exception.
    WatchShutdownAsync<TMessage>(cancellationTokens, shutdownCts, consumerCts.Token)
        .ContinueWith(
            watcher => _logger.LogError(0, watcher.Exception, "Error while watching the consumers of queue {queueName} for shutdown", queueName),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);

    compositeDisposable.Add(messagesPipeline);
    return new Subscription<TMessage>(messagesPipeline.GetConsumingEnumerable(), compositeDisposable);
}
/// <summary>
/// Opens a channel on the shared connection and starts one consumer on
/// <paramref name="queueName"/> that pushes every delivery into <paramref name="messagesPipeline"/>.
/// </summary>
/// <typeparam name="TMessage">Payload type of the messages.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <param name="messagesPipeline">Collection that receives the wrapped deliveries.</param>
/// <param name="cancellationToken">Token passed to the pipeline's <c>Add</c> call.</param>
/// <returns>
/// The disposable that closes the channel, plus a token that is cancelled when the consumer
/// reports shutdown.
/// </returns>
private ConsumerInfo CreateSingleConsumer<TMessage>(string queueName, BlockingCollection<IMessage<TMessage>> messagesPipeline, CancellationToken cancellationToken)
{
    var channel = _rabbitConnection.CreateModel();
    var cts = new CancellationTokenSource();

    // Read the token up front: the Shutdown handler disposes the source, after which
    // cts.Token would throw ObjectDisposedException.
    var shutdownToken = cts.Token;

    try
    {
        var consumer = new EventingBasicConsumer(channel);

        // Per consumer limit. See http://www.rabbitmq.com/consumer-prefetch.html
        // and http://stackoverflow.com/a/8179850/463785
        // Also see this for fetching more than one: http://stackoverflow.com/a/32592077/463785
        channel.BasicQos(0, 1, false);

        consumer.Received += (_, ea) =>
        {
            _logger.LogDebug("Recieved {messageId}", ea.BasicProperties.MessageId);

            try
            {
                var rabbitMessage = new RabbitMessage<TMessage>(channel, ea);
                messagesPipeline.Add(rabbitMessage, cancellationToken);
                _logger.LogDebug("Dispatched {messageId} for handling", rabbitMessage.Id);
            }
            catch (Exception ex)
            {
                _logger.LogError(0, ex, "Error while adding the message to the pipeline");

                try
                {
                    // Hand the message back to the queue so it can be redelivered.
                    channel.BasicNack(ea.DeliveryTag, false, true);
                }
                catch (Exception nackEx)
                {
                    // The channel may already be closed (e.g. the subscription is being
                    // disposed); do not let that escape the client's event dispatch.
                    _logger.LogWarning(0, nackEx, "Could not nack the message with delivery tag {deliveryTag}", ea.DeliveryTag);
                }
            }
        };

        consumer.Shutdown += (_, ea) =>
        {
            _logger.LogWarning("Consumer has been shut down because of {cause} with {replyCode} for {replyText}",
                ea.Cause,
                ea.ReplyCode,
                ea.ReplyText);

            cts.Cancel();
            cts.Dispose();
        };

        channel.BasicConsume(queueName, noAck: false, consumer: consumer);
        return new ConsumerInfo(new DisposableChannel(channel), shutdownToken);
    }
    catch
    {
        // Setup failed (e.g. BasicQos/BasicConsume threw): the caller never receives the
        // disposable, so release the channel and the token source here. The channel goes
        // first because disposing it may still raise the Shutdown handler above.
        channel.Dispose();
        cts.Dispose();
        throw;
    }
}
/// <summary>
/// Watches all provided cancellation tokens and, once every one of them has been cancelled,
/// calls <c>cts.Cancel()</c> and disposes <paramref name="cts"/>.
/// </summary>
/// <typeparam name="TMessage">Not used by the watcher; kept so existing call sites compile.</typeparam>
/// <param name="cancellationTokens">Tokens that must all be cancelled before <paramref name="cts"/> is triggered.</param>
/// <param name="cts">Source to cancel and dispose once all tokens are cancelled.</param>
/// <param name="cancellationToken">
/// Passed to <see cref="Task.Run(Func{Task}, CancellationToken)"/>; it is not consulted while
/// the tokens are being awaited.
/// </param>
/// <returns>
/// A task that completes after <paramref name="cts"/> has been cancelled and disposed. It
/// faults if a callback registered on <paramref name="cts"/> throws.
/// </returns>
private static Task WatchShutdownAsync<TMessage>(IEnumerable<CancellationToken> cancellationTokens,
    CancellationTokenSource cts, CancellationToken cancellationToken)
{
    return Task.Run(async () =>
    {
        var cancellationWatchers = cancellationTokens.Select(async token =>
        {
            if (token.IsCancellationRequested)
            {
                return;
            }

            try
            {
                // Never completes on its own; only cancellation of the token ends it.
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
                return;
            }

            throw new InvalidOperationException("TODO: it should not have hit here");
        }).ToArray();

        await Task.WhenAll(cancellationWatchers);

        try
        {
            cts.Cancel();
        }
        finally
        {
            // Cancel() rethrows exceptions from registered callbacks; the source must be
            // disposed even then.
            cts.Dispose();
        }
    }, cancellationToken);
}
/// <summary>
/// Adapts a <see cref="CancellationTokenSource"/> to <see cref="IDisposable"/> so that disposing
/// first cancels the source and then disposes it.
/// </summary>
private class DisposableCancellation : IDisposable
{
    private readonly CancellationTokenSource _cts;

    // 0 = live, 1 = disposed; flipped atomically so only the first Dispose call does the work.
    private int _disposed;

    public DisposableCancellation(CancellationTokenSource cts)
    {
        _cts = cts;
    }

    /// <summary>
    /// Cancels and disposes the wrapped source. Calls after the first one are no-ops;
    /// without the guard a second call would throw from <c>Cancel()</c> on the disposed source.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            _cts.Cancel();
        }
        finally
        {
            // Cancel() rethrows callback exceptions; release the source regardless.
            _cts.Dispose();
        }
    }
}
/// <summary>
/// Adapts an <see cref="IModel"/> channel to <see cref="IDisposable"/> so that disposing closes
/// the channel and then disposes it.
/// </summary>
private class DisposableChannel : IDisposable
{
    private readonly IModel _channel;

    public DisposableChannel(IModel channel)
    {
        _channel = channel;
    }

    /// <summary>
    /// Closes the channel and disposes it.
    /// </summary>
    public void Dispose()
    {
        try
        {
            _channel.Close();
        }
        finally
        {
            // Dispose even if Close() throws (e.g. on a broken connection), so the channel
            // is not left undisposed.
            _channel.Dispose();
        }
    }
}
/// <summary>
/// Read-only pair of a disposable and a cancellation token, as returned by
/// <c>CreateSingleConsumer</c>.
/// </summary>
private class ConsumerInfo
{
    private readonly IDisposable _disposable;
    private readonly CancellationToken _cancellationToken;

    public ConsumerInfo(IDisposable disposable, CancellationToken cancellationToken)
    {
        _disposable = disposable;
        _cancellationToken = cancellationToken;
    }

    /// <summary>The disposable supplied at construction.</summary>
    public IDisposable Disposable => _disposable;

    /// <summary>The cancellation token supplied at construction.</summary>
    public CancellationToken CancellationToken => _cancellationToken;
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment