import firebase_admin
from firebase_admin import firestore, storage
from flask import abort
from google.cloud.firestore_v1.base_document import DocumentSnapshot

import app.config

firebase_app = firebase_admin.initialize_app(
    options={"storageBucket": app.config.FIREBASE_STORAGE_BUCKET}
)
bucket = storage.bucket()
db = firestore.client()


class Process:
    def __init__(self, id: str) -> None:
        self.public_ref = db.collection("processes").document(id)
        self.private_ref = self.public_ref.collection("private").document(id)

    def get_public(self) -> DocumentSnapshot:
        process = self.public_ref.get()
        if not process.exists:
            print("Document does not exist in Firestore")
            abort(404, description="Document does not exist in Firestore.")
        return process
    def get_private(self) -> DocumentSnapshot:
        # Read the private subcollection document (not the public one).
        process = self.private_ref.get()
        if not process.exists:
            print("Document does not exist in Firestore")
            abort(404, description="Document does not exist in Firestore.")
        return process


def download_blob(source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)


def upload_blob(source_file_name, destination_blob_name, user_id):
    """Uploads a file to the bucket and tags it with the uploader's user ID."""
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    blob.metadata = {"userId": user_id}
    blob.patch()
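A minimal usage sketch for these helpers, assuming the module above is importable and that a process document with the hypothetical id "demo-process" already exists in Firestore; the object names and local paths are placeholders:

# Sketch only: "demo-process", the object names, and the local paths are hypothetical.
process = Process("demo-process")
public_doc = process.get_public()  # aborts with a 404 if the document is missing
owner_id = public_doc.to_dict().get("userId")

download_blob(f"uploads/demo-process/{owner_id}/input.csv", "/tmp/input.csv")
upload_blob("/tmp/output.pdf", f"generated/demo-process/{owner_id}/output.pdf", owner_id)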
import os
import shutil
import logging
import time
from datetime import datetime, timezone
from functools import wraps

from flask import abort, request, jsonify

from app import TEMP_DIR, app
from app.generators.form_8949_generator import generate_form_8949
from app.generators.form_schedule_d_generator import generate_form_schedule_d
from app.models import User
from app.parsing import parse_csv
from app.services.backup import write_to_backup
from app.services.firebase import Process, download_blob, upload_blob

# Set up logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def cleanup_process_dir(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        process_id = request.args.get("id")
        if process_id is None:
            logger.error("Request did not include 'id' in call")
            return jsonify({"error": "Missing required 'id' parameter"}), 400
        PROCESS_DIR = os.path.join(TEMP_DIR, process_id)
        os.makedirs(PROCESS_DIR, exist_ok=True)
        try:
            return f(*args, **kwargs)
        finally:
            if os.path.exists(PROCESS_DIR):
                try:
                    shutil.rmtree(PROCESS_DIR)
                except PermissionError as e:
                    logger.error(f"Could not remove directory {PROCESS_DIR}: {e}")
    return decorated_function


@app.route("/form/process")
@cleanup_process_dir
def process_form():
    process_id = request.args.get("id")
    PROCESS_DIR = os.path.join(TEMP_DIR, process_id)

    try:
        # Retrieve Firestore document
        process = Process(process_id)
        public_doc = process.get_public()
        private_doc = process.get_private()

        if not public_doc.exists:
            logger.error(f"Public document not found: {process_id}")
            return jsonify({"error": "Process not found"}), 404
        if not private_doc.exists:
            logger.error(f"Private document not found: {process_id}")
            return jsonify({"error": "Private data not found"}), 404

        # Get user data from private collection
        private_data = private_doc.to_dict()
        public_data = public_doc.to_dict()
        user = User(
            name=private_data.get("name"),  # Get from private collection
            ssn=private_data.get("ssn"),    # Get from private collection
            id=public_data.get("userId"),   # Get from public document
            date=datetime.now(timezone.utc).strftime("%Y-%m-%d"),
        )
        # Download and backup spreadsheet
        filepath = os.path.join(PROCESS_DIR, "spreadsheet.csv")
        download_blob(f"uploads/{process_id}/{user.id}/{user.date}/spreadsheet.csv", filepath)
        write_to_backup(filepath)

        # Update Firestore with progress
        process.public_ref.set({"progress": "Generating your tax documents"}, merge=True)

        # Parse transactions
        transactions = parse_csv(filepath)
        if transactions.invalid_transactions:
            error_messages = [inv.reason for inv in transactions.invalid_transactions]
            unique_errors = list(set(error_messages))  # Remove duplicates
            formatted_errors = '; '.join(unique_errors)
            logger.error(f"Invalid transactions detected: {formatted_errors}")
            return jsonify({"error": f"Invalid transactions detected: {formatted_errors}"}), 400

        # Generate forms
        form8949_result = generate_form_8949(transactions.transactions, PROCESS_DIR, name=user.name, ssn=user.ssn)
        form_schedule_d_local_path = generate_form_schedule_d(
            transactions.transactions, PROCESS_DIR, name=user.name, ssn=user.ssn
        )

        # Upload forms to Firebase Storage
        form8949_path = f"generated/{process_id}/{user.id}/public-{user.date}/8949.pdf"
        form8949_preview_path = f"generated/{process_id}/{user.id}/public-{user.date}/8949_preview.pdf"
        form_schedule_d_path = f"generated/{process_id}/{user.id}/public-{user.date}/scheduled.pdf"
        upload_blob(form8949_result["full_form_path"], form8949_path, user.id)
        upload_blob(form8949_result["preview_form_path"], form8949_preview_path, user.id)
        upload_blob(form_schedule_d_local_path, form_schedule_d_path, user.id)
        # Eventually add this step for ads. It delays the process by 20 seconds,
        # so users can view the ad or a progress bar.
        # time.sleep(20)
        # Update Firestore with form paths and status
        process.public_ref.set(
            {"progress": "PDF Generated", "status": "complete", "preview_path": form8949_preview_path}, merge=True
        )
        # Note: no merge here, so the private document is replaced with just these fields.
        process.private_ref.set({"form_path": form8949_path, "schedule_d_path": form_schedule_d_path, "paid": False})

        # Return success response
        return jsonify({"message": "Form processing completed successfully"}), 200

    except Exception as e:
        logger.error(f"Error processing form: {str(e)}", exc_info=True)
        return jsonify({"error": "Internal server error"}), 500
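For a quick end-to-end check, the route can be exercised with Flask's test client. This is a sketch that assumes the Firestore documents and the uploaded spreadsheet for the hypothetical id "demo-process" have already been seeded:

# Sketch only: "demo-process" is a hypothetical, pre-seeded process id.
with app.test_client() as client:
    response = client.get("/form/process", query_string={"id": "demo-process"})
    print(response.status_code, response.get_json())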
rules_version = '2';

service firebase.storage {
  match /b/{bucket}/o {
    // Default deny
    match /{allPaths=**} {
      allow read, write: if false;
    }

    // Helper functions
    function isValidUser(userId) {
      // For reads, the stored object's custom metadata is on `resource`;
      // `request.resource` is only populated on writes.
      return request.auth != null && request.auth.uid == resource.metadata.userId;
    }

    function isValidFileSize() {
      return request.resource.size <= 7 * 1024 * 1024; // 7MB limit
    }

    // Allow uploads (size-limited); clients cannot read them back.
    // The backend reads uploads via the Admin SDK, which bypasses these rules.
    match /uploads/{processId}/{userId}/{allPaths=**} {
      allow write: if isValidFileSize();
      allow read: if false;
    }

    // Generated files: only the uploader (per blob metadata) can read; no client writes.
    match /generated/{processId}/{userId}/{allPaths=**} {
      allow read: if isValidUser(userId); // Only the uploader can read
      allow write: if false; // No one can write to the generated files
    }
  }
}
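Since the rules cap uploads at 7 MB, a caller can mirror isValidFileSize() and check a file before writing to the uploads/ prefix. A small sketch (the helper name is my own):

import os

MAX_UPLOAD_BYTES = 7 * 1024 * 1024  # mirrors isValidFileSize() in the rules above


def is_uploadable(path: str) -> bool:
    # Reject files the storage rules would refuse anyway.
    return os.path.getsize(path) <= MAX_UPLOAD_BYTES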