Created
August 2, 2023 10:43
-
-
Save carstengehling/3c8ad55df5959a5d65a59ac6a263ed83 to your computer and use it in GitHub Desktop.
MassTransit SagaStateMachine boilerplate from documentation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Message that submits a new order. It is the message type behind the
/// <c>OrderSubmitted</c> event of <see cref="OrderStateMachine"/>.
/// </summary>
/// <param name="CorrelationId">Correlation id carried by the message; the state machine copies it onto the saga instance.</param>
public record CreateOrder(Guid CorrelationId)
    : CorrelatedBy<Guid>;
/// <summary>
/// Request message asking for an order to be processed. It is issued by
/// <see cref="OrderStateMachine"/> and consumed by <see cref="ProcessOrderConsumer"/>.
/// </summary>
/// <param name="OrderId">Identifier of the order to process.</param>
/// <param name="ProcessingId">Identifier of this processing attempt.</param>
public record ProcessOrder(
    Guid OrderId,
    Guid ProcessingId);
/// <summary>
/// Response message sent by <see cref="ProcessOrderConsumer"/> in reply to a
/// <see cref="ProcessOrder"/> request.
/// </summary>
/// <param name="OrderId">Identifier of the processed order, echoed from the request.</param>
/// <param name="ProcessingId">Identifier of the processing attempt, echoed from the request.</param>
public record OrderProcessed(
    Guid OrderId,
    Guid ProcessingId);
/// <summary>
/// Message sent by <see cref="OrderStateMachine"/> to the original requester
/// when the order ends up cancelled.
/// </summary>
/// <param name="OrderId">Identifier of the cancelled order.</param>
/// <param name="Reason">Short text describing why the order was cancelled.</param>
public record OrderCancelled(
    Guid OrderId,
    string Reason);
/// <summary>
/// Consumer for <see cref="ProcessOrder"/> requests. It replies to every
/// request with an <see cref="OrderProcessed"/> message carrying the same ids.
/// </summary>
public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
    /// <summary>
    /// Answers the incoming request with an <see cref="OrderProcessed"/> response.
    /// </summary>
    /// <param name="context">Consume context holding the <see cref="ProcessOrder"/> message.</param>
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        var request = context.Message;
        var reply = new OrderProcessed(request.OrderId, request.ProcessingId);

        await context.RespondAsync(reply);
    }
}
/// <summary>
/// Saga instance data tracked by <see cref="OrderStateMachine"/> for one order.
/// </summary>
public class OrderState :
    SagaStateMachineInstance
{
    /// <summary>Correlation id of the saga; set from <see cref="CreateOrder.CorrelationId"/> when the order is submitted.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the current state; the state machine maps its instance state onto this property.</summary>
    public string CurrentState { get; set; }

    /// <summary>Id of the pending <see cref="ProcessOrder"/> request; the state machine uses it as the request id property.</summary>
    public Guid? ProcessingId { get; set; }

    /// <summary>Request id of the message that submitted the order, kept so the final reply can carry it.</summary>
    public Guid? RequestId { get; set; }

    /// <summary>Response address of the message that submitted the order; the final reply is sent here.</summary>
    public Uri ResponseAddress { get; set; }

    /// <summary>Order identifier generated by the state machine when the order is submitted.</summary>
    public Guid OrderId { get; set; }
}
/// <summary>
/// State machine for the order saga.
/// <para>
/// A <see cref="CreateOrder"/> message creates the saga, which issues a
/// <see cref="ProcessOrder"/> request and waits in the request's pending state.
/// When the request completes the saga moves to <see cref="Created"/> and sends
/// the saga data back to the original requester; when it faults or times out
/// the saga moves to <see cref="Cancelled"/> and sends an
/// <see cref="OrderCancelled"/> message instead.
/// </para>
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    /// <summary>State entered after the <see cref="ProcessOrder"/> request completed.</summary>
    public State Created { get; set; }

    /// <summary>State entered after the <see cref="ProcessOrder"/> request faulted or timed out.</summary>
    public State Cancelled { get; set; }

    /// <summary>Event raised by an incoming <see cref="CreateOrder"/> message.</summary>
    public Event<CreateOrder> OrderSubmitted { get; set; }

    /// <summary>Request/response exchange used to have the order processed.</summary>
    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }

    /// <summary>
    /// Declares the instance state mapping, the event, the request and the
    /// behaviors of the state machine.
    /// </summary>
    public OrderStateMachine()
    {
        InstanceState(m => m.CurrentState);

        Event(() => OrderSubmitted);

        // The request id of the ProcessOrder request is tracked in OrderState.ProcessingId.
        // NOTE(review): MassTransit documents a zero timeout as "no timeout is scheduled";
        // if so, the TimeoutExpired handler below never runs - confirm this is intended.
        Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });

        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.ProcessingId = Guid.NewGuid();
                    context.Saga.OrderId = Guid.NewGuid();

                    // Remember who asked, so the outcome can be sent back once processing ends.
                    context.Saga.RequestId = context.RequestId;
                    context.Saga.ResponseAddress = context.ResponseAddress;
                })
                .Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
                .TransitionTo(ProcessOrder.Pending));

        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .TransitionTo(Created)
                .ThenAsync(context => RespondToRequesterAsync(context, context.Saga)),
            When(ProcessOrder.Faulted)
                .TransitionTo(Cancelled)
                .ThenAsync(context => RespondToRequesterAsync(context, new OrderCancelled(context.Saga.OrderId, "Faulted"))),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(Cancelled)
                .ThenAsync(context => RespondToRequesterAsync(context, new OrderCancelled(context.Saga.OrderId, "Time-out"))));
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the response address stored on the saga,
    /// stamped with the stored request id.
    /// </summary>
    /// <typeparam name="TMessage">Type of the message to send.</typeparam>
    /// <param name="context">Behavior context of the event being handled.</param>
    /// <param name="message">Message to deliver to the original requester.</param>
    private static async Task RespondToRequesterAsync<TMessage>(BehaviorContext<OrderState> context, TMessage message)
        where TMessage : class
    {
        var saga = context.Saga;

        // No response address was captured (e.g. CreateOrder was published instead of
        // sent as a request), so there is nobody to answer. Skip the send instead of
        // letting GetSendEndpoint(null) throw while the request outcome is handled.
        if (saga.ResponseAddress is null)
        {
            return;
        }

        var endpoint = await context.GetSendEndpoint(saga.ResponseAddress);
        await endpoint.Send(message, send => send.RequestId = saga.RequestId);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment