⛳️ perform some arbitrary logic on client and transaction data (for tanner)
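Usage (a minimal sketch; the script filename here is hypothetical, and the flags come from the argparse setup below):

    python flagged_transactions.py -e flagged_emails.csv -cr client_records.csv -pr "payments_refunds_*.csv" -o out.csv -t -st

Running it with any required argument missing drops into interactive mode with tab completion.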
import pandas as pd
import argparse
from loguru import logger
from colorama import Fore, Back, Style, init
import os
import glob
import time
import readline  # For enabling tab completion (POSIX readline; not available in stock Windows Python)
import rlcompleter  # For the readline completer
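
# Expected input columns, inferred from the lookups below (assumptions; adjust to your actual exports):
#   flagged emails CSV:    'Email'
#   client records CSV:    'ClientRecord id', 'Email Address', 'First Name', 'Last Name'
#   payments/refunds CSVs: 'Client Id', 'Transaction Amount'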
# Version Information
VERSION = "1.5.13"

# Initialize colorama for Windows compatibility
init(autoreset=True)

# Configure loguru with a fixed log level (always DEBUG for this script)
logger.add("logs/output.log", format="{time} {level} {message}", level="DEBUG", rotation="1 MB", compression="zip")

# Enable tab completion for interactive mode
readline.set_completer_delims(' \t\n"\'')
readline.parse_and_bind("tab: complete")
# Custom completer function for filesystem paths
def path_completer(text, state):
    dirname, partial_filename = os.path.split(text)
    if not dirname:
        dirname = '.'
    try:
        matches = [f for f in os.listdir(dirname) if f.startswith(partial_filename)]
        candidate = matches[state]
    except (OSError, IndexError):
        return None  # readline calls with state 0, 1, 2, ... until the completer returns None
    full_path = os.path.join(dirname, candidate)
    return full_path + os.sep if os.path.isdir(full_path) else full_path

readline.set_completer(path_completer)
# Print version and author info
print(Fore.MAGENTA + Style.BRIGHT + f"===== flagged emails transaction analysis v{VERSION} =====")
print(Fore.YELLOW + Style.BRIGHT + "by @zudsniper on GitHub\n2024 \U0001F4C4 \U0001F4D6 \U0001F4BC \U0001F5C4")

# Define command line arguments
parser = argparse.ArgumentParser(description="Ingest CSV files and generate an output report for flagged emails.")
parser.add_argument('-e', '--emails', help='Flagged emails CSV file path')
parser.add_argument('-cr', '--client_records', help='Client records CSV file path')
parser.add_argument('-pr', '--payments_refunds_all', nargs='+', help='List of payments/refunds CSV file paths or wildcard pattern')
parser.add_argument('-o', '--output', default='out.csv', help='Output CSV file path (default: out.csv)')
parser.add_argument('-t', '--totals', action='store_true', help='Add a final row with totals for applicable fields (trans_total, trans_cnt)')
parser.add_argument('-st', '--stats', action='store_true', help='Enable additional transaction statistics (trans_mean, trans_med, trans_std, trans_var) and display client ID')
args = parser.parse_args()
# If -pr contains wildcard patterns, expand them
if args.payments_refunds_all:
    expanded_files = []
    for pattern in args.payments_refunds_all:
        if '*' in pattern:
            expanded_files += glob.glob(pattern)
        else:
            expanded_files.append(pattern)
    args.payments_refunds_all = expanded_files
    if not args.payments_refunds_all:
        logger.error("No files matched the provided wildcard pattern.")
        print(Fore.RED + "Error: No valid files found for the provided payments/refunds pattern.")
        exit(1)
    else:
        print(f"{Style.BRIGHT}{len(args.payments_refunds_all)}{Style.RESET_ALL} Payments/refunds files {Fore.GREEN}loaded{Style.RESET_ALL}: {', '.join(args.payments_refunds_all)}")
# Interactive mode if any required argument is missing
if not args.emails or not args.client_records or not args.payments_refunds_all:
    print(Fore.CYAN + "Interactive mode initiated. Please provide the necessary inputs.")
    while True:
        try:
            # Prompt for flagged emails CSV if not provided
            if not args.emails:
                emails_path = input("Enter the flagged emails CSV file path: ")
                if not os.path.exists(emails_path):
                    print(Fore.RED + "File not found. Please enter a valid file path.")
                    continue
                args.emails = emails_path
                print(Fore.GREEN + f"Flagged emails file loaded: {emails_path}")
            # Prompt for client records CSV if not provided
            if not args.client_records:
                client_records_path = input("Enter the client records CSV file path: ")
                if not os.path.exists(client_records_path):
                    print(Fore.RED + "File not found. Please enter a valid file path.")
                    continue
                args.client_records = client_records_path
                print(Fore.GREEN + f"Client records file loaded: {client_records_path}")
            # Prompt for payments/refunds CSV(s) if not provided
            if not args.payments_refunds_all:
                payments_refunds_input = input("Enter the payments/refunds CSV file paths (space-separated list or wildcard pattern): ")
                if '*' in payments_refunds_input:
                    payments_refunds_files = glob.glob(payments_refunds_input)
                else:
                    payments_refunds_files = payments_refunds_input.split()
                if not payments_refunds_files:
                    print(Fore.RED + "No valid files found. Please enter a valid file path or pattern.")
                    continue
                args.payments_refunds_all = payments_refunds_files
                print(f"{Style.BRIGHT}{len(payments_refunds_files)}{Style.RESET_ALL} Payments/refunds files {Fore.GREEN}loaded{Style.RESET_ALL}: {', '.join(payments_refunds_files)}")
            # Prompt for output CSV
            output_path = input(Fore.BLUE + f"Enter output CSV file path (default: {args.output}): ") or args.output
            if os.path.exists(output_path):
                override = input(Fore.YELLOW + "File already exists. Do you want to override it? (y/n): ").lower()
                if override == 'n':
                    output_path = f"out{int(time.time())}.csv"
                    print(Fore.GREEN + f"New output file will be created: {output_path}")
            args.output = output_path
            break
        except KeyboardInterrupt:
            print(Fore.RED + "\nInteractive session terminated by user.")
            exit(0)
# Function to load a CSV into a dataframe with logging
def load_csv(filepath, df_name):
    try:
        df = pd.read_csv(filepath)
        df.fillna('', inplace=True)  # Handle missing values by replacing NaN with empty strings
        logger.info(f"Successfully loaded {df_name} from {filepath}")
        # logger.debug(f"{df_name} Head:\n{df.head()}\n")
        return df
    except FileNotFoundError as e:
        logger.error(f"File not found: {filepath}")
        raise e
    except pd.errors.EmptyDataError as e:
        logger.error(f"No data found in {filepath}")
        raise e
# Load flagged emails
df_flagged_emails = load_csv(args.emails, "Flagged Emails")

# Check for duplicate emails in the flagged list
duplicate_flagged_emails = df_flagged_emails[df_flagged_emails.duplicated('Email', keep=False)]
if not duplicate_flagged_emails.empty:
    logger.warning(f"Duplicate flagged emails found: {duplicate_flagged_emails}")
    df_flagged_emails = df_flagged_emails.drop_duplicates(subset='Email')
    logger.info("Duplicate entries removed from flagged emails list.")

# Load client records
df_client_records = load_csv(args.client_records, "Client Records")

# Filter client records down to the flagged emails
flagged_clients = df_client_records[df_client_records['Email Address'].isin(df_flagged_emails['Email'])]

# Check for duplicate emails in client records, but only among the flagged emails
duplicate_client_emails = flagged_clients[flagged_clients.duplicated('Email Address', keep=False)]
if not duplicate_client_emails.empty:
    logger.warning("Duplicate client emails found among flagged records.")
    # Show flagged clients with duplicate emails, grouped for readability
    duplicate_clients_info = flagged_clients[flagged_clients['Email Address'].isin(duplicate_client_emails['Email Address'])]
    duplicate_clients_info_sorted = duplicate_clients_info.sort_values(by='Email Address')
    print(f"{Style.BRIGHT}{len(duplicate_clients_info)}{Style.RESET_ALL}{Fore.RED} Duplicate emails found in flagged client records...")
    with pd.option_context('display.max_rows', None):
        print(duplicate_clients_info_sorted[['Email Address', 'First Name', 'Last Name']])
# Load payments/refunds files and concatenate them
df_payments_refunds = pd.concat(
    [load_csv(file, "Payments/Refunds") for file in args.payments_refunds_all],
    ignore_index=True
)

# Remove duplicate transactions across different files
df_payments_refunds.drop_duplicates(inplace=True)
logger.info("All payments/refunds files loaded and merged successfully.")

# Join operations
logger.info(Fore.CYAN + "Starting data operations to generate output report.")

# Merge payments/refunds with flagged clients to get all transactions per flagged user
merged_data = pd.merge(df_payments_refunds, flagged_clients, left_on="Client Id", right_on="ClientRecord id", how="inner")
# Calculate per-client metrics; the stats flag adds mean/median/std/var columns
agg_spec = {
    'trans_total': ('Transaction Amount', 'sum'),
    'trans_cnt': ('Transaction Amount', 'count'),
}
if args.stats:
    agg_spec.update({
        'trans_mean': ('Transaction Amount', 'mean'),
        'trans_med': ('Transaction Amount', 'median'),
        'trans_std': ('Transaction Amount', 'std'),
        'trans_var': ('Transaction Amount', 'var'),
    })
output_data = merged_data.groupby(['ClientRecord id', 'Email Address', 'First Name', 'Last Name']).agg(**agg_spec).reset_index()

# Sort output by total transaction amount, largest first
output_data.sort_values(by='trans_total', ascending=False, inplace=True)

# Add a totals row if requested
if args.totals:
    totals_row = {
        'ClientRecord id': 'TOTAL',
        'Email Address': f"{len(df_flagged_emails)} emails, {len(flagged_clients)} client records",
        'First Name': '',
        'Last Name': '',
        'trans_total': output_data['trans_total'].sum(),
        'trans_cnt': output_data['trans_cnt'].sum()
    }
    if args.stats:
        totals_row.update({
            'trans_mean': '',
            'trans_med': '',
            'trans_std': '',
            'trans_var': ''
        })
    output_data = pd.concat([output_data, pd.DataFrame([totals_row])], ignore_index=True)
    logger.info("Totals row added to output.")
# Save to CSV
output_path = args.output
output_data.to_csv(output_path, index=False)
logger.info(f"Output data written to {output_path}")
logger.debug(f"Output Data Head:\n{output_data.head()}\n")

# Informational output to console
print(Fore.BLUE + Style.BRIGHT + "--- Summary of Actions ---")
print(Fore.CYAN + f"Flagged Emails Processed: {len(df_flagged_emails)}")
print(Fore.CYAN + f"Clients Matching Flagged Emails: {len(flagged_clients)}")
print("✅" + Fore.GREEN + Style.BRIGHT + f" Output Written to: {os.path.abspath(output_path)}")