# Filename: shipment_summary.py
import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
import psycopg2
import re
from datetime import datetime, timedelta
from dotenv import load_dotenv
import csv

# Load environment variables from the project's .env file
load_dotenv('/Users/tim/code/finance_automator/.env')
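
# The .env file above is expected to define the variables read via os.getenv()
# later in this script: SPREADSHEET_ID, SHEET_NAME, DATABASE_HOST,
# DATABASE_USER, DATABASE_PASSWORD, and DATABASE_PORT.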

def connect_to_google_sheets():
    scopes = ['https://www.googleapis.com/auth/spreadsheets']
    service_account_file = 'mycreds.json'
    creds = service_account.Credentials.from_service_account_file(
        service_account_file, scopes=scopes
    )
    return build('sheets', 'v4', credentials=creds)
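
# Note: 'mycreds.json' is resolved relative to the current working directory,
# so run the script from the directory that contains the service-account key
# (or swap in an absolute path).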

def get_google_sheet_data(service, spreadsheet_id, sheet_name):
    result = service.spreadsheets().values().get(
        spreadsheetId=spreadsheet_id,
        range=f"{sheet_name}!A:K"
    ).execute()
    rows = result.get("values", [])
    # Keep rows where column C contains the word "Amazon" (case-insensitive)
    # and column K is not empty (assuming col K has the amounts or something)
    filtered_rows = [
        row for row in rows
        if len(row) >= 11 and row[10].strip()
        and re.search(r'amazon', row[2], re.IGNORECASE)
    ]
    return filtered_rows
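
# Downstream, process_sheet_rows reads the transaction date from column A
# (row[0]) and the amount from column D (row[3]) of each filtered row.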

def find_matching_shipments(conn, date_str, amount):
    """
    Find shipments in the database that match the given date and amount,
    allowing +/- 5 days around date_str.
    If your DB amounts are positive but your spreadsheet or transaction
    amounts are negative, the caller should flip the sign before passing
    `amount` in (process_sheet_rows multiplies by -1).
    """
    n_days = 5
    query = """
        SELECT invoice_date, invoice_amount, concatenated_product_names
        FROM all_invoice_order_data
        WHERE invoice_date BETWEEN %s AND %s
          AND invoice_amount = %s
    """
    date_obj = datetime.strptime(date_str, "%Y-%m-%d")
    start_date = (date_obj - timedelta(days=n_days)).strftime("%Y-%m-%d")
    end_date = (date_obj + timedelta(days=n_days)).strftime("%Y-%m-%d")
    with conn.cursor() as cursor:
        cursor.execute(query, (start_date, end_date, amount))
        results = cursor.fetchall()
    return results
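
# Example window (derived from n_days = 5 above): for date_str "2024-12-28",
# the query matches invoice_date between 2024-12-23 and 2025-01-02, inclusive.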

def parse_amount(value):
    if not value:
        return 0.0
    # Remove commas and "$" from amounts, handle negative if present
    value = value.replace(",", "").replace("$", "")
    return float(value)
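
# Example: parse_amount("$1,234.56") -> 1234.56, parse_amount("") -> 0.0.
# (This helper and parse_row below aren't called by process_sheet_rows,
# which parses dates and amounts inline.)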

def parse_row(row):
    if not row or len(row) < 4:
        return None
    date_str = row[0].strip()
    try:
        # Convert "12/28/2024" -> "2024-12-28"
        date_obj = datetime.strptime(date_str, "%m/%d/%Y")
        date_str = date_obj.strftime("%Y-%m-%d")
    except ValueError:
        return None
    amount_str = row[3].replace("$", "").replace(",", "")
    try:
        amount = float(amount_str)
    except ValueError:
        return None
    return (date_str, amount)
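
# Example: parse_row(["12/28/2024", "", "Amazon.com", "$45.67"])
# returns ("2024-12-28", 45.67); rows that are too short or have an
# unparseable date or amount return None.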

def process_sheet_rows(rows, conn):
    # Open the CSV once in write mode so each run starts with a fresh file,
    # and reuse the same writer for every match found below.
    with open('shipment_matches.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        for i, row in enumerate(rows, start=1):
            if not row or len(row) < 4:
                print(f"Error processing row {i}: Empty or insufficient columns")
                continue
            try:
                # Parse date: "12/28/2024" -> "2024-12-28"
                date_str = row[0].strip()
                date_obj = datetime.strptime(date_str, "%m/%d/%Y")
                date_str = date_obj.strftime("%Y-%m-%d")
                # Parse amount; flip the sign so negative spreadsheet
                # transactions match positive invoice amounts in the DB
                amt_str = row[3].replace("$", "").replace(",", "").strip()
                amount = float(amt_str) * -1
                # Query the shipment summary table for a date/amount match
                matches = find_matching_shipments(conn, date_str, amount)
                if matches:
                    for match in matches:
                        # CSV columns: transaction date, amount, product names
                        writer.writerow([date_str, amount, match[2]])
                        print(f"Row {i} => MATCH found for date={date_str}, amount={amount}, products={match[2]}")
                else:
                    print(f"Row {i} => No match found for date={date_str}, amount={amount}")
            except ValueError as e:
                print(f"Error parsing row {i} ({row}): {e}")
            except Exception as e:
                print(f"Unhandled error processing row {i} ({row}): {e}")

def main():
    # Google Sheets setup
    service = connect_to_google_sheets()
    spreadsheet_id = os.getenv("SPREADSHEET_ID")
    sheet_name = os.getenv("SHEET_NAME")
    sheet_data = get_google_sheet_data(service, spreadsheet_id, sheet_name)
    # Database connection
    conn = psycopg2.connect(
        host=os.getenv("DATABASE_HOST"),
        database='amzn',
        user=os.getenv("DATABASE_USER"),
        password=os.getenv("DATABASE_PASSWORD"),
        port=os.getenv("DATABASE_PORT")  # <-- unify with environment.py
    )
    try:
        # If your sheet has a header row, pass sheet_data[1:] here instead
        process_sheet_rows(sheet_data, conn)
    except Exception as e:
        print(f"Critical error in main: {e}")
    finally:
        conn.close()


if __name__ == "__main__":
    main()
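
# ---------------------------------------------------------------------------
# Usage sketch (assumed, not part of the original gist): with mycreds.json in
# the working directory and the .env file populated, e.g.
#   SPREADSHEET_ID=<your Google Sheet ID>
#   SHEET_NAME=<tab name, e.g. Transactions>
#   DATABASE_HOST=localhost
#   DATABASE_PORT=5432
#   DATABASE_USER=<db user>
#   DATABASE_PASSWORD=<db password>
# run the script with:
#   python shipment_summary.py
# Matches are printed to stdout and written to shipment_matches.csv.
# ---------------------------------------------------------------------------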