Skip to content

Instantly share code, notes, and snippets.

@habedi
Last active September 2, 2024 15:52
Show Gist options
  • Save habedi/effb3632a9e8a203a0613176eb2b57cf to your computer and use it in GitHub Desktop.
A Python script to download data from Google Spreadsheet files stored in Google Drive #python #google_drive #CSV
import argparse
import json
import re
from pathlib import Path
import gspread
import pandas as pd
from getfilelistpy import getfilelist
from google.oauth2 import service_account
from tqdm import tqdm
def str2bool(v):
    """Parse a string as a boolean flag for argparse.

    Args:
        v (str): Text such as 'yes'/'no', 'true'/'false', 't'/'f',
            'y'/'n', or '1'/'0' (case-insensitive).

    Returns:
        bool: The boolean value the string represents.

    Raises:
        argparse.ArgumentTypeError: If the string is not a recognised
            boolean representation.
    """
    lowered = v.lower()
    if lowered in {'yes', 'true', 't', 'y', '1'}:
        return True
    if lowered in {'no', 'false', 'f', 'n', '0'}:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
class DuplicateHeaderError(Exception):
    """Raised when header normalization yields duplicate column names."""

    def __init__(self, message):
        """Store the message and forward it to the Exception base class.

        Args:
            message (str): Human-readable description of the duplicates.
        """
        self.message = message
        super().__init__(message)
class HeadersNormalizer:
    """Normalize DataFrame headers to lowercase snake_case identifiers.

    Spaces become underscores, letters are lowercased, and every other
    character besides letters, digits, and underscores is removed.
    Duplicates produced by normalization are handled according to
    ``handle_duplicates``: 'raise' raises DuplicateHeaderError, 'suffix'
    appends a numeric suffix to later occurrences.
    """

    def __init__(self, dataframe: pd.DataFrame, handle_duplicates='suffix'):
        """Normalize the headers of *dataframe* on construction.

        Args:
            dataframe (pd.DataFrame): The DataFrame whose headers are to be normalized.
            handle_duplicates (str): 'raise' to raise on duplicates, or
                'suffix' (default) to disambiguate them with numeric suffixes.
        """
        self.dataframe = dataframe
        self.original_headers = dataframe.columns.tolist()
        self.handle_duplicates = handle_duplicates
        self.normalized_headers = self._normalize_headers()
        self._check_for_duplicates()

    def _normalize_headers(self):
        """Return the list of normalized headers, one per original header.

        Returns:
            list: The normalized headers.
        """
        return [self.normalize_header(header) for header in self.original_headers]

    def normalize_header(self, header):
        """Normalize a single header string.

        Args:
            header (str): The header to normalize.

        Returns:
            str: Lowercased header with spaces replaced by underscores and
                all other non-alphanumeric characters removed.
        """
        normalized = header.strip().replace(' ', '_').lower()
        # Keep only letters, digits, and underscores.
        return re.sub(r'[^a-zA-Z0-9_]', '', normalized)

    def _check_for_duplicates(self):
        """Detect duplicate normalized headers and handle them per the policy.

        Raises:
            DuplicateHeaderError: If duplicates exist and handle_duplicates
                is 'raise'.
        """
        if len(set(self.normalized_headers)) != len(self.normalized_headers):
            if self.handle_duplicates == 'raise':
                duplicates = [header for header in set(self.normalized_headers)
                              if self.normalized_headers.count(header) > 1]
                raise DuplicateHeaderError(f"Duplicate normalized headers found: {duplicates}")
            elif self.handle_duplicates == 'suffix':
                self._resolve_duplicates()

    def _resolve_duplicates(self):
        """Disambiguate duplicate headers by appending ``_<n>`` suffixes.

        The first occurrence keeps its name; later occurrences receive a
        numeric suffix.  BUGFIX: the counter now advances past candidates
        that collide with a header already present in the list — an input
        containing 'a', 'a', and 'a_2' previously produced 'a_2' twice.
        """
        counts = {}
        # Every name currently in use; suffixed candidates must not collide.
        taken = set(self.normalized_headers)
        for i, header in enumerate(self.normalized_headers):
            if header not in counts:
                # First occurrence keeps its name unchanged.
                counts[header] = 1
            else:
                counts[header] += 1
                candidate = f"{header}_{counts[header]}"
                while candidate in taken:
                    counts[header] += 1
                    candidate = f"{header}_{counts[header]}"
                self.normalized_headers[i] = candidate
                taken.add(candidate)

    def get_original_headers(self):
        """Return the original (pre-normalization) headers.

        Returns:
            list: The original headers.
        """
        return self.original_headers

    def get_normalized_headers(self):
        """Return the normalized headers.

        Returns:
            list: The normalized headers.
        """
        return self.normalized_headers
class DataDownloader:
    """Scan a Google Drive folder for Google Sheets files and export their data as CSV."""

    def __init__(self, credentials_file, top_folder_id):
        """Initialize the DataDownloader with credentials and the top folder ID.

        Args:
            credentials_file (str): Path to the Google service account credentials file.
            top_folder_id (str): The ID of the top folder in Google Drive.
        """
        self.credentials = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=['https://www.googleapis.com/auth/drive']
        )
        # Keep the path as well: gspread builds its own client from the file.
        self.credentials_file = credentials_file
        self.top_folder_id = top_folder_id

    def _create_resource(self):
        """Create the resource to get the file list from Google Drive.

        Returns:
            dict: The resource dictionary for the Google Drive API.
        """
        return {
            "service_account": self.credentials,
            "id": self.top_folder_id,
            "fields": "files(name, id, mimeType, parents, createdTime, modifiedTime)",
        }

    @staticmethod
    def _filter_google_sheets_files(folder_contents):
        """Filter the Google Sheets files from the folder contents.

        Args:
            folder_contents (dict): The contents of the folder from Google Drive;
                expected to hold a 'fileList' of items each with a 'files' list.

        Returns:
            list: The file entries whose MIME type marks them as Google Sheets.
        """
        contents_list = []
        for item in folder_contents['fileList']:
            for file in item['files']:
                if file['mimeType'] == 'application/vnd.google-apps.spreadsheet':
                    contents_list.append(file)
        return contents_list

    def _scan_folder(self):
        """Scan the top folder and collect its Google Sheets files.

        Returns:
            list: A list of Google Sheets files in the folder.
        """
        resource = self._create_resource()
        folder_contents = getfilelist.GetFileList(resource)
        return self._filter_google_sheets_files(folder_contents)

    def scan_drive(self):
        """Scan the Google Drive and get the Google Sheets files.

        Returns:
            list: A list of Google Sheets files in the Google Drive.
        """
        return self._scan_folder()

    @staticmethod
    def _write_to_json_file(result, output_file):
        """Write the result to a JSON file, creating parent directories as needed.

        Args:
            result (list): The result to be written to the JSON file.
            output_file (str): The path to the output JSON file.
        """
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w') as f:
            json.dump(result, f, indent=4)

    def scan_drive_to_json(self, output_file):
        """Scan the Google Drive and write the result to a JSON file.

        Args:
            output_file (str): The path to the output JSON file.
        """
        result = self.scan_drive()
        self._write_to_json_file(result, output_file)

    @staticmethod
    def create_dataframe(data, skip_empty_headers=True):
        """Create a pandas DataFrame from the data extracted from a Google Sheet.

        Args:
            data (list): Rows from the sheet; the first row is the header row.
            skip_empty_headers (bool): Whether to drop columns whose header is ''.

        Returns:
            pd.DataFrame: The resulting DataFrame (empty when *data* is empty).
        """
        if not data:
            # BUGFIX: an empty worksheet yields no rows at all; return an
            # empty DataFrame instead of raising IndexError on data[0].
            return pd.DataFrame()
        if skip_empty_headers:
            # Filter out columns with empty string headers
            header_indices = [i for i, c in enumerate(data[0]) if c != '']
        else:
            header_indices = range(len(data[0]))
        # Extract data for the selected columns only.
        new_data = [[row[i] for i in header_indices] for row in data[1:]]
        return pd.DataFrame(new_data, columns=[data[0][i] for i in header_indices])

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize a filename by replacing invalid characters with underscores.

        Args:
            filename (str): The original filename.

        Returns:
            str: The sanitized filename.
        """
        filename = filename.strip()
        # Replace ?, :, /, \, *, ", <, >, |, &, . and whitespace with '_'.
        return re.sub(r'[?/:\\*"<>&.\s]', '_', filename)

    def download_sheet_data(self, file_info, output_dir, sheet_id=0):
        """Download one worksheet of a Google Sheet file to a CSV file.

        Args:
            file_info (dict): File entry with at least 'id' and 'name' keys.
            output_dir (str): The directory where the CSV will be stored.
            sheet_id (int): The index of the worksheet to download.
        """
        # Authorize the Google Sheets API with the service account file.
        gc = gspread.service_account(filename=self.credentials_file)
        sheet = gc.open_by_key(file_info['id'])
        sheet_data = sheet.get_worksheet(sheet_id).get_all_values()
        # Build the output path: <dir>/<sanitized name>_<sheet index>.csv
        sanitized_filename = self.sanitize_filename(file_info['name'])
        output_file = Path(output_dir) / (sanitized_filename + '_' + str(sheet_id) + '.csv')
        output_file.parent.mkdir(parents=True, exist_ok=True)
        sheet_data_df = self.create_dataframe(sheet_data)
        # Normalize the headers before writing.
        headers_normalizer = HeadersNormalizer(sheet_data_df)
        sheet_data_df.columns = headers_normalizer.get_normalized_headers()
        sheet_data_df.to_csv(output_file, index=False)
def load_exclusion_patterns(exclusion_patterns_file):
    """Read the exclusion patterns from a JSON file.

    Args:
        exclusion_patterns_file (str): Path to the exclusion patterns JSON file.

    Returns:
        dict: The parsed exclusion patterns.
    """
    with open(exclusion_patterns_file) as patterns_file:
        return json.load(patterns_file)
def load_categories(categories_file):
    """Read the file categories from a JSON file.

    Args:
        categories_file (str): Path to the categories JSON file.

    Returns:
        dict: The parsed categories.
    """
    with open(categories_file) as categories_fh:
        return json.load(categories_fh)
def main():
    """Download data from Google Drive using the Google Drive API."""
    # Command-line interface.
    parser = argparse.ArgumentParser(description="Download data from Google Drive")
    parser.add_argument('--credentials_file', type=str, default='secrets/credentials.json',
                        help='The credentials file for the Google Drive API')
    parser.add_argument('--top_folder_id', type=str, default='1X8WNjmAzcPoe8xnCDXxrHmFoYWE1ToYz',
                        help='The id of the top folder in the Google Drive')
    parser.add_argument('--output_dir', type=str, default='/tmp',
                        help='The directory where the downloaded data will be stored')
    parser.add_argument('--sheet_index', type=int, default=1,
                        help='The index of the sheet to download from the Google Sheet file')
    parser.add_argument('--organise_by_prefix', type=str2bool, default=True,
                        help='Organise the downloaded files by their prefix into folders')
    parser.add_argument('--exclusion_patterns_file', type=str, default='data/conf/exclusion_patterns.json',
                        help='The file containing the exclusion patterns to ignore files using regex')
    parser.add_argument('--categories_file', type=str, default='data/conf/categories.json',
                        help='The file containing the categories to organize the files')
    args = parser.parse_args()

    # NOTE(review): despite the help text mentioning regex, patterns are
    # matched below as plain substrings of the file name — confirm intent.
    exclusion_patterns = load_exclusion_patterns(args.exclusion_patterns_file)

    scanner = DataDownloader(args.credentials_file, args.top_folder_id)
    # BUGFIX: scan the drive ONCE and reuse the listing for both the JSON
    # manifest and the download loop; the original called scan_drive()
    # twice, doubling the Drive API traffic.
    files = scanner.scan_drive()
    DataDownloader._write_to_json_file(files, args.output_dir + '/drive_contents.json')

    categories = load_categories(args.categories_file)

    for file in tqdm(files):
        # Skip files whose name contains any exclusion pattern.
        if any(pattern in file['name'] for pattern in exclusion_patterns):
            print(
                f"Not downloading any data from {file['name']} based on the exclusion patterns in {args.exclusion_patterns_file}")
            continue
        try:
            if args.organise_by_prefix and '_' in file['name'] and file[
                    'mimeType'] == 'application/vnd.google-apps.spreadsheet':
                # Find the category prefix for the file (case-insensitive).
                prefix = None
                for c in categories:
                    if file['name'].lower().startswith(c.lower()):
                        prefix = c.lower()
                        break
                if prefix is None:
                    print(f"Could not find a category for file {file['name']}")
                    raise ValueError(f"Could not find a category for file {file['name']}")
                # Store the file under a per-category directory.
                storage_dir = Path(args.output_dir) / prefix
                storage_dir.mkdir(parents=True, exist_ok=True)
                scanner.download_sheet_data(file, storage_dir, args.sheet_index)
            else:
                scanner.download_sheet_data(file, args.output_dir, args.sheet_index)
        except Exception as e:
            # Best-effort: report the failure and continue with the next file.
            print(f"Error downloading sheet {args.sheet_index} data from file {file['name']}: {e}")
# Run the downloader only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment