Last active
March 24, 2021 18:28
-
-
Save jrutley/8d704eadf30574f9a3828bc8aec8452f to your computer and use it in GitHub Desktop.
Repro steps for outbox issue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Project file for the repro: builds an executable (OutputType Exe) targeting net5.0 and references the MassTransit 7.1.7 package. -->
<Project Sdk="Microsoft.NET.Sdk"> | |
<PropertyGroup> | |
<OutputType>Exe</OutputType> | |
<TargetFramework>net5.0</TargetFramework> | |
</PropertyGroup> | |
<ItemGroup> | |
<PackageReference Include="MassTransit" Version="7.1.7" /> | |
</ItemGroup> | |
</Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using MassTransit; | |
using MassTransit.Saga; | |
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace MassTransitReplyExceptionTest | |
{ | |
/// <summary>
/// Interactive console driver for the repro: hosts the saga on two in-memory
/// receive endpoints ("saga" and "saga-outbox") and sends SubmitOrder requests
/// to whichever endpoint the user selects.
/// </summary>
class Program
{
    /// <summary>
    /// Configures and starts the in-memory bus, then loops reading a choice from the
    /// console and issuing a request until the input is empty, ends, or is not 1-4.
    /// The bus is always stopped before the method exits.
    /// </summary>
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingInMemory(conf =>
        {
            conf.AutoStart = true;

            // The same state machine and repository instances are registered on both
            // endpoints; the only configured difference is UseInMemoryOutbox().
            var machine = new SubmitOrderSaga();
            var repository = new InMemorySagaRepository<MyState>();

            conf.ReceiveEndpoint("saga", e =>
            {
                e.StateMachineSaga(machine, repository);
            });

            conf.ReceiveEndpoint("saga-outbox", e =>
            {
                e.UseInMemoryOutbox();
                e.StateMachineSaga(machine, repository);
            });
        });

        // The token source owns a timer for its 10 second timeout, so dispose it
        // when Main exits instead of leaking it.
        using var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await busControl.StartAsync(source.Token);

        try
        {
            while (true)
            {
                // Console.ReadLine blocks, so the prompt runs on a thread-pool thread.
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("-------------------------------------------------\n\n\nPress 1 | 2 for Saga success/fail. Press 3 | 4 for Saga+outbox success/fail. Anything else to exit");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                // ReadLine returns null at end of input (closed or redirected stdin);
                // treat that like an empty line instead of dereferencing null.
                if (string.IsNullOrEmpty(value))
                {
                    return;
                }

                (string serviceAddress, bool? returnSuccessPlease) = value[0] switch
                {
                    '1' => ("saga", true),
                    '2' => ("saga", false),
                    '3' => ("saga-outbox", true),
                    '4' => ("saga-outbox", false),
                    _ => ((string)null, (bool?)null)
                };

                if (serviceAddress is null)
                {
                    return;
                }

                var requestClient = busControl.CreateRequestClient<SubmitOrder>(new Uri($"queue:{serviceAddress}"), RequestTimeout.After(s: 5));

                try
                {
                    var orderId = NewId.NextGuid();
                    Response response = await requestClient.GetResponse<OrderSubmitted>(new
                    {
                        OrderId = orderId,
                        ReturnSuccess = returnSuccessPlease
                    });

                    var resultId = response switch
                    {
                        (_, OrderSubmitted s) => s.OrderId,
                        _ => throw new InvalidOperationException("Unexpected result")
                    };

                    Console.WriteLine($"Received a response of {resultId}");
                }
                // NOTE(review): only RequestFaultException is handled here. Any other
                // exception (presumably including a request timeout after the 5 second
                // limit - confirm) propagates out of Main once the finally block has run.
                catch (RequestFaultException e)
                {
                    Console.WriteLine("FAULT EXCEPTION CAUGHT");
                    Console.WriteLine(e);
                    Console.WriteLine(e.InnerException);
                }
            }
        }
        finally
        {
            // Runs for both the early returns above and for unhandled exceptions.
            await busControl.StopAsync();
        }
    }
}
/// <summary>
/// Request message sent by <c>Program.Main</c> and handled by <c>SubmitOrderSaga</c>.
/// </summary>
public interface SubmitOrder
{
    /// <summary>
    /// Order identifier; the saga correlates on this value.
    /// </summary>
    Guid OrderId { get; set; }

    /// <summary>
    /// When false, the saga throws instead of responding.
    /// </summary>
    bool ReturnSuccess { get; set; }
}
/// <summary>
/// Response message the saga sends back for a successful <c>SubmitOrder</c> request.
/// </summary>
public interface OrderSubmitted
{
    /// <summary>
    /// Order identifier copied from the request.
    /// </summary>
    Guid OrderId { get; set; }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Automatonymous; | |
using System; | |
namespace MassTransitReplyExceptionTest | |
{ | |
/// <summary>
/// Saga instance used by <c>SubmitOrderSaga</c> and stored in the
/// <c>InMemorySagaRepository</c> created in <c>Program.Main</c>.
/// </summary>
public class MyState :
    SagaStateMachineInstance
{
    /// <summary>
    /// Correlation identifier of this saga instance.
    /// </summary>
    public Guid CorrelationId { get; set; }

    /// <summary>
    /// Current state of the instance; registered with the state machine via <c>InstanceState</c>.
    /// </summary>
    public string CurrentState { get; internal set; }
}
/// <summary>
/// State machine with a single event: in the initial state, a SubmitOrder message
/// either throws (when ReturnSuccess is false) or responds with OrderSubmitted,
/// after which the instance transitions to Final.
/// </summary>
public class SubmitOrderSaga :
    MassTransitStateMachine<MyState>
{
    /// <summary>
    /// Declares the event correlation, the state property and the initial behavior.
    /// </summary>
    public SubmitOrderSaga()
    {
        // Correlate incoming messages to saga instances by the order id.
        Event(
            () => SubmitOrder,
            correlation => correlation.CorrelateById(received => received.Message.OrderId));

        InstanceState(instance => instance.CurrentState);

        Initially(
            When(SubmitOrder)
                .ThenAsync(async behavior =>
                {
                    // Failure path requested by the caller: throw before any response is sent.
                    if (!behavior.Data.ReturnSuccess)
                    {
                        throw new Exception("Saga fail");
                    }

                    await behavior.CreateConsumeContext().RespondAsync<OrderSubmitted>(new
                    {
                        behavior.Data.OrderId
                    });
                })
                .TransitionTo(Final));

        SetCompletedWhenFinalized();
    }

    // Event raised when a SubmitOrder message is received.
    Event<SubmitOrder> SubmitOrder { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment