Skip to content

Instantly share code, notes, and snippets.

@zudsniper
Created October 30, 2024 06:12
Show Gist options
  • Save zudsniper/5dcd0ac87ee3e6ad71d66900197b9081 to your computer and use it in GitHub Desktop.
Save zudsniper/5dcd0ac87ee3e6ad71d66900197b9081 to your computer and use it in GitHub Desktop.
⛳️ perform some arbitrary logic on client and transaction data (for tanner)
import pandas as pd
import argparse
from loguru import logger
from colorama import Fore, Back, Style, init
import os
import glob
import time
import readline # For enabling tab completion
import rlcompleter # For readline completer
# Version Information (printed in the banner below and useful in log triage)
VERSION = "1.5.13"
# Initialize colorama for Windows compatibility
# (autoreset=True restores default colors after every print call)
init(autoreset=True)
# Configure loguru with default log level (always DEBUG for this script)
# Log files rotate at 1 MB and rotated files are zipped; directory "logs/"
# is created by loguru on first write.
logger.add("logs/output.log", format="{time} {level} {message}", level="DEBUG", rotation="1 MB", compression="zip")
# Enable tab completion for interactive mode
# Delimiters exclude '/' so full filesystem paths complete as one token.
readline.set_completer_delims(' \t\n"\'')
readline.parse_and_bind("tab: complete")
def path_completer(text, state):
line = readline.get_line_buffer().split()
if not line:
return [os.path.join('.', f) + os.sep if os.path.isdir(f) else f for f in os.listdir('.')][state]
else:
dirname, partial_filename = os.path.split(text)
if not dirname:
dirname = '.'
matches = [f for f in os.listdir(dirname) if f.startswith(partial_filename)]
if os.path.isdir(os.path.join(dirname, matches[state])):
return os.path.join(dirname, matches[state]) + os.sep
else:
return os.path.join(dirname, matches[state])
readline.set_completer(path_completer)
# Banner: version and author information.
print(Fore.MAGENTA + Style.BRIGHT + f"===== flagged emails transaction analysis v{VERSION} =====")
print(Fore.YELLOW + Style.BRIGHT + "by @zudsniper on GitHub\n2024 \U0001F4C4 \U0001F4D6 \U0001F4BC \U0001F5C4")

# Command-line interface definition.
parser = argparse.ArgumentParser(description="Ingest CSV files and generate output report for flagged emails.")
parser.add_argument('-e', '--emails', help='Flagged emails CSV file path')
parser.add_argument('-cr', '--client_records', help='Client records CSV file path')
parser.add_argument('-pr', '--payments_refunds_all', nargs='+', help='List of Payments Refunds CSV files paths or wildcard pattern')
parser.add_argument('-o', '--output', default='out.csv', help='Output CSV file path (default: out.csv)')
parser.add_argument('-t', '--totals', action='store_true', help='Add a final row with totals for applicable fields (trans_total, trans_cnt)')
parser.add_argument('-st', '--stats', action='store_true', help='Enable additional transaction statistics (trans_mean, trans_med, trans_std, trans_var) and display client ID')
args = parser.parse_args()

# Expand any wildcard patterns given for the payments/refunds files.
if args.payments_refunds_all:
    expanded = []
    for pattern in args.payments_refunds_all:
        expanded.extend(glob.glob(pattern) if '*' in pattern else [pattern])
    args.payments_refunds_all = expanded
    if expanded:
        print(f"{Style.BRIGHT}{len(args.payments_refunds_all)}{Style.RESET_ALL} Payments/refunds files {Fore.GREEN}loaded{Style.RESET_ALL}: {', '.join(args.payments_refunds_all)}")
    else:
        logger.error("No files matched the provided wildcard pattern.")
        print(Fore.RED + "Error: No valid files found for the provided payments/refunds pattern.")
        exit(1)
# Interactive mode: prompt for any input that was not supplied on the CLI.
# Loops until every required path is collected; Ctrl-C exits cleanly.
if not args.emails or not args.client_records or not args.payments_refunds_all:
    print(Fore.CYAN + "Interactive mode initiated. Please provide the necessary inputs.")
    while True:
        try:
            # Prompt for flagged emails CSV if not provided.
            if not args.emails:
                emails_path = input("Enter the flagged emails CSV file path: ")
                if not os.path.exists(emails_path):
                    print(Fore.RED + "File not found. Please enter a valid file path.")
                    continue
                args.emails = emails_path
                print(Fore.GREEN + f"Flagged emails file loaded: {emails_path}")
            # Prompt for client records CSV if not provided.
            if not args.client_records:
                client_records_path = input("Enter the client records CSV file path: ")
                if not os.path.exists(client_records_path):
                    print(Fore.RED + "File not found. Please enter a valid file path.")
                    continue
                args.client_records = client_records_path
                print(Fore.GREEN + f"Client records file loaded: {client_records_path}")
            # Prompt for payments/refunds CSV(s) if not provided.
            if not args.payments_refunds_all:
                payments_refunds_input = input("Enter the payments/refunds CSV file paths (use space-separated list or wildcard pattern): ")
                if '*' in payments_refunds_input:
                    payments_refunds_files = glob.glob(payments_refunds_input)
                else:
                    payments_refunds_files = payments_refunds_input.split()
                # Fix: validate the paths here (the email/client prompts do);
                # previously a typo only surfaced as a crash in load_csv later.
                if not payments_refunds_files or any(not os.path.exists(f) for f in payments_refunds_files):
                    print(Fore.RED + "No valid files found. Please enter a valid file path or pattern.")
                    continue
                args.payments_refunds_all = payments_refunds_files
                print(f"{Style.BRIGHT}{len(payments_refunds_files)}{Style.RESET_ALL} Payments/refunds files {Fore.GREEN}loaded{Style.RESET_ALL}: {', '.join(payments_refunds_files)}")
            # Prompt for the output CSV; empty answer keeps the default.
            output_path = input(Fore.BLUE + f"Enter output CSV file path (default: {args.output}): ") or args.output
            if os.path.exists(output_path):
                override = input(Fore.YELLOW + "File already exists. Do you want to override it? (y/n): ").lower()
                if override == 'n':
                    # Timestamped name avoids clobbering the existing file.
                    output_path = f"out{int(time.time())}.csv"
                    print(Fore.GREEN + f"New output file will be created: {output_path}")
            args.output = output_path
            break
        except KeyboardInterrupt:
            print(Fore.RED + "\nInteractive session terminated by user.")
            exit(0)
# Function to load a CSV into a dataframe with logging.
def load_csv(filepath, df_name):
    """Load a CSV file into a DataFrame, logging success or failure.

    Args:
        filepath: Path of the CSV file to read.
        df_name: Human-readable name used only in log messages.

    Returns:
        pandas.DataFrame with NaN values replaced by empty strings.

    Raises:
        FileNotFoundError: If *filepath* does not exist.
        pandas.errors.EmptyDataError: If the file contains no data.
    """
    try:
        df = pd.read_csv(filepath)
        # NOTE(review): fillna('') coerces numeric columns containing NaN to
        # object dtype, which can affect later numeric aggregation — confirm
        # the transaction files never have missing amounts.
        df.fillna('', inplace=True)
        logger.info(f"Successfully loaded {df_name} from {filepath}")
        return df
    except FileNotFoundError:
        logger.error(f"File not found: {filepath}")
        # Fix: bare `raise` (instead of `raise e`) preserves the traceback.
        raise
    except pd.errors.EmptyDataError:
        logger.error(f"No data found in {filepath}")
        raise
# Load flagged emails.
df_flagged_emails = load_csv(args.emails, "Flagged Emails")

# De-duplicate the flagged list so each email is only matched once.
duplicate_flagged_emails = df_flagged_emails[df_flagged_emails.duplicated('Email', keep=False)]
if not duplicate_flagged_emails.empty:
    logger.warning(f"Duplicate flagged emails found: {duplicate_flagged_emails}")
    df_flagged_emails = df_flagged_emails.drop_duplicates(subset='Email')
    logger.info("Duplicate entries removed from flagged emails list.")

# Load client records.
df_client_records = load_csv(args.client_records, "Client Records")

# Restrict client records to those whose email address is flagged.
flagged_clients = df_client_records[df_client_records['Email Address'].isin(df_flagged_emails['Email'])]

# Report duplicate emails among the flagged client records (kept, not dropped,
# so every matching client record still contributes transactions).
duplicate_client_emails = flagged_clients[flagged_clients.duplicated('Email Address', keep=False)]
if not duplicate_client_emails.empty:
    logger.warning(f"Duplicate client emails found among flagged records.")
    duplicate_clients_info = flagged_clients[flagged_clients['Email Address'].isin(duplicate_client_emails['Email Address'])]
    duplicate_clients_info_sorted = duplicate_clients_info.sort_values(by='Email Address')
    print(f"{Style.BRIGHT}{len(duplicate_clients_info)}{Style.RESET_ALL}{Fore.RED} Duplicate emails found in flagged client records...")
    with pd.option_context('display.max_rows', None):
        print(duplicate_clients_info_sorted[['Email Address', 'First Name', 'Last Name']])

# Load payments/refunds files and concatenate them.
# Fix: concatenate once from a list instead of pd.concat inside the loop,
# which re-copies the accumulated frame each iteration (quadratic).
frames = [load_csv(file, "Payments/Refunds") for file in args.payments_refunds_all]
df_payments_refunds = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
# Remove duplicate transactions across different files.
df_payments_refunds.drop_duplicates(inplace=True)
logger.info("All payments/refunds files loaded and merged successfully.")
# Join operations.
logger.info(Fore.CYAN + "Starting data operations to generate output report.")

# All transactions belonging to flagged clients (inner join drops clients
# with no transactions and transactions with no flagged client).
merged_data = pd.merge(df_payments_refunds, flagged_clients, left_on="Client Id", right_on="ClientRecord id", how="inner")

# Build the aggregation spec once; the extra statistics are opt-in via
# --stats.  Fix: the original duplicated the whole groupby/agg call in both
# branches, differing only in these columns.
agg_spec = {
    'trans_total': ('Transaction Amount', 'sum'),
    'trans_cnt': ('Transaction Amount', 'count'),
}
if args.stats:
    agg_spec.update(
        trans_mean=('Transaction Amount', 'mean'),
        trans_med=('Transaction Amount', 'median'),
        trans_std=('Transaction Amount', 'std'),
        trans_var=('Transaction Amount', 'var'),
    )
output_data = merged_data.groupby(['ClientRecord id', 'Email Address', 'First Name', 'Last Name']).agg(**agg_spec).reset_index()

# Sort output by transaction total amount, largest first.
output_data.sort_values(by='trans_total', ascending=False, inplace=True)

# Add a totals row if requested.
if args.totals:
    totals_row = {
        'ClientRecord id': 'TOTAL',
        # Fix: removed dead `if args.totals else ''` — always true here.
        'Email Address': f"{len(df_flagged_emails)} emails, {len(flagged_clients)} client records",
        'First Name': '',
        'Last Name': '',
        'trans_total': output_data['trans_total'].sum(),
        'trans_cnt': output_data['trans_cnt'].sum(),
    }
    if args.stats:
        # Statistics columns are left blank on the totals row by design.
        totals_row.update(trans_mean='', trans_med='', trans_std='', trans_var='')
    output_data = pd.concat([output_data, pd.DataFrame([totals_row])], ignore_index=True)
    logger.info("Totals row added to output.")

# Save to CSV.
output_path = args.output
output_data.to_csv(output_path, index=False)
logger.info(f"Output data written to {output_path}")
logger.debug(f"Output Data Head:\n{output_data.head()}\n")

# Informational output to console.
print(Fore.BLUE + Style.BRIGHT + "--- Summary of Actions ---")
print(Fore.CYAN + f"Flagged Emails Processed: {len(df_flagged_emails)}")
print(Fore.CYAN + f"Clients Matching Flagged Emails: {len(flagged_clients)}")
print("✅" + Fore.GREEN + Style.BRIGHT + f" Output Written to: {os.path.abspath(output_path)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment