Last active
October 6, 2015 20:21
-
-
Save fjeldstad/8c9d002a0e1eb545261b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class Saga<TState> where TState : class | |
{ | |
private const int MaxMessageHandlingRetryAttemptsPerMessage = 3; | |
private const int MillisecondsBetweenMessageHandlingAttempts = 500; | |
private readonly List<Func<IBus, string, Task>> _registrations = new List<Func<IBus, string, Task>>(); | |
protected void When<TMessage>( | |
Func<TMessage, Task<IMemory>> accessMemory, | |
Func<TMessage, TState, Task<ITransformResult>> transform) where TMessage : class | |
{ | |
if (accessMemory == null) | |
{ | |
throw new ArgumentNullException(nameof(accessMemory)); | |
} | |
if (transform == null) | |
{ | |
throw new ArgumentNullException(nameof(transform)); | |
} | |
_registrations.Add((bus, subscriberId) => bus.RegisterSubscriber(subscriberId, async (TMessage msg) => | |
{ | |
var attempts = 0; | |
var retry = false; | |
do | |
{ | |
attempts++; | |
try | |
{ | |
var memory = await accessMemory(msg).ConfigureAwait(false); | |
if (memory == null) | |
{ | |
throw new InvalidOperationException( | |
$"Unable to access the memory of {GetType().Name} for {typeof(TMessage).Name} message."); | |
} | |
var result = await transform(msg, memory.State).ConfigureAwait(false); | |
await memory.Overwrite(result.NextState).ConfigureAwait(false); | |
foreach (var outputMessage in (result.Messages ?? new object[0]).Where(x => x != null)) | |
{ | |
await bus.Publish(outputMessage).ConfigureAwait(false); | |
} | |
break; | |
} | |
catch (SagaMessageHandlingException ex) | |
{ | |
retry = attempts < MaxMessageHandlingRetryAttemptsPerMessage && ex.Retry; | |
if (retry && MillisecondsBetweenMessageHandlingAttempts > 0) | |
{ | |
await Task.Delay(MillisecondsBetweenMessageHandlingAttempts).ConfigureAwait(false); | |
} | |
} | |
} while (retry); | |
})); | |
} | |
public async Task Connect(IBus bus, string subscriberId) | |
{ | |
if (bus == null) | |
{ | |
throw new ArgumentNullException(nameof(bus)); | |
} | |
if (string.IsNullOrWhiteSpace(subscriberId)) | |
{ | |
throw new ArgumentNullException(nameof(subscriberId)); | |
} | |
foreach (var registration in _registrations) | |
{ | |
await registration(bus, subscriberId.Trim()).ConfigureAwait(false); | |
} | |
} | |
public interface IMemory | |
{ | |
TState State { get; } | |
Func<TState, Task> Overwrite { get; } | |
} | |
public class Memory : IMemory | |
{ | |
public TState State { get; } | |
public Func<TState, Task> Overwrite { get; } | |
public Memory(TState state, Func<TState, Task> overwrite) | |
{ | |
State = state; | |
Overwrite = overwrite; | |
} | |
} | |
public interface ITransformResult | |
{ | |
TState NextState { get; } | |
object[] Messages { get; } | |
} | |
public class TransformResult : ITransformResult | |
{ | |
public TState NextState { get; } | |
public object[] Messages { get; } | |
public TransformResult(TState nextState, params object[] messages) | |
{ | |
NextState = nextState; | |
Messages = messages ?? new object[0]; | |
} | |
} | |
} | |
public class SagaMessageHandlingException : Exception | |
{ | |
public bool Retry { get; } | |
public SagaMessageHandlingException(string message, bool retry, Exception innerException) | |
: base(message, innerException) | |
{ | |
Retry = retry; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment