Skip to content

Instantly share code, notes, and snippets.

@promontis
Created October 9, 2019 16:17
Show Gist options
  • Save promontis/c6e65a77d9fa3f92e38106ade9511a75 to your computer and use it in GitHub Desktop.
Save promontis/c6e65a77d9fa3f92e38106ade9511a75 to your computer and use it in GitHub Desktop.
/// <summary>
/// Hosted service that subscribes to one stream per aggregate referenced by the
/// loaded sagas and forwards every received message to the saga dispatcher.
/// </summary>
/// <remarks>
/// Subscriptions run as background tasks so that <see cref="StartAsync"/> returns
/// promptly and all aggregates are subscribed concurrently. They are signalled to
/// stop from <see cref="StopAsync"/>.
/// </remarks>
public class GroupSubscriber : IHostedService
{
    // Strips a trailing "Id" from an identity type name. Built once instead of per StartAsync call.
    private static readonly Regex IdentitySuffix = new Regex("Id$");

    private readonly StreamsDBClient _client;
    private readonly IDispatchToEventSubscribers _dispatchToEventSubscribers;
    private readonly IDispatchToSagas _dispatchToSagas;
    private readonly IEventJsonSerializer _serializer;
    private readonly ISagaDefinitionService _sagaDefinitionService;
    private readonly ILoadedVersionedTypes _loadedVersionedTypes;
    private readonly IEventDefinitionService _eventDefinitionService;

    // One running task per subscribed aggregate stream; awaited on shutdown.
    private readonly List<Task> _subscriptionTasks = new List<Task>();

    // Cancelled from StopAsync to tell the subscription loops to exit.
    private CancellationTokenSource _stoppingCts;

    public GroupSubscriber(StreamsDBClient client, IDispatchToEventSubscribers dispatchToEventSubscribers, IDispatchToSagas dispatchToSagas,
        IEventJsonSerializer serializer, ISagaDefinitionService sagaDefinitionService, ILoadedVersionedTypes loadedVersionedTypes,
        IEventDefinitionService eventDefinitionService)
    {
        _client = client;
        _dispatchToEventSubscribers = dispatchToEventSubscribers;
        _dispatchToSagas = dispatchToSagas;
        _serializer = serializer;
        _sagaDefinitionService = sagaDefinitionService;
        _loadedVersionedTypes = loadedVersionedTypes;
        _eventDefinitionService = eventDefinitionService;
    }

    /// <summary>
    /// Starts one background subscription per distinct aggregate name and returns
    /// without waiting for any of them to finish.
    /// </summary>
    /// <param name="cancellationToken">Signals that start-up itself should be aborted.</param>
    /// <returns>A completed task once all subscriptions have been scheduled.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // The start-up token only covers the start phase, so the long-running
        // loops get their own token that is cancelled from StopAsync.
        _stoppingCts = new CancellationTokenSource();
        var stoppingToken = _stoppingCts.Token;

        foreach (var aggregate in DiscoverAggregateNames())
        {
            // Not awaited here: awaiting would block start-up on the first
            // subscription and leave the remaining aggregates unsubscribed.
            _subscriptionTasks.Add(Task.Run(() => PumpAsync(aggregate, stoppingToken), stoppingToken));
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Signals all subscription loops to stop and waits until they have exited or
    /// the supplied token is cancelled, whichever comes first.
    /// </summary>
    /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // StopAsync without a preceding StartAsync: nothing to stop.
        if (_stoppingCts == null)
        {
            return;
        }

        try
        {
            _stoppingCts.Cancel();
        }
        finally
        {
            // A loop blocked inside MoveNext may not observe the cancellation, so the
            // wait is bounded by the host's shutdown token. Faults of the subscription
            // tasks are deliberately not rethrown from here.
            var pumps = Task.WhenAll(_subscriptionTasks);
            await Task.WhenAny(pumps, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }

    /// <summary>
    /// Derives the distinct aggregate names from the aggregate event types of all loaded sagas.
    /// </summary>
    /// <returns>
    /// For each event type implementing <c>IAggregateEvent&lt;,&gt;</c>: the name of the second
    /// generic argument with a trailing "Id" removed, lower-cased. Duplicates are removed.
    /// </returns>
    private HashSet<string> DiscoverAggregateNames()
    {
        // A set rather than a list: several sagas/events can refer to the same
        // aggregate, and each aggregate stream must only be subscribed once.
        var aggregates = new HashSet<string>(StringComparer.Ordinal);

        foreach (var sagaType in _loadedVersionedTypes.Sagas)
        {
            var sagaDetails = SagaDetails.From(sagaType);
            foreach (var aggregateEventType in sagaDetails.AggregateEventTypes)
            {
                var aggregateEventInterfaceType = aggregateEventType
                    .GetTypeInfo()
                    .GetInterfaces()
                    .SingleOrDefault(i => i.GetTypeInfo().IsGenericType && i.GetGenericTypeDefinition() == typeof(IAggregateEvent<,>));
                if (aggregateEventInterfaceType == null)
                {
                    continue;
                }

                var identityTypeName = aggregateEventInterfaceType.GetGenericArguments()[1].Name;
                aggregates.Add(IdentitySuffix.Replace(identityTypeName, string.Empty).ToLowerInvariant());
            }
        }

        return aggregates;
    }

    /// <summary>
    /// Reads messages from the stream named <c>#{aggregate}</c> and dispatches each one to the sagas
    /// until the subscription ends or <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="aggregate">Aggregate name used to build the stream name.</param>
    /// <param name="stoppingToken">Cancelled when the service is stopping.</param>
    private async Task PumpAsync(string aggregate, CancellationToken stoppingToken)
    {
        // NOTE(review): the second argument (0) is presumably the position to start reading from — confirm against the StreamsDB client.
        var groupSubscription = _client.DB().SubscribeStream($"#{aggregate}", 0);

        while (!stoppingToken.IsCancellationRequested && await groupSubscription.MoveNext())
        {
            var message = groupSubscription.Current;
            var eventJson = Encoding.UTF8.GetString(message.Value);
            var metadataJson = Encoding.UTF8.GetString(message.Header);
            var domainEvent = _serializer.Deserialize(eventJson, metadataJson);
            // await _dispatchToEventSubscribers.DispatchToAsynchronousSubscribersAsync(domainEvent, cancellationToken);
            await _dispatchToSagas.ProcessAsync(new List<IDomainEvent> { domainEvent }, stoppingToken);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment