Created
April 29, 2025 14:51
-
-
Save ibuilder/980b2b104762ffc17de5f8a7ee12d98d to your computer and use it in GitHub Desktop.
Python script that pulls budget information from the Procore API and generates an AIA G702/G703 invoice as an Excel workbook.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import pandas as pd | |
import openpyxl | |
from openpyxl.styles import Font, Border, Side, Alignment, PatternFill | |
from openpyxl.drawing.image import Image | |
from datetime import datetime | |
import os | |
import json | |
class ProcoreAPIClient:
    """Small Procore REST client: OAuth2 token handling plus read-only project queries.

    All query methods share one failure contract: on a non-200 response or a
    transport error they print a diagnostic and return ``None``
    (``authenticate`` returns ``False``). They only raise when called before
    a token has been obtained.
    """

    # Seconds to wait on each HTTP call. ``requests`` has no default timeout,
    # so without this a stalled connection would block forever. Override on
    # the class or on an instance to change it.
    timeout = 30

    def __init__(self, client_id, client_secret, redirect_uri, company_id):
        """Store the OAuth application credentials and the target company.

        Args:
            client_id: OAuth client id of the Procore application.
            client_secret: OAuth client secret of the Procore application.
            redirect_uri: Redirect URI sent with the authorization-code grant.
            company_id: Company id sent as ``company_id`` when listing projects.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.company_id = company_id
        self.base_url = "https://api.procore.com"
        # Parsed JSON body of the last successful token response, or None.
        self.token = None

    def authenticate(self, auth_code=None):
        """Obtain or refresh the OAuth2 token.

        With ``auth_code`` the authorization-code grant is used; without it
        the stored token's ``refresh_token`` is exchanged for a new token.

        Args:
            auth_code: Authorization code from the OAuth redirect, or a falsy
                value to refresh the existing token.

        Returns:
            True when a new token was stored in ``self.token``, else False.
        """
        if auth_code:
            payload = {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": self.redirect_uri,
            }
            return self._request_token(payload, "Authentication failed")

        if not self.token:
            print("No authorization code or existing token found.")
            return False

        # A token response without a refresh token cannot be refreshed; report
        # that as a normal failure instead of raising KeyError.
        refresh_token = self.token.get("refresh_token")
        if not refresh_token:
            print("Token refresh failed: no refresh token available.")
            return False

        payload = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        return self._request_token(payload, "Token refresh failed")

    def _request_token(self, payload, failure_label):
        """POST ``payload`` to the token endpoint and store the token on success.

        Args:
            payload: Form fields for the token request.
            failure_label: Prefix of the message printed when the call fails.

        Returns:
            True if the endpoint answered 200 and the token was stored.
        """
        token_url = f"{self.base_url}/oauth/token"
        try:
            response = requests.post(token_url, data=payload, timeout=self.timeout)
        except requests.RequestException as exc:
            print(f"{failure_label}: {exc}")
            return False

        if response.status_code == 200:
            self.token = response.json()
            return True

        print(f"{failure_label}: {response.text}")
        return False

    def get_headers(self):
        """Return the authorization headers for API requests.

        Raises:
            RuntimeError: If no token has been obtained yet.
        """
        if not self.token:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        return {
            "Authorization": f"Bearer {self.token['access_token']}",
            "Content-Type": "application/json",
        }

    def _get(self, path, description, params=None):
        """GET ``path`` relative to ``base_url`` and return the decoded JSON.

        Args:
            path: URL path beginning with ``/``.
            description: Noun used in the failure message ("projects", ...).
            params: Optional query-string parameters.

        Returns:
            The parsed JSON body on HTTP 200, otherwise None.

        Raises:
            RuntimeError: If called before authentication.
        """
        # Built outside the try block so the "not authenticated" error
        # propagates to the caller unchanged.
        headers = self.get_headers()
        url = f"{self.base_url}{path}"
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=self.timeout
            )
        except requests.RequestException as exc:
            print(f"Failed to get {description}: {exc}")
            return None

        if response.status_code == 200:
            return response.json()

        print(f"Failed to get {description}: {response.text}")
        return None

    def get_projects(self):
        """Get the list of projects for the configured company."""
        return self._get(
            "/rest/v1.0/projects",
            "projects",
            params={"company_id": self.company_id},
        )

    def get_budget_data(self, project_id):
        """Get budget data for a project."""
        return self._get(f"/rest/v1.0/projects/{project_id}/budget", "budget data")

    def get_budget_line_items(self, project_id):
        """Get budget line items for a project."""
        return self._get(
            f"/rest/v1.0/projects/{project_id}/budget/line_items",
            "budget line items",
        )

    def get_contracts(self, project_id):
        """Get contracts for a project."""
        return self._get(f"/rest/v1.0/projects/{project_id}/contracts", "contracts")

    def get_contract_line_items(self, project_id, contract_id):
        """Get line items for a specific contract."""
        return self._get(
            f"/rest/v1.0/projects/{project_id}/contracts/{contract_id}/line_items",
            "contract line items",
        )
class AIAGenerator:
    """Build an Excel workbook with AIA G702 and G703 sheets from Procore data.

    The G702 sheet is a static template (labels plus ``$0.00`` placeholders);
    the G703 sheet lists the contract's line items with per-row and
    grand-total formulas.
    """

    # Column headers of the G702 contract summary table (columns A-E).
    _G702_SUMMARY_HEADERS = ("", "PREVIOUS", "THIS PERIOD", "TOTAL", "% COMPLETE")

    # Row labels of the G702 contract summary, written from row 17 downwards.
    _G702_SUMMARY_LABELS = (
        "1. ORIGINAL CONTRACT SUM",
        "2. NET CHANGE BY CHANGE ORDERS",
        "3. CONTRACT SUM TO DATE",
        "4. TOTAL COMPLETED & STORED TO DATE",
        "5. RETAINAGE",
        "6. TOTAL EARNED LESS RETAINAGE",
        "7. LESS PREVIOUS CERTIFICATES FOR PAYMENT",
        "8. CURRENT PAYMENT DUE",
        "9. BALANCE TO FINISH, INCLUDING RETAINAGE",
    )

    # (column letter, row-7 header text, column width) for the G703 table.
    # D and E carry the two sub-headers under the merged "WORK COMPLETED"
    # banner in row 6.
    _G703_COLUMNS = (
        ("A", "ITEM\nNO.", 10),
        ("B", "DESCRIPTION OF WORK", 30),
        ("C", "SCHEDULED\nVALUE", 15),
        ("D", "FROM PREVIOUS\nAPPLICATION", 15),
        ("E", "THIS PERIOD", 15),
        ("F", "MATERIALS\nPRESENTLY\nSTORED", 15),
        ("G", "TOTAL\nCOMPLETED\nAND STORED\nTO DATE\n(D+E+F)", 15),
        ("H", "%\n(G÷C)", 10),
        ("I", "BALANCE\nTO FINISH\n(C-G)", 15),
    )

    _G703_FIRST_DATA_ROW = 8
    # Number of empty, formula-only rows written when there are no line items.
    _G703_BLANK_ROWS = 10

    def __init__(self, procore_client):
        """Remember the API client and today's date (``MM/DD/YYYY``).

        Args:
            procore_client: Object providing ``get_projects``,
                ``get_budget_line_items``, ``get_contracts`` and
                ``get_contract_line_items``.
        """
        self.procore = procore_client
        self.current_date = datetime.now().strftime("%m/%d/%Y")

    def get_project_data(self, project_id):
        """Collect the project record, budget line items and contracts.

        Args:
            project_id: Id of the project to look up in the project list.

        Returns:
            A dict with keys ``"project"``, ``"budget_items"`` and
            ``"contracts"``. The last two are passed through from the client
            unchanged and may be None if the client could not fetch them.

        Raises:
            LookupError: If the project is not in the list returned by the
                client, or the list could not be fetched.
        """
        # get_projects() returns None on failure; treat that as an empty list
        # so the caller gets the not-found error instead of a TypeError.
        projects = self.procore.get_projects() or []
        project = next((p for p in projects if p["id"] == project_id), None)
        if not project:
            raise LookupError(f"Project with ID {project_id} not found")

        return {
            "project": project,
            "budget_items": self.procore.get_budget_line_items(project_id),
            "contracts": self.procore.get_contracts(project_id),
        }

    def create_g702(self, wb, project_data, contract_id=None):
        """Create the G702 "Application and Certificate for Payment" sheet.

        Args:
            wb: openpyxl workbook that receives a new sheet named ``G702``.
            project_data: Dict as returned by ``get_project_data``.
            contract_id: Optional id of the contract whose title and creation
                date are written into the header.

        Returns:
            The newly created worksheet.

        Raises:
            LookupError: If ``contract_id`` is given but is not among
                ``project_data["contracts"]``.
        """
        ws = wb.create_sheet("G702")

        contract = None
        if contract_id:
            # The contract list is None when the API call failed.
            contracts = project_data["contracts"] or []
            contract = next((c for c in contracts if c["id"] == contract_id), None)
            if not contract:
                raise LookupError(f"Contract with ID {contract_id} not found")

        # Title banner
        ws.merge_cells("A1:H1")
        ws["A1"] = "APPLICATION AND CERTIFICATE FOR PAYMENT - AIA DOCUMENT G702"
        ws["A1"].font = Font(bold=True, size=14)
        ws["A1"].alignment = Alignment(horizontal="center")

        # Project information labels
        ws["A3"] = "TO OWNER:"
        ws["A5"] = "PROJECT:"
        ws["A7"] = "FROM CONTRACTOR:"
        ws["A10"] = "CONTRACT FOR:"
        ws["A12"] = "CONTRACT DATE:"

        project = project_data["project"]
        if project:
            ws["C5"] = project["name"]
            ws["C6"] = project.get("address", "")
        if contract:
            ws["C10"] = contract.get("title", "")
            # Keep the first ten characters (the date part of the timestamp);
            # "or" also covers a key that is present but null.
            ws["C12"] = (contract.get("created_at") or "")[:10]

        # Application information
        ws["E3"] = "APPLICATION NO:"
        ws["E4"] = "PERIOD TO:"
        ws["E5"] = "PROJECT NOS:"
        ws["G3"] = "001"  # Example application number
        ws["G4"] = self.current_date

        # Contract summary table
        ws["A15"] = "CONTRACT SUMMARY"
        ws["A15"].font = Font(bold=True)
        for col, header in zip("ABCDE", self._G702_SUMMARY_HEADERS):
            ws[f"{col}16"] = header
            ws[f"{col}16"].font = Font(bold=True)

        for offset, label in enumerate(self._G702_SUMMARY_LABELS):
            row = 17 + offset
            # Only the TOTAL column (D) gets a placeholder amount.
            for col, value in zip("ABCDE", (label, "", "", "$0.00", "")):
                ws[f"{col}{row}"] = value
            ws[f"A{row}"].font = Font(bold=True)

        # Signature section
        ws["A30"] = "ARCHITECT'S CERTIFICATE FOR PAYMENT"
        ws["A30"].font = Font(bold=True)
        ws["A32"] = "AMOUNT CERTIFIED:"
        ws["A34"] = "ARCHITECT:"
        ws["A36"] = "By:"
        ws["A38"] = "Date:"

        for col in "ABCDEFGH":
            ws.column_dimensions[col].width = 15

        return ws

    @staticmethod
    def _write_g703_formulas(ws, row):
        """Write the computed G (total), H (percent) and I (balance) cells of ``row``."""
        ws[f"G{row}"] = f"=SUM(D{row}:F{row})"
        ws[f"H{row}"] = f"=IF(C{row}>0,G{row}/C{row},0)"
        ws[f"H{row}"].number_format = "0.00%"
        ws[f"I{row}"] = f"=C{row}-G{row}"

    def create_g703(self, wb, project_data, contract_id=None):
        """Create the G703 "Continuation Sheet".

        Args:
            wb: openpyxl workbook that receives a new sheet named ``G703``.
            project_data: Dict as returned by ``get_project_data``.
            contract_id: Optional contract whose line items fill the table.
                Without it, or when no items come back, ten empty numbered
                rows are written instead.

        Returns:
            The newly created worksheet.
        """
        ws = wb.create_sheet("G703")

        contract_items = []
        if contract_id:
            # The client returns None on failure; fall back to the blank rows.
            contract_items = (
                self.procore.get_contract_line_items(
                    project_data["project"]["id"], contract_id
                )
                or []
            )

        # Title banner
        ws.merge_cells("A1:I1")
        ws["A1"] = "CONTINUATION SHEET - AIA DOCUMENT G703"
        ws["A1"].font = Font(bold=True, size=14)
        ws["A1"].alignment = Alignment(horizontal="center")

        # Header information
        ws["A3"] = "PROJECT:"
        project = project_data["project"]
        if project:
            ws["B3"] = project["name"]
        ws["F3"] = "APPLICATION NO:"
        ws["G3"] = "001"  # Example application number
        ws["F4"] = "APPLICATION DATE:"
        ws["G4"] = self.current_date
        ws["F5"] = "PERIOD TO:"
        ws["G5"] = self.current_date

        # Row 6: banner spanning the two "work completed" columns.
        ws.merge_cells("D6:E6")
        ws["D6"] = "WORK COMPLETED"
        ws["D6"].font = Font(bold=True)
        ws["D6"].alignment = Alignment(horizontal="center")

        # Row 7: one header per column. D7/E7 are the sub-headers under the
        # banner; they come from the same table so that this loop cannot
        # overwrite them with the banner text.
        for col, text, width in self._G703_COLUMNS:
            cell = ws[f"{col}7"]
            cell.value = text
            cell.font = Font(bold=True)
            cell.alignment = Alignment(
                wrap_text=True, horizontal="center", vertical="center"
            )
            ws.column_dimensions[col].width = width

        start_row = self._G703_FIRST_DATA_ROW
        if contract_items:
            for offset, item in enumerate(contract_items):
                row = start_row + offset
                ws[f"A{row}"] = offset + 1
                ws[f"B{row}"] = item.get("description", "")
                ws[f"C{row}"] = item.get("original_value", 0)
                ws[f"D{row}"] = 0  # Previous application
                ws[f"E{row}"] = 0  # This period
                ws[f"F{row}"] = 0  # Materials stored
                self._write_g703_formulas(ws, row)
            data_rows = len(contract_items)
        else:
            # No line items: numbered blank rows to be filled in by hand.
            for offset in range(self._G703_BLANK_ROWS):
                row = start_row + offset
                ws[f"A{row}"] = offset + 1
                self._write_g703_formulas(ws, row)
            data_rows = self._G703_BLANK_ROWS

        # One spacer row separates the data from the totals; the SUM ranges
        # include that (empty) spacer row.
        total_row = start_row + data_rows + 1
        ws[f"A{total_row}"] = "GRAND TOTAL"
        ws[f"A{total_row}"].font = Font(bold=True)
        for col in ("C", "D", "E", "F", "G", "I"):
            ws[f"{col}{total_row}"] = f"=SUM({col}{start_row}:{col}{total_row - 1})"
            ws[f"{col}{total_row}"].font = Font(bold=True)

        # Overall percentage complete
        ws[f"H{total_row}"] = f"=IF(C{total_row}>0,G{total_row}/C{total_row},0)"
        ws[f"H{total_row}"].number_format = "0.00%"
        ws[f"H{total_row}"].font = Font(bold=True)

        return ws

    def generate_aia_invoice(self, project_id, contract_id=None, output_file="AIA_G702_G703_Invoice.xlsx"):
        """Generate and save a workbook containing the G702 and G703 sheets.

        Args:
            project_id: Project to fetch data for.
            contract_id: Optional contract used for the header and line items.
            output_file: Path the workbook is saved to.

        Returns:
            ``output_file``, after the workbook has been written.

        Raises:
            LookupError: If the project or the contract cannot be found.
        """
        project_data = self.get_project_data(project_id)

        wb = openpyxl.Workbook()
        # A new workbook starts with one default sheet; drop it so only the
        # two AIA sheets remain.
        wb.remove(wb.active)

        self.create_g702(wb, project_data, contract_id)
        self.create_g703(wb, project_data, contract_id)

        wb.save(output_file)
        print(f"Invoice saved to {output_file}")
        return output_file
def _read_menu_index(prompt):
    """Prompt for a 1-based menu number and return it as a 0-based index.

    Returns -1 when the input is not an integer, so callers can handle it
    like any other out-of-range selection.
    """
    try:
        return int(input(prompt)) - 1
    except ValueError:
        return -1


def main():
    """Interactive command-line flow: authenticate, pick a project, write the invoice.

    Prints the Procore authorization URL, reads the authorization code,
    lets the user choose a project and optionally a contract, then saves
    the workbook under a date-stamped file name.
    """
    # Local import: only this entry point needs it.
    from urllib.parse import urlencode

    # Configuration - replace with actual values or load from config file
    config = {
        "client_id": "YOUR_CLIENT_ID",
        "client_secret": "YOUR_CLIENT_SECRET",
        "redirect_uri": "YOUR_REDIRECT_URI",
        "company_id": "YOUR_COMPANY_ID",
    }

    procore_client = ProcoreAPIClient(
        config["client_id"],
        config["client_secret"],
        config["redirect_uri"],
        config["company_id"],
    )

    # Authorization flow - the user obtains an auth code from the Procore
    # OAuth page. The query string is URL-encoded because a redirect URI
    # contains characters (":", "/", "?") that are reserved in a query.
    authorize_query = urlencode(
        {
            "response_type": "code",
            "client_id": config["client_id"],
            "redirect_uri": config["redirect_uri"],
        }
    )
    print("Visit the following URL to get an authorization code:")
    print(f"https://login.procore.com/oauth/authorize?{authorize_query}")
    # Strip whitespace picked up when the code is pasted.
    auth_code = input("Enter the authorization code from the URL: ").strip()

    if not procore_client.authenticate(auth_code):
        print("Authentication failed")
        return
    print("Authentication successful!")

    projects = procore_client.get_projects()
    if not projects:
        print("No projects found")
        return

    print("\nAvailable Projects:")
    for number, project in enumerate(projects, start=1):
        print(f"{number}. {project['name']} (ID: {project['id']})")

    project_index = _read_menu_index("\nSelect a project number: ")
    if not 0 <= project_index < len(projects):
        print("Invalid project selection")
        return
    project_id = projects[project_index]["id"]

    # The contract is optional: entering 0 (or anything invalid) skips it.
    contract_id = None
    contracts = procore_client.get_contracts(project_id)
    if contracts:
        print("\nAvailable Contracts:")
        for number, contract in enumerate(contracts, start=1):
            print(f"{number}. {contract.get('title', 'Untitled')} (ID: {contract['id']})")

        contract_index = _read_menu_index("\nSelect a contract number (or 0 to skip): ")
        if 0 <= contract_index < len(contracts):
            contract_id = contracts[contract_index]["id"]

    aia_generator = AIAGenerator(procore_client)
    output_file = aia_generator.generate_aia_invoice(
        project_id,
        contract_id,
        f"AIA_G702_G703_Invoice_{datetime.now().strftime('%Y%m%d')}.xlsx",
    )
    print(f"\nInvoice generated successfully: {output_file}")
# Run the interactive flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment