Created
October 9, 2019 16:17
-
-
Save promontis/c6e65a77d9fa3f92e38106ade9511a75 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Hosted service that, for every aggregate referenced by a loaded saga, subscribes to the
/// StreamsDB stream named <c>#{aggregate}</c> and forwards each received message to the saga
/// dispatcher as a deserialized domain event.
/// </summary>
/// <remarks>
/// The subscription loops run in the background: <see cref="StartAsync"/> only kicks them off
/// and returns, and <see cref="StopAsync"/> signals them to stop and waits for them.
/// </remarks>
public class GroupSubscriber : IHostedService
{
    // Strips a trailing "Id" from the identity type name to obtain the aggregate name.
    private static readonly Regex IdentitySuffix = new Regex("Id$");

    private readonly StreamsDBClient _client;
    private readonly IDispatchToEventSubscribers _dispatchToEventSubscribers;
    private readonly IDispatchToSagas _dispatchToSagas;
    private readonly IEventJsonSerializer _serializer;
    private readonly ISagaDefinitionService _sagaDefinitionService;
    private readonly ILoadedVersionedTypes _loadedVersionedTypes;
    private readonly IEventDefinitionService _eventDefinitionService;

    // One running pump per subscribed aggregate stream; populated by StartAsync.
    private readonly List<Task> _subscriptions = new List<Task>();

    // Signals the pumps to stop; created in StartAsync, cancelled in StopAsync.
    private CancellationTokenSource _stopping;

    /// <summary>Stores the collaborators; no work is started until <see cref="StartAsync"/>.</summary>
    public GroupSubscriber(StreamsDBClient client, IDispatchToEventSubscribers dispatchToEventSubscribers, IDispatchToSagas dispatchToSagas,
        IEventJsonSerializer serializer, ISagaDefinitionService sagaDefinitionService, ILoadedVersionedTypes loadedVersionedTypes,
        IEventDefinitionService eventDefinitionService)
    {
        _client = client;
        _dispatchToEventSubscribers = dispatchToEventSubscribers;
        _dispatchToSagas = dispatchToSagas;
        _serializer = serializer;
        _sagaDefinitionService = sagaDefinitionService;
        _loadedVersionedTypes = loadedVersionedTypes;
        _eventDefinitionService = eventDefinitionService;
    }

    /// <summary>
    /// Starts one background subscription per distinct aggregate and returns immediately.
    /// </summary>
    /// <param name="cancellationToken">
    /// Start-up token supplied by the host. It is deliberately not used for the long-running
    /// pumps, which are governed by this service's own stop signal instead.
    /// </param>
    /// <returns>A completed task; the pumps keep running until <see cref="StopAsync"/>.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _stopping = new CancellationTokenSource();
        var stoppingToken = _stopping.Token;

        foreach (var aggregate in ResolveAggregateNames())
        {
            // Copy to a local so each lambda captures its own value regardless of compiler version.
            var streamAggregate = aggregate;

            // Do NOT await here: the pump only finishes when the subscription ends, so awaiting
            // would block host start-up and prevent the remaining aggregates from being subscribed.
            // Task.Run keeps any synchronous part of the subscribe call off the start-up path.
            // NOTE(review): a faulted pump is not reported anywhere (no logger is injected) — consider adding one.
            _subscriptions.Add(Task.Run(() => PumpAsync(streamAggregate, stoppingToken), stoppingToken));
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Signals all subscription pumps to stop and waits until they finish or until
    /// <paramref name="cancellationToken"/> is cancelled, whichever comes first.
    /// </summary>
    /// <param name="cancellationToken">Limits how long the wait for the pumps may take.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_stopping == null)
        {
            // StartAsync was never called; nothing to stop.
            return;
        }

        try
        {
            _stopping.Cancel();
        }
        finally
        {
            // WhenAny never throws for its inner tasks, so a cancelled/faulted pump does not fail shutdown.
            // A pump parked inside MoveNext() only notices the stop signal on its next message, hence the
            // Task.Delay bound to the caller's token as an escape hatch.
            await Task.WhenAny(Task.WhenAll(_subscriptions), Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }

    /// <summary>
    /// Collects the distinct aggregate names for all aggregate event types of all loaded sagas.
    /// The name is the identity type name (second generic argument of <c>IAggregateEvent&lt;,&gt;</c>)
    /// without a trailing "Id", lower-cased invariantly.
    /// </summary>
    private HashSet<string> ResolveAggregateNames()
    {
        // A set, because several sagas may react to the same aggregate and a duplicate
        // subscription would dispatch every event of that aggregate more than once.
        var aggregates = new HashSet<string>();

        foreach (var sagaType in _loadedVersionedTypes.Sagas)
        {
            var sagaDetails = SagaDetails.From(sagaType);
            foreach (var aggregateEventType in sagaDetails.AggregateEventTypes)
            {
                var aggregateEventInterfaceType = aggregateEventType
                    .GetTypeInfo()
                    .GetInterfaces()
                    .SingleOrDefault(i => i.GetTypeInfo().IsGenericType && i.GetGenericTypeDefinition() == typeof(IAggregateEvent<,>));

                if (aggregateEventInterfaceType == null)
                {
                    // Not an aggregate event; nothing to subscribe to.
                    continue;
                }

                var identityTypeName = aggregateEventInterfaceType.GetGenericArguments()[1].Name;
                aggregates.Add(IdentitySuffix.Replace(identityTypeName, string.Empty).ToLowerInvariant());
            }
        }

        return aggregates;
    }

    /// <summary>
    /// Reads messages from the stream <c>#{aggregate}</c> and hands each one to the saga dispatcher
    /// until the subscription ends or <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    private async Task PumpAsync(string aggregate, CancellationToken stoppingToken)
    {
        // Second argument presumably is the position to start reading from — TODO confirm against the StreamsDB driver.
        var groupSubscription = _client.DB().SubscribeStream($"#{aggregate}", 0);

        while (!stoppingToken.IsCancellationRequested && await groupSubscription.MoveNext())
        {
            var message = groupSubscription.Current;

            // Payload and header are decoded as UTF-8 JSON before deserialization.
            var eventJson = Encoding.UTF8.GetString(message.Value);
            var metadataJson = Encoding.UTF8.GetString(message.Header);

            var domainEvent = _serializer.Deserialize(eventJson, metadataJson);

            // await _dispatchToEventSubscribers.DispatchToAsynchronousSubscribersAsync(domainEvent, stoppingToken);
            await _dispatchToSagas.ProcessAsync(new List<IDomainEvent> { domainEvent }, stoppingToken);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment