Created
September 28, 2018 07:48
-
-
Save SzymonPobiega/785b40b22b69565118e8cd1819d67abe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Runtime.CompilerServices; | |
using System.Threading.Tasks; | |
using NServiceBus; | |
using NServiceBus.Features; | |
using NServiceBus.Pipeline; | |
using NServiceBus.Sagas; | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
Start().GetAwaiter().GetResult(); | |
} | |
static async Task Start() | |
{ | |
var config = new EndpointConfiguration("MultiTenantSagas"); | |
config.UsePersistence<LearningPersistence>(); | |
config.UseTransport<LearningTransport>(); | |
config.SendFailedMessagesTo("error"); | |
var endpoint = await Endpoint.Start(config); | |
Console.WriteLine("Type '<correlation> <tenant>' and hit enter to send a message."); | |
while (true) | |
{ | |
var line = Console.ReadLine(); | |
if (line == null) | |
{ | |
continue; | |
} | |
var parts = line.Split(' '); | |
if (parts.Length != 2) | |
{ | |
continue; | |
} | |
var correlation = parts[0].Trim(); | |
var tenant = parts[1].Trim(); | |
var message = new MyMessage | |
{ | |
Correlation = correlation, | |
}; | |
var options = new SendOptions(); | |
options.SetHeader("Tenant", tenant); | |
options.RouteToThisEndpoint(); | |
await endpoint.Send(message, options); | |
} | |
} | |
} | |
class TenantIdReaderBehavior : Behavior<IIncomingLogicalMessageContext> | |
{ | |
ConditionalWeakTable<object, string> tenantIdMap; | |
public TenantIdReaderBehavior(ConditionalWeakTable<object, string> tenantIdMap) | |
{ | |
this.tenantIdMap = tenantIdMap; | |
} | |
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next) | |
{ | |
var tenantId = context.Headers["Tenant"]; | |
tenantIdMap.Add(context.Message.Instance, tenantId); | |
return next(); | |
} | |
} | |
class MultiTenantSagasFeature : Feature | |
{ | |
public MultiTenantSagasFeature() | |
{ | |
EnableByDefault(); | |
DependsOn<Sagas>(); | |
} | |
protected override void Setup(FeatureConfigurationContext context) | |
{ | |
//ConditionalWeakTable creates weak associations between objects that don't prevent these objects from being garbage collected. | |
var tenantIdMap = new ConditionalWeakTable<object, string>(); | |
var sagaMetadata = context.Settings.Get<SagaMetadataCollection>(); | |
//We hand-pick MySaga. In real life we would probably iterate over all sagas and use some convention to select the multi-tenant ones | |
var mySagaMetadata = sagaMetadata.Find(typeof(MySaga)); | |
var correlationPropertyAccessor = (Func<object, object>) mySagaMetadata.Finders.First().Properties["property-accessor"]; | |
object tenantAwarePropertyAccessor(object msg) | |
{ | |
var propValue = correlationPropertyAccessor(msg); | |
if (!tenantIdMap.TryGetValue(msg, out var tenantId)) | |
{ | |
throw new Exception("Missing tenant ID"); | |
} | |
return $"{tenantId}_{propValue}"; | |
} | |
//We *assume* there are no other handlers for simplicity. In real world we would check and only overwrite this for property finders | |
mySagaMetadata.Finders.First().Properties["property-accessor"] = (Func<object, object>) tenantAwarePropertyAccessor; | |
context.Pipeline.Register(new TenantIdReaderBehavior(tenantIdMap), "Reads tenant ID from the headers and associates it with the message object."); | |
} | |
} | |
class MySaga : Saga<MySagaData>, IAmStartedByMessages<MyMessage> | |
{ | |
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper) | |
{ | |
mapper.ConfigureMapping<MyMessage>(m => m.Correlation).ToSaga(s => s.Correlation); | |
} | |
public Task Handle(MyMessage message, IMessageHandlerContext context) | |
{ | |
Data.Counter++; | |
Console.WriteLine($"Handing message {Data.Counter} in saga for {Data.Correlation}. Saga id: {Data.Id}."); | |
return Task.CompletedTask; | |
} | |
} | |
class MySagaData : ContainSagaData | |
{ | |
public string Correlation { get; set; } | |
public int Counter { get; set; } | |
} | |
class MyMessage : IMessage | |
{ | |
public string Correlation { get; set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment