Skip to content

Instantly share code, notes, and snippets.

@Pzixel
Created September 16, 2019 11:57
Show Gist options
  • Save Pzixel/6ad22d9d104ad9602b1834e202f907f1 to your computer and use it in GitHub Desktop.
Save Pzixel/6ad22d9d104ad9602b1834e202f907f1 to your computer and use it in GitHub Desktop.
var graph = GraphDsl.Create(b =>
{
b.From(new RabbitSource<StartProcessTriggerMessage>(workerSettings.RoutingKey, mqFactory))
.Via(Flow.Create<RabbitMessage<StartProcessTriggerMessage>>()
.SelectAsync(Environment.ProcessorCount, async message =>
{
var stateInfoResult = await Result.TryAsync(() =>
RetryApiFuncAsync(() => GetOmsStateInfoApiClient(
message.Data.Trigger.PrincipalToken,
message.Data.Trigger.RequestId)
.OmsApiOrdersByOrderIdStateInfoGetAsyncWithHttpInfo(message.Data.Trigger.OrderId)));
return stateInfoResult.Map(stateInfo => (stateInfo, message)).MapError(_ => message);
}).Select(
infoMessagePairResult => infoMessagePairResult.Bind(intoMessagePair =>
{
if (statesToNotify.Contains(intoMessagePair.stateInfo.State))
{
return infoMessagePairResult;
}
return new Error<(StateInfo stateInfo, RabbitMessage<StartProcessTriggerMessage> message
),
RabbitMessage<StartProcessTriggerMessage>>(intoMessagePair.message);
}))
.SelectAsync(Environment.ProcessorCount, infoMessagePairResult =>
infoMessagePairResult.BindAsync(async infoMessagePair =>
{
var orderResult = await Result.TryAsync(() =>
RetryApiFuncAsync(() =>
GetOrdersApiClient(infoMessagePair.message.Data.Trigger.PrincipalToken,
infoMessagePair.message.Data.Trigger.RequestId)
.OmsApiOrdersByOrderIdGetAsyncWithHttpInfo(infoMessagePair.message.Data
.Trigger
.OrderId)));
return orderResult.Map(order => (order, infoMessagePair.message))
.MapError(_ => infoMessagePair.message);
}))
.SelectAsync(Environment.ProcessorCount, orderMessagePairResult =>
orderMessagePairResult.BindAsync(async orderMessagePair =>
{
var email = emailGeneratorService.GetNotificationMessage(orderMessagePair.order);
var orderResult = await Result.TryAsync(() =>
RetrySendGridFuncAsync(() =>
sendGridClient.SendEmailAsync(email)));
return orderResult.Map(order => orderMessagePair.message)
.MapError(_ => orderMessagePair.message);
})))
.To(
Sink.ForEach<IResult<RabbitMessage<StartProcessTriggerMessage>,
RabbitMessage<StartProcessTriggerMessage>>>(
x => x.Scan(
message => message.Ack(),
message => message.Ack())));
return ClosedShape.Instance;
});
@aikidos
Copy link

aikidos commented Mar 25, 2020

var graph = GraphDsl.Create(b =>
{
    var source = new RabbitSource<StartProcessTriggerMessage>(workerSettings.RoutingKey, mqFactory);

    var flow = Flow.Create<RabbitMessage<StartProcessTriggerMessage>>()
        .SelectAsync(Environment.ProcessorCount, async message =>
        {
            var result = await Result.TryAsync(() => RetryApiFuncAsync(() =>
            {
                var trigger = message.Data.Trigger;

                return GetOmsStateInfoApiClient(trigger.PrincipalToken, trigger.RequestId)
                    .OmsApiOrdersByOrderIdStateInfoGetAsyncWithHttpInfo(trigger.OrderId);
            }));

            return result
                .Map(stateInfo => (stateInfo, message))
                .MapError(_ => message);
        })
        .Select(infoMessagePairResult =>
        {
            return infoMessagePairResult.Bind(intoMessagePair =>
            {
                if (statesToNotify.Contains(intoMessagePair.stateInfo.State))
                {
                    return infoMessagePairResult;
                }

                return new Error<(StateInfo stateInfo, RabbitMessage<StartProcessTriggerMessage> message),
                    RabbitMessage<StartProcessTriggerMessage>>(intoMessagePair.message);
            });
        })
        .SelectAsync(Environment.ProcessorCount, infoMessagePairResult =>
        {
            return infoMessagePairResult.BindAsync(async infoMessagePair =>
            {
                var message = infoMessagePair.message;
                
                var result = await Result.TryAsync(() => RetryApiFuncAsync(() =>
                {
                    var trigger = message.Data.Trigger;

                    return GetOrdersApiClient(trigger.PrincipalToken, trigger.RequestId)
                        .OmsApiOrdersByOrderIdGetAsyncWithHttpInfo(trigger.OrderId);
                }));

                return result
                    .Map(order => (order, message))
                    .MapError(_ => message);
            });
        })
        .SelectAsync(Environment.ProcessorCount, orderMessagePairResult =>
        {
            return orderMessagePairResult.BindAsync(async orderMessagePair =>
            {
                var email = emailGeneratorService.GetNotificationMessage(orderMessagePair.order);

                var result = await Result.TryAsync(() => RetrySendGridFuncAsync(() =>
                {
                    return sendGridClient.SendEmailAsync(email);
                }));

                var message = orderMessagePair.message;
                    
                return result
                    .Map(order => message)
                    .MapError(_ => message);
            });
        });

    b.From(source)
        .Via(flow)
        .To(Sink
            .ForEach<IResult<RabbitMessage<StartProcessTriggerMessage>,
                RabbitMessage<StartProcessTriggerMessage>>>(
                x =>
                {
                    return x.Scan(
                        message => message.Ack(),
                        message => message.Ack());
                })
        );

    return ClosedShape.Instance;
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment