Skip to content

Instantly share code, notes, and snippets.

@PureKrome
Created January 22, 2019 00:46
Show Gist options
  • Select an option

  • Save PureKrome/b1dcb248a9c734151607b7d964608deb to your computer and use it in GitHub Desktop.

Select an option

Save PureKrome/b1dcb248a9c734151607b7d964608deb to your computer and use it in GitHub Desktop.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Hornet.Core.Storage.EventBus
{
public interface IEventBus
{
string Name { get; }
Task PublishAsync<T>(T @event) where T : class, IEvent;
void Subscribe<T>(string subscription, Func<T, CancellationToken, Task> onMessage) where T : class, IEvent;
}
}
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Hornet.Core.Storage.EventBus
{
public class RabbitMQEventBus : IDisposable, IEventBus
{
private readonly Lazy<ConnectionFactory> _connectionFactory;
private readonly Lazy<IConnection> _connection;
private readonly Lazy<IModel> _channel;
private const string ExchangeName = "hornet-test";
private readonly ILogger<RabbitMQEventBus> _logger;
private static Policy CheckRabbitMQPolicy(ILogger logger)
{
return Policy
.Handle<Exception>()
.WaitAndRetry(100, _ => TimeSpan.FromSeconds(2), (exception, timeSpan, __) => logger.LogWarning($"Failed to connect to RabbitMQ. It may not be ready. Retrying ... time to wait: {timeSpan}. Exception: {exception.GetType()} {exception.Message}") );
}
public string Name => "RabbitMQ Event Bus";
public RabbitMQEventBus(string connectionString,
ILogger<RabbitMQEventBus> logger)
{
_connectionFactory = new Lazy<ConnectionFactory>(() =>
{
return CheckRabbitMQPolicy(logger).Execute(() =>
{
var factory = new ConnectionFactory
{
HostName = connectionString
};
// Ref: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery
factory.AutomaticRecoveryEnabled = true;
return factory;
});
});
_connection = new Lazy<IConnection>(() => CheckRabbitMQPolicy(logger).Execute(() =>
_connectionFactory.Value.CreateConnection())
);
_channel = new Lazy<IModel>(() =>
{
return CheckRabbitMQPolicy(logger).Execute(() =>
{
var channel = _connection.Value.CreateModel();
channel.ExchangeDeclare(exchange: ExchangeName, type: "direct", true);
return channel;
});
});
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task PublishAsync<T>(T @event) where T : class, IEvent
{
if (@event == null)
{
throw new ArgumentNullException(nameof(@event));
}
try
{
var topic = typeof(T).Name;
var json = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(json);
_logger.LogDebug("Publishing event {event} to topic: '{topic}'", @event, topic);
var channel = _channel.Value;
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // Persistent.
channel.BasicPublish(ExchangeName, topic, true, properties, body);
}
catch(Exception exception)
{
_logger.LogError(exception, "Failed to publish an event: {event}.", @event);
return Task.FromException(exception);
}
return Task.CompletedTask;
}
public void Subscribe<T>(string subscription,
Func<T, CancellationToken, Task> onMessage) where T : class, IEvent
{
var topic = typeof(T).Name;
var channel = _channel.Value;
var queueName = channel.QueueDeclare(subscription, true, false, false);
channel.QueueBind(queueName, ExchangeName, topic);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
var item = JsonConvert.DeserializeObject<T>(message);
try
{
var task = Task.Run(() => onMessage(item, default));
Task.WaitAll(task);
}
catch (Exception exception)
{
_logger.LogError($"EXCEPTION: Failed to handle event [{topic}] for queue [{queueName.QueueName}]. Error: {exception.Message}");
throw;
}
};
_logger.LogDebug("Subscribing to Queue [{subscription}] for any published Topic [{topic}].", subscription, topic);
channel.BasicConsume(queueName, true, consumer);
}
public void Dispose()
{
if (_connection?.IsValueCreated == true)
{
_connection.Value.Close();
}
if (_channel?.IsValueCreated == true)
{
_channel.Value.Close();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment