Created
January 22, 2019 00:46
-
-
Save PureKrome/b1dcb248a9c734151607b7d964608deb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| namespace Hornet.Core.Storage.EventBus | |
| { | |
/// <summary>
/// Abstraction over a message bus that can publish events and register
/// asynchronous handlers for them.
/// </summary>
public interface IEventBus
{
    /// <summary>
    /// Gets a human-readable name identifying the bus implementation.
    /// </summary>
    string Name { get; }

    /// <summary>
    /// Publishes the given event to the bus.
    /// </summary>
    /// <typeparam name="T">Concrete event type; must be a reference type implementing <c>IEvent</c>.</typeparam>
    /// <param name="event">The event instance to publish.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task PublishAsync<T>(T @event)
        where T : class, IEvent;

    /// <summary>
    /// Registers a handler that is invoked for events of type <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Concrete event type; must be a reference type implementing <c>IEvent</c>.</typeparam>
    /// <param name="subscription">Name identifying this subscription.</param>
    /// <param name="onMessage">
    /// Asynchronous callback receiving the event and a <see cref="CancellationToken"/>.
    /// </param>
    void Subscribe<T>(string subscription, Func<T, CancellationToken, Task> onMessage)
        where T : class, IEvent;
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.Logging; | |
| using Newtonsoft.Json; | |
| using Polly; | |
| using RabbitMQ.Client; | |
| using RabbitMQ.Client.Events; | |
| using System; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| namespace Hornet.Core.Storage.EventBus | |
| { | |
/// <summary>
/// <see cref="IEventBus"/> implementation backed by a durable RabbitMQ direct exchange.
/// The connection and channel are created lazily on first use, with retries while the
/// broker is unavailable. Events are serialized as UTF-8 JSON and routed by the CLR type
/// name of the event.
/// </summary>
public class RabbitMQEventBus : IDisposable, IEventBus
{
    private const string ExchangeName = "hornet-test";

    private readonly Lazy<ConnectionFactory> _connectionFactory;
    private readonly Lazy<IConnection> _connection;
    private readonly Lazy<IModel> _channel;
    private readonly ILogger<RabbitMQEventBus> _logger;

    // A channel (IModel) must not be used for publishing from several threads at once,
    // so every publish on the shared channel is serialized through this lock.
    private readonly object _publishLock = new object();

    private bool _disposed;

    /// <summary>
    /// Builds the retry policy used while connecting to RabbitMQ: up to 100 retries,
    /// two seconds apart, logging a warning before each retry.
    /// </summary>
    /// <param name="logger">Logger that receives one warning per retry.</param>
    /// <returns>A synchronous wait-and-retry policy that handles any exception.</returns>
    private static Policy CheckRabbitMQPolicy(ILogger logger)
    {
        return Policy
            .Handle<Exception>()
            .WaitAndRetry(
                100,
                _ => TimeSpan.FromSeconds(2),
                (exception, timeSpan, __) => logger.LogWarning(
                    exception,
                    "Failed to connect to RabbitMQ. It may not be ready. Retrying ... time to wait: {timeSpan}. Exception: {exceptionType} {exceptionMessage}",
                    timeSpan,
                    exception.GetType(),
                    exception.Message));
    }

    /// <inheritdoc />
    public string Name => "RabbitMQ Event Bus";

    /// <summary>
    /// Creates the event bus. No network activity happens here; the connection and
    /// channel are opened on the first publish or subscribe.
    /// </summary>
    /// <param name="connectionString">Host name of the RabbitMQ broker.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="connectionString"/> is null or blank.</exception>
    public RabbitMQEventBus(string connectionString,
                            ILogger<RabbitMQEventBus> logger)
    {
        // Validate up front so the lazy factories below never capture invalid arguments.
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("A RabbitMQ host name is required.", nameof(connectionString));
        }

        // Constructing the factory performs no I/O, so it does not need the retry policy.
        _connectionFactory = new Lazy<ConnectionFactory>(() => new ConnectionFactory
        {
            HostName = connectionString,

            // Ref: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery
            AutomaticRecoveryEnabled = true
        });

        _connection = new Lazy<IConnection>(() =>
            CheckRabbitMQPolicy(_logger).Execute(() => _connectionFactory.Value.CreateConnection()));

        _channel = new Lazy<IModel>(() =>
            CheckRabbitMQPolicy(_logger).Execute(() =>
            {
                var channel = _connection.Value.CreateModel();
                try
                {
                    channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", durable: true);
                    return channel;
                }
                catch
                {
                    // The policy retries on failure; release this channel so each
                    // failed attempt does not leak one.
                    channel.Dispose();
                    throw;
                }
            }));
    }

    /// <summary>
    /// Serializes <paramref name="event"/> to JSON and publishes it as a persistent
    /// message to the exchange, using the type name of <typeparamref name="T"/> as the
    /// routing key.
    /// </summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <returns>
    /// A completed task on success, or a faulted task carrying the exception when
    /// serialization or publishing fails.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public Task PublishAsync<T>(T @event) where T : class, IEvent
    {
        if (@event == null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        try
        {
            var topic = typeof(T).Name;
            var json = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(json);

            _logger.LogDebug("Publishing event {event} to topic: '{topic}'", @event, topic);

            var channel = _channel.Value;

            lock (_publishLock)
            {
                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2; // Persistent.

                // Third argument is the 'mandatory' flag.
                channel.BasicPublish(ExchangeName, topic, true, properties, body);
            }
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Failed to publish an event: {event}.", @event);
            return Task.FromException(exception);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Declares a durable queue named <paramref name="subscription"/>, binds it to the
    /// exchange using the type name of <typeparamref name="T"/> as the routing key, and
    /// starts consuming, invoking <paramref name="onMessage"/> for every delivery.
    /// </summary>
    /// <typeparam name="T">Event type the handler receives.</typeparam>
    /// <param name="subscription">Queue name for this subscription.</param>
    /// <param name="onMessage">Handler invoked with the deserialized event.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscription"/> or <paramref name="onMessage"/> is null.
    /// </exception>
    public void Subscribe<T>(string subscription,
                             Func<T, CancellationToken, Task> onMessage) where T : class, IEvent
    {
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        if (onMessage == null)
        {
            throw new ArgumentNullException(nameof(onMessage));
        }

        var topic = typeof(T).Name;
        var channel = _channel.Value;

        // Arguments: queue name, durable = true, exclusive = false, autoDelete = false.
        var queueName = channel.QueueDeclare(subscription, true, false, false).QueueName;
        channel.QueueBind(queueName, ExchangeName, topic);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            try
            {
                // Deserialization is inside the try so malformed payloads are logged too.
                var message = Encoding.UTF8.GetString(ea.Body);
                var item = JsonConvert.DeserializeObject<T>(message);

                // The consumer callback is synchronous, so the handler is awaited by
                // blocking. GetAwaiter().GetResult() rethrows the handler's own exception
                // instead of wrapping it in an AggregateException.
                Task.Run(() => onMessage(item, default)).GetAwaiter().GetResult();
            }
            catch (Exception exception)
            {
                _logger.LogError(
                    exception,
                    "EXCEPTION: Failed to handle event [{topic}] for queue [{queueName}]. Error: {error}",
                    topic,
                    queueName,
                    exception.Message);
                throw;
            }
        };

        _logger.LogDebug("Subscribing to Queue [{subscription}] for any published Topic [{topic}].", subscription, topic);

        // Second argument is autoAck = true: deliveries are acknowledged automatically,
        // so a message whose handler throws is not redelivered.
        channel.BasicConsume(queueName, true, consumer);
    }

    /// <summary>
    /// Closes and disposes the channel and then the connection, if they were ever
    /// created. Safe to call more than once; failures are logged instead of thrown.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Channel first: closing the connection also closes its channels, after which
        // closing the channel explicitly could fail.
        if (_channel.IsValueCreated)
        {
            var channel = _channel.Value;
            RunQuietly(channel.Close, "close the RabbitMQ channel");
            RunQuietly(channel.Dispose, "dispose the RabbitMQ channel");
        }

        if (_connection.IsValueCreated)
        {
            var connection = _connection.Value;
            RunQuietly(connection.Close, "close the RabbitMQ connection");
            RunQuietly(connection.Dispose, "dispose the RabbitMQ connection");
        }
    }

    /// <summary>
    /// Runs a clean-up action, logging (rather than propagating) any exception so that
    /// <see cref="Dispose"/> never throws.
    /// </summary>
    private void RunQuietly(Action cleanup, string description)
    {
        try
        {
            cleanup();
        }
        catch (Exception exception)
        {
            _logger.LogWarning(exception, "Failed to {description}.", description);
        }
    }
}
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment