Forked from ckimball/gist:9e4a9932970d0d584dcb713b38a2a057
Last active
April 6, 2016 19:46
-
-
Save bkyrlach/86ece0ac7bcdcc7ff1974b596993a2cc to your computer and use it in GitHub Desktop.
Notification Processing Rewrite
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Attempt to process the RabbitMQ message for the subscriber notification.
/// The notification referenced by the message is loaded from the data context and then either
/// (a) re-queued with a delay while it is younger than <c>ProcessDelay</c> minutes,
/// (b) re-queued with a delay while a retry is still inside its back-off window, or
/// (c) handed to <c>ProcessNotification</c>.
/// </summary>
/// <param name="message">Consume context wrapping the <c>ProcessSubscriberNotification</c> message; its <c>NotificationId</c> selects the notification to load.</param>
/// <param name="retryCount">Retry counter; it widens the back-off window by <c>retryCount * RetryDelay</c> milliseconds and is reported in the retry log line.</param>
/// <returns>
/// A deferred computation yielding <c>Unit</c> on success, or a <c>Failure</c> when the message or its
/// notification cannot be resolved or the message cannot be delayed.
/// </returns>
private IoEither<Failure, Unit> ProcessMessage(IConsumeContext<ProcessSubscriberNotification> message, int retryCount)
{
    // Logs why processing is postponed, then asks the context to redeliver the message after `delay` milliseconds.
    Func<IConsumeContext<ProcessSubscriberNotification>, string, int, IoEither<Failure, Unit>> delayMessageProcessing = (m, log, delay) =>
        from _0 in Logger.DebugU($"STI Subscriber Notification Handler: {log}. Will retry after a delay of {delay} milliseconds.").ToIoEither<Failure, Unit>()
        from _1 in m.DelayProcessing(delay).ToMaybe().AsEither<Failure, Unit>(() => new GenericFailure($"Failed to delay message[{m.AsReadableObject()}]")).ToIoEither()
        select Unit.Only;

    return
        // A null context or null payload becomes a Failure instead of a NullReferenceException.
        from id in message.ToMaybe().SelectMany(m => m.Message.ToMaybe()).Select(m => m.NotificationId).AsEither<Failure, int>(() => new GenericFailure($"Unable to process message[{message.AsReadableObject()}]")).ToIoEither()
        from notification in Io.Apply(() => _dataContext.Set<SubscriberNotification>().Find(id).ToMaybe().AsEither<Failure, SubscriberNotification>(() => new GenericFailure($"Unable to get notification[{id}] for message."))).ToIoEither()
        // Back-off window: ProcessDelay minutes plus retryCount * RetryDelay milliseconds.
        let delay = new TimeSpan(0, 0, (int)ProcessDelay, 0, retryCount * RetryDelay)
        // NOTE(review): uses local time; assumes notification.Created is stored as local time too - confirm.
        let timeSinceReceived = DateTime.Now.Subtract(notification.Created)
        from _0 in timeSinceReceived.TotalMinutes < ProcessDelay
            // Still inside the initial hold period: the wait shrinks as the notification ages.
            // Whole elapsed minutes are taken from TotalMinutes and clamped at zero so that a Created
            // timestamp slightly in the future cannot produce a zero divisor.
            ? delayMessageProcessing(message, $"Delaying processing of subscriber notification[{id}]", RetryDelay * (int)ProcessDelay / (Math.Max(0, (int)timeSinceReceived.TotalMinutes) + 1))
            : retryCount > 0 && timeSinceReceived < delay
                // TotalMilliseconds is the whole span; Milliseconds is only the 0-999 component.
                ? delayMessageProcessing(message, $"Failed to process subscriber notification[{id}] {retryCount} times", (int)delay.TotalMilliseconds)
                : ProcessNotification(notification).Out.SelectMany(either => either.Match(
                    left: failure => /* do faily stuff here Io<IEither<...>>*/,
                    right: success => /* do successy stuff here */)).ToIoEither() // TODO - Do something here on failure or success using the notification
        select Unit.Only;
}
/// <summary>
/// Process the given subscriber notification request.
/// The account is resolved in three steps: the account already attached to the notification,
/// then a system user account looked up by subscriber and account number, then a holding account.
/// </summary>
/// <param name="notification">The notification to process; its subscriber and account numbers drive the account lookups.</param>
/// <returns>
/// A deferred computation; a notification that matches only a holding account is logged and reported as
/// success, while a notification with no matching account at all is logged and reported as a <c>Failure</c>.
/// </returns>
private IoEither<Failure, Unit> ProcessNotification(SubscriberNotification notification) =>
    Io.Apply(() => notification.Account
        .ToMaybe()
        .Match(
            // The notification already carries its account: process it directly.
            just: linkedAccount => ProcessNotificationForAccount(notification, linkedAccount),
            // No account attached yet: try to locate the system user account.
            nothing: () => Try
                .Attempt(() => _dataContext.Set<SystemUserAccount>().AsQueryable().GetSystemUserAccount(notification.SubscriberNumber, notification.AccountNumber))
                .Match(
                    // Lookup succeeded: attach the account to the notification and process it.
                    success: foundAccount => SaveAccountToNotificationAndProcess(notification, foundAccount),
                    // Lookup failed: fall back to checking for a holding account.
                    // NOTE(review): the lookup exception itself is discarded here - confirm that is intended.
                    failure: lookupError => GetHoldingAccount(notification.SubscriberNumber, notification.AccountNumber)
                        .ToMaybe()
                        .Match(
                            // Holding accounts are not processed; record that one was seen and report success.
                            just: holdingAccount => Logger
                                .InfoU($"STI Subscriber Notification received a notification[{notification.Id}] for a subscription/holding account with subscriber number {notification.SubscriberNumber} and account number {notification.AccountNumber}. Notification skipped.")
                                .UnsafePerformIo()
                                .AsRight<Failure, Unit>(),
                            // Nothing matched anywhere: warn and return a failure value (nothing is thrown).
                            nothing: () =>
                            {
                                Logger.Warn($"STI Subscriber Notification received a notification[{notification.Id}] for an invalid subscription with subscriber number {notification.SubscriberNumber} and account number {notification.AccountNumber}.");
                                return new GenericFailure("Unable to find an account for the given subscriber notification.").AsLeft<Failure, Unit>();
                            }))))
        .ToIoEither();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment