Skip to content

Instantly share code, notes, and snippets.

@bkyrlach
Forked from ckimball/gist:9e4a9932970d0d584dcb713b38a2a057
Last active April 6, 2016 19:46
Show Gist options
  • Save bkyrlach/86ece0ac7bcdcc7ff1974b596993a2cc to your computer and use it in GitHub Desktop.
Save bkyrlach/86ece0ac7bcdcc7ff1974b596993a2cc to your computer and use it in GitHub Desktop.
Notification Processing Rewrite
/// <summary>
/// Attempt to process the RabbitMQ message for the subscriber notification.
/// Looks up the notification referenced by the message, then either asks for the message
/// to be delayed (it is still inside the initial processing window or the retry back-off
/// window) or hands the notification to <see cref="ProcessNotification"/>.
/// </summary>
/// <param name="message">Consume context whose payload supplies the NotificationId to look up.</param>
/// <param name="retryCount">
/// Number of previous attempts. Each attempt widens the back-off window by RetryDelay milliseconds
/// on top of the ProcessDelay minutes.
/// </param>
/// <returns>
/// A <see cref="GenericFailure"/> on the left when the message or its payload is missing, when the
/// notification cannot be found, or when the delay request produces no value; otherwise the result
/// of the chosen branch.
/// </returns>
private IoEither<Failure, Unit> ProcessMessage(IConsumeContext<ProcessSubscriberNotification> message, int retryCount)
{
    // Logs why processing is being postponed, then calls DelayProcessing on the message with the
    // given delay (in milliseconds, per the log text).
    Func<IConsumeContext<ProcessSubscriberNotification>, string, int, IoEither<Failure, Unit>> delayMessageProcessing = (m, log, delay) =>
        from _0 in Logger.DebugU($"STI Subscriber Notification Handler: {log}. Will retry after a delay of {delay} milliseconds.").ToIoEither<Failure, Unit>()
        from _1 in m.DelayProcessing(delay).ToMaybe().AsEither<Failure, Unit>(() => new GenericFailure($"Failed to delay message[{m.AsReadableObject()}]")).ToIoEither()
        select Unit.Only;

    return
        from id in message.ToMaybe().SelectMany(m => m.Message.ToMaybe()).Select(m => m.NotificationId).AsEither<Failure, int>(() => new GenericFailure($"Unable to process message[{message.AsReadableObject()}]")).ToIoEither()
        from notification in Io.Apply(() => _dataContext.Set<SubscriberNotification>().Find(id).ToMaybe().AsEither<Failure, SubscriberNotification>(() => new GenericFailure($"Unable to get notification[{id}] for message."))).ToIoEither()
        // Back-off window: ProcessDelay minutes plus RetryDelay milliseconds per previous attempt.
        let delay = new TimeSpan(0, 0, (int)ProcessDelay, 0, retryCount * RetryDelay)
        // NOTE(review): compares local time with notification.Created - confirm Created is stored as local time, not UTC.
        let timeSinceReceived = DateTime.Now.Subtract(notification.Created)
        from _0 in timeSinceReceived.TotalMinutes < ProcessDelay
            // Use whole elapsed minutes clamped at zero. The Minutes component is -1 when Created is
            // slightly ahead of the local clock, which made the divisor zero, and it wraps every hour.
            ? delayMessageProcessing(message, $"Delaying processing of subscriber notification[{id}]", RetryDelay * (int)ProcessDelay / (Math.Max(0, (int)timeSinceReceived.TotalMinutes) + 1))
            : retryCount > 0 && timeSinceReceived < delay
                // TotalMilliseconds is the full span; Milliseconds is only its 0-999 component and
                // dropped the minutes/seconds part of the back-off.
                // NOTE(review): if only the remaining part of the window should be waited, use
                // (delay - timeSinceReceived) here instead - confirm intent.
                ? delayMessageProcessing(message, $"Failed to process subscriber notification[{id}] {retryCount} times", (int)delay.TotalMilliseconds)
                // The two handlers below are unfinished placeholders carried over from the original;
                // this expression does not compile until they are filled in.
                : ProcessNotification(notification).Out.SelectMany(either => either.Match(
                    left: failure => /* do faily stuff here Io<IEither<...>>*/,
                    right: success => /* do successy stuff here */)).ToIoEither() // TODO - Do something here on failure or success using the notification
        select Unit.Only;
}
/// <summary>
/// Process the given subscriber notification request by resolving the account it belongs to.
/// Resolution order: the account already attached to the notification, then a lookup by
/// subscriber/account number, then a holding-account check.
/// </summary>
/// <param name="notification">The notification to process; its Account, SubscriberNumber, AccountNumber and Id are read.</param>
/// <returns>
/// The outcome of processing against the resolved account; a right Unit when only a holding
/// account exists (the notification is skipped); a left <see cref="GenericFailure"/> when no
/// account can be found at all.
/// </returns>
private IoEither<Failure, Unit> ProcessNotification(SubscriberNotification notification) =>
    Io.Apply(() => notification.Account.ToMaybe().Match(
        // The notification already carries its account: process it directly.
        just: linkedAccount => ProcessNotificationForAccount(notification, linkedAccount),
        // No account attached yet: try to look it up by subscriber and account number.
        nothing: () => Try.Attempt(() => _dataContext.Set<SystemUserAccount>().AsQueryable().GetSystemUserAccount(notification.SubscriberNumber, notification.AccountNumber)).Match(
            // Lookup succeeded: attach the account to the notification and process it.
            success: foundAccount => SaveAccountToNotificationAndProcess(notification, foundAccount),
            // Lookup threw (the exception itself is not used): fall back to the holding-account check.
            failure: lookupError => GetHoldingAccount(notification.SubscriberNumber, notification.AccountNumber).ToMaybe().Match(
                // Holding account: nothing to process, just record that it was seen and report success.
                just: holdingAccount => Logger.InfoU($"STI Subscriber Notification received a notification[{notification.Id}] for a subscription/holding account with subscriber number {notification.SubscriberNumber} and account number {notification.AccountNumber}. Notification skipped.").UnsafePerformIo().AsRight<Failure, Unit>(),
                // No account of any kind: log a warning and return a failure (no exception is thrown).
                nothing: () =>
                {
                    Logger.Warn($"STI Subscriber Notification received a notification[{notification.Id}] for an invalid subscription with subscriber number {notification.SubscriberNumber} and account number {notification.AccountNumber}.");
                    return new GenericFailure("Unable to find an account for the given subscriber notification.").AsLeft<Failure, Unit>();
                })))).ToIoEither();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment