Created
July 5, 2023 17:41
-
-
Save dosper7/8cd144729a89daab346ace386f3a381a to your computer and use it in GitHub Desktop.
PayoutBusiness async/await refactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Confluent.Kafka; | |
using LinqKit; | |
using Medallion.Threading; | |
using Microsoft.Extensions.Logging; | |
using Microsoft.Extensions.Options; | |
using System.Collections.Concurrent; | |
using System.Diagnostics; | |
using System.Linq.Expressions; | |
using System.Runtime.CompilerServices; | |
using Techops.Shared.Helpers.Currency; | |
using Techops.Shared.Helpers.Dates; | |
using Techops.Shared.Helpers.Exceptions; | |
using Techops.Shared.Kafka.Interfaces; | |
using Techops.Way4.Exporter.Business.Services.Interfaces; | |
using Techops.Way4.Exporter.Data.Entity; | |
using Techops.Way4.Exporter.Data.Repositories.Interfaces; | |
using Techops.Way4.Exporter.Models.Exceptions; | |
using Techops.Way4.Exporter.Models.Generic; | |
using Techops.Way4.Exporter.Models.Job; | |
using Techops.Way4.Exporter.Models.Payouts; | |
using AMGModels = AMG.Client; | |
namespace Techops.Way4.Exporter.Business.Services | |
{ | |
public class PayoutBusiness : IPayoutBusiness | |
{ | |
private readonly ILogger<PayoutBusiness> _logger; | |
private readonly IAMGService _amgService; | |
private readonly IKafkaProducer<string, Payout> _kafkaProducer; | |
private readonly IPayoutRepository _repository; | |
private readonly IOptionsMonitor<AppSettingsModel> _appSettings; | |
private readonly IDistributedLock _distributedLock; | |
public PayoutBusiness( | |
ILogger<PayoutBusiness> logger, | |
IOptionsMonitor<AppSettingsModel> options, | |
IAMGService amgService, | |
IKafkaProducer<string, Payout> kafkaProducer, | |
IPayoutRepository repository, | |
IDistributedLock distributedLock) | |
{ | |
_amgService = amgService; | |
_logger = logger; | |
_appSettings = options; | |
_kafkaProducer = kafkaProducer; | |
_repository = repository; | |
_distributedLock = distributedLock; | |
} | |
public async Task<PayoutProcessorJobStats> ProcessPayoutsAsync(DateTime? date = null) | |
{ | |
var stopwatch = new Stopwatch(); | |
stopwatch.Start(); | |
if (date == null) | |
{ | |
stopwatch.Stop(); | |
throw new ValidationException("Date cannot be null"); | |
} | |
var successfulPayouts = 0; | |
var failedPayouts = 0; | |
var payoutEntities = await _repository.GetAsync( | |
x => x.State == PayoutState.TO_PROCESS | |
&& x.IssuedAt == date.Value.ToString("yyyy-MM-dd")); | |
var payoutToDoCount = payoutEntities.Count(); | |
if (!payoutEntities.Any()) | |
{ | |
stopwatch.Stop(); | |
_logger.Log(LogLevel.Debug, "No payouts found to be processed with date: {date}", date.Value.PrettyDate()); | |
return new PayoutProcessorJobStats(payoutToDoCount, | |
successfulPayouts, | |
failedPayouts, | |
(int)stopwatch.ElapsedMilliseconds / 1000, | |
nameof(JobType.PayoutExporter), | |
payoutToDoCount <= 0 ? "Success" : "Failure"); | |
} | |
foreach (var payout in payoutEntities) | |
{ | |
var temp = new Payout | |
{ | |
Institution = new PayoutInstitution { Name = "SaltPay-Way4" }, | |
PayoutId = payout.Id.ToString(), | |
CardAcceptorId = payout.CardAcceptorId, | |
StoreId = payout.StoreId.ToString(), | |
IssuedAt = payout.IssuedAt, | |
Amounts = new Amount | |
{ | |
PayoutCurrency = payout.Currency, | |
PayoutAmount = payout.Amount, | |
GrossTransactionAmount = payout.GrossTransactionAmount, | |
TotalFeesAmount = payout.TotalDeductionsAmount | |
} | |
}; | |
var kafkaMessageKey = $"payouts_{payout.CardAcceptorId}"; | |
var persistenceStatus = await this.SafeExecuteKafkaProducer(_kafkaProducer.ProduceAsync(_appSettings.CurrentValue.Kafka.PayoutsTopic, kafkaMessageKey, temp)); | |
_logger.Log(LogLevel.Debug, | |
"Message {KafkaMessageKey} has been sent with result: {PersistenceStatus}", kafkaMessageKey, | |
persistenceStatus); | |
switch (persistenceStatus) | |
{ | |
case PersistenceStatus.Persisted: | |
case PersistenceStatus.PossiblyPersisted: | |
payout.State = PayoutState.PROCESSED; | |
successfulPayouts++; | |
break; | |
case PersistenceStatus.NotPersisted: | |
default: | |
payout.State = PayoutState.FAILED; | |
failedPayouts++; | |
break; | |
} | |
} | |
await _repository.BulkUpdateAsync(payoutEntities); | |
stopwatch.Stop(); | |
_logger.Log(LogLevel.Information, "Sent successfully {messages} payouts to kafka", successfulPayouts); | |
return new PayoutProcessorJobStats(payoutToDoCount, | |
successfulPayouts, | |
failedPayouts, | |
(int)stopwatch.ElapsedMilliseconds / 1000, | |
nameof(JobType.PayoutExporter), | |
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? "Success" : "Failure"); | |
} | |
public async Task<int> PopulatePayoutsAsync(DateTime? date = null) | |
{ | |
if (date == null) | |
throw new ValidationException("Date cannot be null"); | |
// Only run this if it hasn't for the requested date | |
if (await _repository.Exists(x => x.IssuedAt == date.Value.ToString("yyyy-MM-dd"))) | |
return default; | |
var results = await _amgService.FetchPayoutReportData((DateTime)date); | |
if (results == null || !results.Any()) | |
{ | |
_logger.Log(LogLevel.Information, "No payout data on Date: {Date}", date.ToString()); | |
return default; | |
} | |
_logger.Log(LogLevel.Debug, "Processing {payout_count} payouts", results.Count()); | |
var payoutEntities = new List<PayoutEntity>(); | |
foreach (var payout in results) | |
{ | |
if (payout.Status.Equals(PayoutStatus.UNPAID.ToString())) | |
continue; | |
var payoutEntity = new PayoutEntity | |
{ | |
Id = payout.Id, | |
Currency = payout.Currency, | |
CardAcceptorId = payout.Mid, | |
CompanyId = string.IsNullOrWhiteSpace(payout.Gmd_company_id) ? Guid.Empty : new Guid(payout.Gmd_company_id), | |
StoreId = string.IsNullOrWhiteSpace(payout.Gmd_store_id) ? Guid.Empty : new Guid(payout.Gmd_store_id), | |
StartedAt = payout.Started_at.DateTime, | |
EndedAt = payout.Ended_at.DateTime.AddDays(1).AddSeconds(-1), | |
IssuedAt = payout.Issued_at.ToString("yyyy-MM-dd"), | |
State = PayoutState.TO_PROCESS, | |
CreatedBy = "System", | |
ExportToPayments = payout.Export_to_payments | |
}; | |
payoutEntity.State = this.FormatAmountValues(ref payoutEntity, payout, date.Value) | |
? payoutEntity.State | |
: PayoutState.UNPROCESSABLE_CURRENCY; | |
var payoutStatus = this.ParsePayoutStatus(payout, date.Value); | |
payoutEntity.Status = payoutStatus; | |
if (payoutStatus == PayoutStatus.UNKNOWN | |
&& payoutEntity.State != PayoutState.UNPROCESSABLE_CURRENCY) | |
{ | |
payoutEntity.State = PayoutState.UNPROCESSABLE_STATUS; | |
} | |
payoutEntities.Add(payoutEntity); | |
} | |
await _repository.AddAsync(payoutEntities); | |
return payoutEntities.Count; | |
} | |
public async Task<int> ForcePopulatePayoutsAsync(DateTime? date = null) | |
{ | |
if (date == null) | |
throw new ValidationException("Date cannot be null"); | |
// var results = await _amgService.FetchPayoutData((DateTime)date); | |
var results = await _amgService.FetchPayoutReportData((DateTime)date); | |
if (results == null || results.Count() == 0) | |
{ | |
_logger.Log(LogLevel.Information, "No payout data on Date: {Date}", date.ToString()); | |
return default; | |
} | |
var payoutData = (await _repository.GetAsync(x => x.IssuedAt == date.Value.ToString("yyyy-MM-dd"))).ToList(); | |
if (payoutData.Any()) | |
{ | |
results = results.Where(x => !payoutData.Any(y => y.Id == x.Id)).ToList(); | |
if (!results.Any(y => y.Status.Equals(PayoutStatus.PAID.ToString()))) | |
{ | |
_logger.Log(LogLevel.Information, "No payout data on Date: {Date} left to be processed", date.ToString()); | |
return 0; | |
} | |
} | |
_logger.Log(LogLevel.Debug, "Processing {payout_count} payouts", results.Count()); | |
var payoutEntities = new List<PayoutEntity>(); | |
foreach (var payout in results) | |
{ | |
if (payout.Status.Equals(PayoutStatus.UNPAID.ToString())) | |
continue; | |
var payoutEntity = new PayoutEntity | |
{ | |
Id = payout.Id, | |
Currency = payout.Currency, | |
CardAcceptorId = payout.Mid, | |
CompanyId = string.IsNullOrWhiteSpace(payout.Gmd_company_id) ? Guid.Empty : new Guid(payout.Gmd_company_id), | |
StoreId = string.IsNullOrWhiteSpace(payout.Gmd_store_id) ? Guid.Empty : new Guid(payout.Gmd_store_id), | |
StartedAt = payout.Started_at.DateTime, | |
EndedAt = payout.Ended_at.DateTime.AddDays(1).AddSeconds(-1), | |
IssuedAt = payout.Issued_at.ToString("yyyy-MM-dd"), | |
State = PayoutState.TO_PROCESS, | |
CreatedBy = "System", | |
ExportToPayments = payout.Export_to_payments | |
}; | |
payoutEntity.State = this.FormatAmountValues(ref payoutEntity, payout, date.Value) | |
? payoutEntity.State | |
: PayoutState.UNPROCESSABLE_CURRENCY; | |
var payoutStatus = this.ParsePayoutStatus(payout, date.Value); | |
payoutEntity.Status = payoutStatus; | |
if (payoutStatus == PayoutStatus.UNKNOWN | |
&& payoutEntity.State != PayoutState.UNPROCESSABLE_CURRENCY) | |
{ | |
payoutEntity.State = PayoutState.UNPROCESSABLE_STATUS; | |
} | |
payoutEntities.Add(payoutEntity); | |
} | |
await _repository.AddAsync(payoutEntities); | |
return payoutEntities.Count; | |
} | |
public async Task<PayoutEntity> GetPayoutByIdAsync(Guid id) | |
{ | |
if (id == Guid.Empty) | |
throw new ArgumentException("Id cannot be empty"); | |
var payout = await _repository.GetAsync(id); | |
if (payout == null) | |
throw new ArgumentException("There is no payout with given id"); | |
return payout; | |
} | |
public async Task<PayoutEntity> UpdatePayoutByIdAsync(Guid id, PayoutUpdate updatedPayout) | |
{ | |
if (id == Guid.Empty) | |
throw new ArgumentException("Id cannot be empty"); | |
var payout = await _repository.GetAsync(id); | |
if (payout == null) | |
throw new ArgumentException("There is no payout with given id"); | |
bool isUpdated = false; | |
if (updatedPayout.State.HasValue) | |
(isUpdated, payout.State) = (true, (PayoutState)updatedPayout.State); | |
if (updatedPayout.Active.HasValue) | |
(isUpdated, payout.Active) = (true, (bool)updatedPayout.Active); | |
if (!isUpdated) | |
return payout; | |
bool updated = await _repository.UpdateAsync(payout); | |
if (!updated) | |
throw new Exception("Problem saving changes to database"); | |
return payout; | |
} | |
public async Task<bool> PurgeAsync(DateTime createdBefore, CancellationToken cancellationToken) | |
{ | |
return await _repository.PurgeAsync(createdBefore, cancellationToken); | |
} | |
public async Task<(IEnumerable<PayoutEntity>, int)> GetPayoutsAsync(string q, PayoutState? state, bool? active, bool? export, int? offset, int? limit) | |
{ | |
offset ??= 0; | |
limit ??= 100; | |
var filter = GenerateFilter(q, state, active, export); | |
var payouts = await _repository.GetAsync(filter, (int)offset, (int)limit); | |
var count = await _repository.GetCount(filter); | |
return (payouts, count); | |
} | |
public async Task<IJobStats> ProcessPayoutsInBatchAsync(DateTime? date = null, int batchSize = -1) | |
{ | |
var stopwatch = new Stopwatch(); | |
stopwatch.Start(); | |
if (date == null) | |
{ | |
stopwatch.Stop(); | |
throw new ValidationException("Date cannot be null"); | |
} | |
var successfulPayouts = 0; | |
var failedPayouts = 0; | |
IEnumerable<PayoutEntity> payoutEntities; | |
int payoutToDoCount; | |
await using (await _distributedLock.AcquireAsync()) | |
{ | |
payoutEntities = await _repository.GetPayoutsToExportInBatch(date.Value, batchSize); | |
payoutToDoCount = payoutEntities.Count(); | |
if (!payoutEntities.Any()) | |
{ | |
stopwatch.Stop(); | |
_logger.Log(LogLevel.Debug, "No payouts found to be processed with date: {date}", date.Value.PrettyDate()); | |
return new JobReportStats( | |
(int)stopwatch.ElapsedMilliseconds / 1000, | |
nameof(JobType.PayoutExporter), | |
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? nameof(JobStatus.Successful) : nameof(JobStatus.Failed), | |
payoutToDoCount, | |
successfulPayouts, | |
failedPayouts); | |
} | |
Parallel.ForEach(payoutEntities, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (entity) => entity.State = PayoutState.PROCESSING); | |
await _repository.BulkUpdateAsync(payoutEntities); | |
} | |
// Necessary variables for the update phase | |
var slim = new SemaphoreSlim(1, 1); | |
var processedPayouts = new List<PayoutEntity>(payoutEntities.Count()); | |
const int updateBatchCount = 20; | |
ConcurrentBag<Task> tasks = new(); | |
var payoutTopic = _appSettings.CurrentValue.Kafka.PayoutsTopic; | |
await Parallel.ForEachAsync(payoutEntities, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, | |
(entity, cancellationToken) => | |
{ | |
tasks.Add(Task.Run(async () => | |
{ | |
var temp = new Payout | |
{ | |
Institution = new PayoutInstitution { Name = "SaltPay-Way4" }, | |
PayoutId = entity.Id.ToString(), | |
CardAcceptorId = entity.CardAcceptorId, | |
StoreId = entity.StoreId.ToString(), | |
IssuedAt = entity.IssuedAt, | |
Amounts = new Amount | |
{ | |
PayoutCurrency = entity.Currency, | |
PayoutAmount = entity.Amount, | |
GrossTransactionAmount = entity.GrossTransactionAmount, | |
TotalFeesAmount = entity.TotalDeductionsAmount | |
} | |
}; | |
var kafkaMessageKey = $"payouts_{entity.CardAcceptorId}"; | |
var persistenceStatus = await this.SafeExecuteKafkaProducer(_kafkaProducer.ProduceAsync(payoutTopic, kafkaMessageKey, temp)); | |
_logger.Log(LogLevel.Debug, "Message {KafkaMessageKey} has been sent with result: {PersistenceStatus}", kafkaMessageKey, persistenceStatus); | |
switch (persistenceStatus) | |
{ | |
case PersistenceStatus.Persisted: | |
case PersistenceStatus.PossiblyPersisted: | |
entity.State = PayoutState.PROCESSED; | |
Interlocked.Increment(ref successfulPayouts); | |
break; | |
// PersistenceStatus.NotPersisted | |
default: | |
entity.State = PayoutState.FAILED; | |
Interlocked.Increment(ref failedPayouts); | |
break; | |
} | |
try | |
{ | |
await slim.WaitAsync(cancellationToken); | |
processedPayouts.Add(entity); | |
if (processedPayouts.Count >= updateBatchCount) | |
{ | |
await _repository.BulkUpdateAsync(processedPayouts); | |
processedPayouts.Clear(); | |
} | |
} | |
finally | |
{ | |
slim.Release(); | |
} | |
}, cancellationToken)); | |
return ValueTask.CompletedTask; | |
}); | |
await Task.WhenAll(tasks); | |
if (processedPayouts.Count > 0) | |
await _repository.BulkUpdateAsync(processedPayouts); | |
stopwatch.Stop(); | |
_logger.Log(LogLevel.Information, "Sent successfully {messages} payouts to kafka", successfulPayouts); | |
return new JobReportStats( | |
(int)stopwatch.ElapsedMilliseconds / 1000, | |
nameof(JobType.PayoutExporter), | |
payoutToDoCount <= 0 || (successfulPayouts / payoutToDoCount) == 1 ? nameof(JobStatus.Successful) : nameof(JobStatus.Failed), | |
payoutToDoCount, | |
successfulPayouts, | |
failedPayouts); | |
} | |
#region private methods | |
private Expression<Func<PayoutEntity, bool>> GenerateFilter(string q, PayoutState? state, bool? active, bool? export) | |
{ | |
var predicate = PredicateBuilder.New<PayoutEntity>(true); | |
if (!string.IsNullOrEmpty(q)) | |
{ | |
q = q.Trim(); | |
predicate = predicate | |
.Or(x => x.Id.ToString() == q) | |
.Or(x => x.CompanyId.ToString() == q) | |
.Or(x => x.StoreId.ToString() == q) | |
.Or(x => x.EntityId.ToString() == q) | |
.Or(x => x.CardAcceptorId == q) | |
.Or(x => x.IssuedAt == q) | |
.Or(x => x.Currency == q); | |
} | |
if (state.HasValue) | |
predicate = predicate.And(x => x.State == state); | |
if (active.HasValue) | |
predicate = predicate.And(x => x.Active == active); | |
if (export.HasValue) | |
predicate = predicate.And(x => x.ExportToPayments == export); | |
return predicate; | |
} | |
private async Task<PersistenceStatus> SafeExecuteKafkaProducer(Task<PersistenceStatus> functionToExecute, [CallerMemberName] string nameOfFunction = "") | |
{ | |
try | |
{ | |
return await functionToExecute; | |
} | |
catch (Exception ex) | |
{ | |
_logger!.Log(LogLevel.Warning, ex, "There was an error executing the function: {nameOfFunction}.", nameOfFunction); | |
return PersistenceStatus.NotPersisted; | |
} | |
} | |
private bool FormatAmountValues(ref PayoutEntity entity, AMGModels.PayoutReportResponse payoutReportResponse, DateTime date) | |
{ | |
var sucess = true; | |
try | |
{ | |
entity.Amount = payoutReportResponse.Payout_amount.FormatCurrencyAmount(payoutReportResponse.Currency); | |
} | |
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException) | |
{ | |
entity.Amount = 0; | |
sucess = false; | |
_logger.Log(LogLevel.Error, ex, "Failed to convert payment amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency); | |
} | |
try | |
{ | |
entity.GrossTransactionAmount = payoutReportResponse.Gross_transaction_amount.FormatCurrencyAmount(payoutReportResponse.Currency); | |
} | |
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException) | |
{ | |
entity.GrossTransactionAmount = 0; | |
sucess = false; | |
_logger.Log(LogLevel.Error, ex, "Failed to convert gross transaction amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency); | |
} | |
try | |
{ | |
entity.TotalDeductionsAmount = payoutReportResponse.Total_fees_amount.FormatCurrencyAmount(payoutReportResponse.Currency); | |
} | |
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException) | |
{ | |
entity.TotalDeductionsAmount = 0; | |
sucess = false; | |
_logger.Log(LogLevel.Error, ex, "Failed to convert total deductions amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency); | |
} | |
try | |
{ | |
entity.SettleAmount = payoutReportResponse.Settle_amount.FormatCurrencyAmount(payoutReportResponse.Currency); | |
} | |
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException) | |
{ | |
entity.SettleAmount = 0; | |
sucess = false; | |
_logger.Log(LogLevel.Error, ex, "Failed to convert settle amount! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency); | |
} | |
try | |
{ | |
entity.BeginBalance = payoutReportResponse.Begin_balance.FormatCurrencyAmount(payoutReportResponse.Currency); | |
} | |
catch (Exception ex) when (ex is InvalidCurrencyException || ex is UnsupportedCurrencyException) | |
{ | |
entity.BeginBalance = 0; | |
sucess = false; | |
_logger.Log(LogLevel.Error, ex, "Failed to convert begin balance! Date = {Date}, PayoutId = {PayoutId}, Currency = {currency}", date, payoutReportResponse.Id, payoutReportResponse.Currency); | |
} | |
return sucess; | |
} | |
private PayoutStatus ParsePayoutStatus(AMGModels.PayoutReportResponse payoutReportResponse, DateTime date) | |
{ | |
PayoutStatus status = PayoutStatus.UNKNOWN; | |
try | |
{ | |
status = (PayoutStatus)Enum.Parse(typeof(PayoutStatus), payoutReportResponse.Status); | |
} | |
catch (Exception ex) | |
{ | |
_logger.Log(LogLevel.Error, ex, "Failed to set payout status! Date = {Date}, PayoutId = {PayoutId}, Status = {Status}", date, payoutReportResponse.Id, payoutReportResponse.Status); | |
} | |
return status; | |
} | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment