Last active
August 29, 2015 14:06
-
-
Save ekepes/6940e5f70cbac25203ae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Automatonymous; | |
using Dapper; | |
using MassTransit.Exceptions; | |
using MassTransit.Logging; | |
using MassTransit.Pipeline; | |
using MassTransit.Saga; | |
using MassTransit.Util; | |
using Newtonsoft.Json; | |
using System; | |
using System.Collections.Generic; | |
using System.Data.SqlClient; | |
using System.Linq; | |
using System.Transactions; | |
namespace MassTransit.DapperIntegration | |
{ | |
/// <summary> | |
/// A saga repository implementation using Dapper and JSON (de)serialization. | |
/// All sagas are stored in a single table, e.g.: | |
/// <code> | |
/// CREATE TABLE Sagas | |
/// ( | |
/// correlation_id UNIQUEIDENTIFIER NOT NULL PRIMARY KEY NONCLUSTERED, | |
/// saga_instance_type_name NVARCHAR(200) NOT NULL, | |
/// saga_instance_json NVARCHAR(MAX) NOT NULL, | |
/// ) | |
/// </code> | |
/// The state machine instance is serialized and stored in the saga_instance_json | |
/// column when saving the saga, and deserialized when loading the saga. | |
/// </summary> | |
/// <typeparam name="TSaga"></typeparam> | |
public class DapperSagaRepository<TSaga> : ISagaRepository<TSaga> | |
where TSaga : class, ISaga | |
{ | |
private readonly ILog _log = Logger.Get<DapperSagaRepository<TSaga>>(); | |
private readonly string _connectionString; | |
private readonly JsonSerializerSettings _jsonSettings; | |
private readonly string _sqlTableName; | |
/// <summary>
/// Creates a repository that persists <typeparamref name="TSaga"/> instances as JSON rows.
/// </summary>
/// <param name="connectionString">Connection string used for every <see cref="SqlConnection"/> this repository opens.</param>
/// <param name="sqlTableName">Name of the saga table; it is formatted as-is into each SQL statement.</param>
/// <param name="stateMachine">State machine handed to the <see cref="SagaStateJsonConverter{TSaga}"/> that (de)serializes state values.</param>
public DapperSagaRepository(string connectionString, string sqlTableName, StateMachine<TSaga> stateMachine)
{
    // JSON settings: one converter for state values, one for service bus references.
    _jsonSettings = new JsonSerializerSettings
    {
        Converters =
        {
            new SagaStateJsonConverter<TSaga>(stateMachine),
            new SagaServiceBusConverter()
        }
    };

    _sqlTableName = sqlTableName;
    _connectionString = connectionString;
}
/// <summary>
/// Loads the saga with the given id (if any) and yields at most one action that, when invoked
/// by the caller, creates or reuses the saga instance, runs the selected handlers against it
/// and persists or deletes it.
/// </summary>
/// <remarks>
/// This method is an iterator: nothing runs until the result is enumerated. The
/// <see cref="TransactionScope"/> is created when enumeration starts and remains open while the
/// caller holds a yielded action. It is only marked complete once enumeration runs to the end;
/// if the enumerator is disposed earlier, the scope is disposed without being completed.
/// </remarks>
/// <typeparam name="TMessage">Type of the message being consumed.</typeparam>
/// <param name="context">Consume context passed to the policy's can-create / can-use checks.</param>
/// <param name="sagaId">Correlation id of the saga to load or create.</param>
/// <param name="selector">Produces the handler callbacks for a saga instance and a consume context.</param>
/// <param name="policy">Decides whether an instance may be created, reused or removed.</param>
/// <returns>Zero or one action; no action is yielded when the policy rejects the message.</returns>
/// <exception cref="SagaException">
/// Thrown by a yielded action when instance creation, a handler callback or persistence fails;
/// the original exception is passed along as the inner exception.
/// </exception>
public IEnumerable<Action<IConsumeContext<TMessage>>> GetSaga<TMessage>(
    IConsumeContext<TMessage> context,
    Guid sagaId,
    InstanceHandlerSelector<TSaga, TMessage> selector,
    ISagaPolicy<TSaga, TMessage> policy)
    where TMessage : class
{
    using (var txScope = new TransactionScope(TransactionScopeOption.Required,
        new TransactionOptions { IsolationLevel = IsolationLevel.Serializable }))
    {
        var instance = GetSaga(sagaId);
        if (instance == null)
        {
            if (policy.CanCreateInstance(context))
            {
                yield return x =>
                {
                    // Guarded like every other debug statement in this method, so the
                    // friendly type names are not computed when debug logging is off.
                    if (_log.IsDebugEnabled)
                        _log.DebugFormat("SAGA: {0} Creating New {1} for {2}", typeof(TSaga).ToFriendlyName(),
                            sagaId, typeof(TMessage).ToFriendlyName());
                    try
                    {
                        instance = policy.CreateInstance(x, sagaId);
                        foreach (var callback in selector(instance, x))
                            callback(x);

                        // A new saga the policy already considers removable is never stored.
                        if (!policy.CanRemoveInstance(instance))
                            SaveSaga(instance);
                    }
                    catch (Exception ex)
                    {
                        var sex = new SagaException("Create Saga Instance Exception", typeof(TSaga),
                            typeof(TMessage), sagaId, ex);
                        if (_log.IsErrorEnabled)
                            _log.Error("Saga exception!", sex);
                        throw sex;
                    }
                };
            }
            else
            {
                if (_log.IsDebugEnabled)
                    _log.DebugFormat("SAGA: {0} Ignoring Missing {1} for {2}", typeof(TSaga).ToFriendlyName(),
                        sagaId, typeof(TMessage).ToFriendlyName());
            }
        }
        else
        {
            if (policy.CanUseExistingInstance(context))
            {
                yield return x =>
                {
                    if (_log.IsDebugEnabled)
                        _log.DebugFormat("SAGA: {0} Using Existing {1} for {2}",
                            typeof(TSaga).ToFriendlyName(), sagaId,
                            typeof(TMessage).ToFriendlyName());
                    try
                    {
                        foreach (var callback in selector(instance, x))
                            callback(x);

                        // After the handlers ran, either drop the saga or write its new state back.
                        // TODO (from the original author): confirm this is the right place to re-persist.
                        if (policy.CanRemoveInstance(instance))
                            DeleteSaga(instance);
                        else
                            SaveSaga(instance);
                    }
                    catch (Exception ex)
                    {
                        var sex = new SagaException("Existing Saga Instance Exception", typeof(TSaga),
                            typeof(TMessage), sagaId, ex);
                        // Same message + exception form as the create branch, so the exception
                        // is passed to the logger as an exception rather than as the message object.
                        if (_log.IsErrorEnabled)
                            _log.Error("Saga exception!", sex);
                        throw sex;
                    }
                };
            }
            else
            {
                if (_log.IsDebugEnabled)
                    _log.DebugFormat("SAGA: {0} Ignoring Existing {1} for {2}", typeof(TSaga).ToFriendlyName(),
                        sagaId,
                        typeof(TMessage).ToFriendlyName());
            }
        }

        // Reached only when the caller enumerated to the end.
        txScope.Complete();
    }
}
/// <summary>
/// Reads the row with the given correlation id and deserializes its JSON payload.
/// </summary>
/// <param name="sagaCorrelationId">Correlation id to look up.</param>
/// <returns>The deserialized saga, or <c>null</c> when no row has that correlation id.</returns>
private TSaga GetSaga(Guid sagaCorrelationId)
{
    var query = string.Format("SELECT correlation_id, saga_instance_json FROM {0} WHERE correlation_id = @correlation_id", _sqlTableName);

    using (var db = new SqlConnection(_connectionString))
    {
        db.Open();

        var rows = db.Query<dynamic>(query, new { correlation_id = sagaCorrelationId });

        // SingleOrDefault: no row yields null, more than one row makes it throw.
        var row = rows.SingleOrDefault();
        if (row != null)
        {
            string json = row.saga_instance_json;
            return JsonConvert.DeserializeObject<TSaga>(json, _jsonSettings);
        }

        return null;
    }
}
/// <summary>
/// Inserts the saga as a new row, or overwrites the stored JSON when a row with the same
/// correlation id already exists.
/// </summary>
/// <param name="saga">Saga instance to serialize and persist.</param>
private void SaveSaga(TSaga saga)
{
    var serialized = JsonConvert.SerializeObject(saga, _jsonSettings);

    // Note on the aliases below: "src" names the saga table being merged into and "tgt"
    // names the incoming VALUES row. On a match only saga_instance_json is updated.
    var upsert = string.Format(@"
MERGE INTO {0} src
USING (
VALUES (@correlation_id, @saga_instance_type_name, @saga_instance_json)
) tgt (correlation_id, saga_instance_type_name, saga_instance_json)
ON src.correlation_id = tgt.correlation_id
WHEN MATCHED THEN
UPDATE SET saga_instance_json = tgt.saga_instance_json
WHEN NOT MATCHED THEN
INSERT (correlation_id, saga_instance_type_name, saga_instance_json)
VALUES (tgt.correlation_id, tgt.saga_instance_type_name, tgt.saga_instance_json);", _sqlTableName);

    using (var db = new SqlConnection(_connectionString))
    {
        db.Open();

        var row = new
        {
            correlation_id = saga.CorrelationId,
            saga_instance_type_name = saga.GetType().FullName,
            saga_instance_json = serialized
        };
        db.Execute(upsert, row);
    }
}
/// <summary>
/// Deletes the row whose correlation id matches the given saga.
/// </summary>
/// <param name="saga">Saga whose row should be removed.</param>
private void DeleteSaga(TSaga saga)
{
    var statement = string.Format("DELETE FROM {0} WHERE correlation_id = @CorrelationId", _sqlTableName);

    using (var db = new SqlConnection(_connectionString))
    {
        db.Open();

        // The property name has to match the @CorrelationId placeholder in the statement.
        var parameters = new { CorrelationId = saga.CorrelationId };
        db.Execute(statement, parameters);
    }
}
/// <summary>
/// Returns the correlation ids of the sagas that match the filter.
/// </summary>
/// <param name="filter">Filter describing which sagas to match.</param>
/// <returns>The correlation id of every matching saga, obtained through the projecting Where overload.</returns>
public IEnumerable<Guid> Find(ISagaFilter<TSaga> filter)
{
    Func<TSaga, Guid> toCorrelationId = saga => saga.CorrelationId;
    return Where(filter, toCorrelationId);
}
/// <summary>
/// Finds the sagas that match the filter.
/// </summary>
/// <remarks>
/// The filter expression is not translated to SQL. Every saga in the table is loaded and
/// deserialized through <c>Select</c>, and the compiled filter expression is then applied in
/// memory. That is correct for any filter but costs a full table read per call.
/// </remarks>
/// <param name="filter">Filter whose expression selects the sagas to return.</param>
/// <returns>A materialized list of the matching sagas.</returns>
/// <exception cref="ArgumentNullException"><paramref name="filter"/> is <c>null</c>.</exception>
public IEnumerable<TSaga> Where(ISagaFilter<TSaga> filter)
{
    if (filter == null)
        throw new ArgumentNullException("filter");

    // Compile once, then reuse the delegate for every loaded saga.
    Func<TSaga, bool> predicate = filter.FilterExpression.Compile();

    return Select(saga => saga)
        .Where(predicate)
        .ToList();
}
/// <summary>
/// Finds the sagas that match the filter and projects each one through
/// <paramref name="transformer"/>.
/// </summary>
/// <typeparam name="TResult">Type produced by the projection.</typeparam>
/// <param name="filter">Filter describing which sagas to match.</param>
/// <param name="transformer">Projection applied to each matching saga.</param>
/// <returns>The projected results; the projection itself is applied lazily as the result is enumerated.</returns>
public IEnumerable<TResult> Where<TResult>(ISagaFilter<TSaga> filter, Func<TSaga, TResult> transformer)
{
    IEnumerable<TSaga> matches = Where(filter);
    return matches.Select(transformer);
}
/// <summary>
/// Loads every saga in the table, deserializes it and projects it through
/// <paramref name="transformer"/>.
/// </summary>
/// <typeparam name="TResult">Type produced by the projection.</typeparam>
/// <param name="transformer">Projection applied to each deserialized saga.</param>
/// <returns>A materialized list with one projected value per stored saga.</returns>
public IEnumerable<TResult> Select<TResult>(Func<TSaga, TResult> transformer)
{
    var query = string.Format("SELECT correlation_id, saga_instance_json FROM {0}", _sqlTableName);

    List<TResult> projected;

    // NOTE (from the original author): the transaction handling was taken from the
    // NHibernateSagaRepository; whether this read needs it is an open question,
    // since the projected values are not persisted.
    var options = new TransactionOptions { IsolationLevel = IsolationLevel.Serializable };
    using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
    {
        using (var db = new SqlConnection(_connectionString))
        {
            db.Open();

            var rows = db.Query<dynamic>(query);

            // ToList forces the whole pipeline to run while the connection is still open.
            projected = rows
                .Select(row => JsonConvert.DeserializeObject<TSaga>((string)row.saga_instance_json, _jsonSettings))
                .Select(transformer)
                .ToList();

            scope.Complete();
        }
    }

    return projected;
}
} | |
/// <summary>
/// JSON converter for <see cref="State"/> values. A state is written as its name and read
/// back by asking the state machine for the state with that name.
/// </summary>
/// <typeparam name="TSaga">Saga type the state machine is defined for.</typeparam>
public class SagaStateJsonConverter<TSaga> : JsonConverter
    where TSaga : class, ISaga
{
    private readonly StateMachine<TSaga> _stateMachine;

    /// <summary>
    /// Creates a converter that resolves state names through <paramref name="stateMachine"/>.
    /// </summary>
    /// <param name="stateMachine">State machine used by <see cref="ReadJson"/> to look up states by name.</param>
    public SagaStateJsonConverter(StateMachine<TSaga> stateMachine)
    {
        _stateMachine = stateMachine;
    }

    /// <summary>
    /// Writes the name of the state, or JSON null when there is no state.
    /// </summary>
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        // Defensive: a missing state is written as null instead of dereferencing it.
        if (value == null)
        {
            writer.WriteNull();
            return;
        }

        serializer.Serialize(writer, ((State)value).Name);
    }

    /// <summary>
    /// Reads a state name and returns the matching state, or <c>null</c> when the stored
    /// value is JSON null.
    /// </summary>
    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        var stateName = (string)serializer.Deserialize(reader, typeof(string));

        // No state was stored: do not ask the state machine to look up a null name.
        if (stateName == null)
            return null;

        return _stateMachine.GetState(stateName);
    }

    /// <summary>
    /// Applies to <see cref="State"/> and every type assignable to it.
    /// </summary>
    public override bool CanConvert(Type objectType)
    {
        return typeof(State).IsAssignableFrom(objectType);
    }
}
/// <summary>
/// JSON converter that keeps <see cref="IServiceBus"/> values out of the stored saga JSON:
/// they are always written as null and always read back as <c>null</c>.
/// </summary>
/// <remarks>
/// Open questions from the original author: no usage of the service bus property was found,
/// so it is ignored here; it is unclear whether that is a problem, and whether there is a
/// better way to ignore it than decorating the saga class with a JsonIgnore attribute.
/// </remarks>
public class SagaServiceBusConverter : JsonConverter
{
    /// <summary>
    /// Writes JSON null regardless of the value.
    /// </summary>
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        writer.WriteNull();
    }

    /// <summary>
    /// Discards whatever is stored for the value and returns <c>null</c>.
    /// </summary>
    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        // Consume the whole value: if the stored value is an object or array, leaving its
        // children unread would put the reader out of step for the properties that follow.
        reader.Skip();
        return null;
    }

    /// <summary>
    /// Applies to <see cref="IServiceBus"/> and every type assignable to it.
    /// </summary>
    public override bool CanConvert(Type objectType)
    {
        return typeof(IServiceBus).IsAssignableFrom(objectType);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment