A Python script to download data from Google Spreadsheet files stored in Google Drive as CSV files #python #google_drive #CSV
import argparse
import json
import re
from pathlib import Path

import gspread
import pandas as pd
from getfilelistpy import getfilelist
from google.oauth2 import service_account
from tqdm import tqdm
def str2bool(v):
    """Convert a string representation of truth to True or False.

    Args:
        v (str): The string to convert.

    Returns:
        bool: The boolean value corresponding to the string.

    Raises:
        argparse.ArgumentTypeError: If the string is not a valid boolean representation.
    """
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')
class DuplicateHeaderError(Exception):
    """Custom exception for duplicate headers after normalization."""

    def __init__(self, message):
        """Initialize the DuplicateHeaderError with a message.

        Args:
            message (str): The error message.
        """
        self.message = message
        super().__init__(self.message)
class HeadersNormalizer:
    """Normalize the headers of a DataFrame by replacing spaces with underscores and converting to lowercase."""

    def __init__(self, dataframe: pd.DataFrame, handle_duplicates='suffix'):
        """Initialize the HeadersNormalizer with a DataFrame and a method to handle duplicates.

        Args:
            dataframe (pd.DataFrame): The DataFrame whose headers are to be normalized.
            handle_duplicates (str): Method to handle duplicate headers ('raise' or 'suffix').
        """
        self.dataframe = dataframe
        self.original_headers = dataframe.columns.tolist()
        self.handle_duplicates = handle_duplicates
        self.normalized_headers = self._normalize_headers()
        self._check_for_duplicates()

    def _normalize_headers(self):
        """Normalize the headers of the DataFrame.

        Returns:
            list: The normalized headers.
        """
        return [self.normalize_header(header) for header in self.original_headers]

    @staticmethod
    def normalize_header(header):
        """Normalize a single header.

        Args:
            header (str): The header to normalize.

        Returns:
            str: The normalized header.
        """
        # Replace spaces with underscores and convert to lowercase
        normalized = header.strip().replace(' ', '_').lower()
        # Remove invalid characters (keep only letters, digits, and underscores)
        return re.sub(r'[^a-zA-Z0-9_]', '', normalized)

    def _check_for_duplicates(self):
        """Check for duplicate normalized headers and handle them based on the specified method."""
        if len(set(self.normalized_headers)) != len(self.normalized_headers):
            if self.handle_duplicates == 'raise':
                duplicates = [header for header in set(self.normalized_headers)
                              if self.normalized_headers.count(header) > 1]
                raise DuplicateHeaderError(f"Duplicate normalized headers found: {duplicates}")
            elif self.handle_duplicates == 'suffix':
                self._resolve_duplicates()

    def _resolve_duplicates(self):
        """Add a numeric suffix to duplicate normalized headers, keeping the first occurrence unchanged."""
        header_count = {}
        for i, header in enumerate(self.normalized_headers):
            if header not in header_count:
                header_count[header] = 1
            else:
                header_count[header] += 1
                self.normalized_headers[i] = f"{header}_{header_count[header]}"

    def get_original_headers(self):
        """Return the original headers of the DataFrame.

        Returns:
            list: The original headers.
        """
        return self.original_headers

    def get_normalized_headers(self):
        """Return the normalized headers of the DataFrame.

        Returns:
            list: The normalized headers.
        """
        return self.normalized_headers
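
# A minimal usage sketch of HeadersNormalizer (illustrative only, not executed):
# given a DataFrame with the headers ['First Name', 'first name'], both headers
# normalize to 'first_name', and the default 'suffix' strategy renames the
# second occurrence:
#
#   df = pd.DataFrame([[1, 2]], columns=['First Name', 'first name'])
#   normalizer = HeadersNormalizer(df)
#   normalizer.get_normalized_headers()  # ['first_name', 'first_name_2']
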
class DataDownloader:
    """A class to download data from Google Drive using the Google Drive API."""

    def __init__(self, credentials_file, top_folder_id):
        """Initialize the DataDownloader class with credentials and top folder ID.

        Args:
            credentials_file (str): Path to the Google service account credentials file.
            top_folder_id (str): The ID of the top folder in Google Drive.
        """
        self.credentials = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=['https://www.googleapis.com/auth/drive']
        )
        self.credentials_file = credentials_file
        self.top_folder_id = top_folder_id

    def _create_resource(self):
        """Create the resource dictionary used to request the file list from Google Drive.

        Returns:
            dict: The resource dictionary for the Google Drive API.
        """
        return {
            "service_account": self.credentials,
            "id": self.top_folder_id,
            "fields": "files(name, id, mimeType, parents, createdTime, modifiedTime)",
        }
    @staticmethod
    def _filter_google_sheets_files(folder_contents):
        """Filter the Google Sheets files from the folder contents.

        Args:
            folder_contents (dict): The contents of the folder from Google Drive.

        Returns:
            list: A list of Google Sheets files.
        """
        contents_list = []
        for item in folder_contents['fileList']:
            for file in item['files']:
                if file['mimeType'] == 'application/vnd.google-apps.spreadsheet':
                    contents_list.append(file)
        return contents_list

    def _scan_folder(self):
        """Scan the folder and collect the Google Sheets files.

        Returns:
            list: A list of Google Sheets files in the folder.
        """
        resource = self._create_resource()
        folder_contents = getfilelist.GetFileList(resource)
        return self._filter_google_sheets_files(folder_contents)

    def scan_drive(self):
        """Scan Google Drive and collect the Google Sheets files.

        Returns:
            list: A list of Google Sheets files found under the top folder.
        """
        return self._scan_folder()
    @staticmethod
    def _write_to_json_file(result, output_file):
        """Write the result to a JSON file.

        Args:
            result (list): The result to be written to the JSON file.
            output_file (str): The path to the output JSON file.
        """
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w') as f:
            json.dump(result, f, indent=4)

    def scan_drive_to_json(self, output_file):
        """Scan Google Drive and write the result to a JSON file.

        Args:
            output_file (str): The path to the output JSON file.
        """
        result = self.scan_drive()
        self._write_to_json_file(result, output_file)
    @staticmethod
    def create_dataframe(data, skip_empty_headers=True):
        """Create a pandas DataFrame from the data extracted from the Google Sheet.

        Args:
            data (list): The data from the Google Sheet, as a list of rows.
            skip_empty_headers (bool): Whether to skip columns with empty headers.

        Returns:
            pd.DataFrame: The resulting pandas DataFrame.
        """
        # Guard against sheets that contain no rows at all
        if not data:
            return pd.DataFrame()
        if skip_empty_headers:
            # Filter out columns with empty string headers
            header_indices = [i for i, c in enumerate(data[0]) if c != '']
        else:
            header_indices = range(len(data[0]))
        # Extract data for the selected columns
        new_data = [[row[i] for i in header_indices] for row in data[1:]]
        # Convert to a DataFrame, using the first row as the header row
        return pd.DataFrame(new_data, columns=[data[0][i] for i in header_indices])
    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize the filename by replacing invalid characters with underscores.

        Args:
            filename (str): The original filename.

        Returns:
            str: The sanitized filename.
        """
        # Remove leading and trailing whitespace from the filename
        filename = filename.strip()
        # Replace invalid characters (?, :, /, \, *, ", <, >, |, &, . and whitespace) with underscores
        return re.sub(r'[?/:\\*"<>|&.\s]', '_', filename)
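
    # For example, sanitize_filename('Q4 Report: 2024.csv') returns
    # 'Q4_Report__2024_csv' (the colon, spaces, and dot each become underscores).
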
    def download_sheet_data(self, file_info, output_dir, sheet_id=0):
        """Download the data from the Google Sheet to a CSV file.

        Args:
            file_info (dict): Information about the file to be downloaded.
            output_dir (str): The directory where the downloaded data will be stored.
            sheet_id (int): The index of the worksheet to download from the Google Sheet file.
        """
        # Use the credentials to authorize the Google Sheets API
        gc = gspread.service_account(filename=self.credentials_file)
        # Access the Google Sheet
        sheet = gc.open_by_key(file_info['id'])
        # Download all values from the selected worksheet
        sheet_data = sheet.get_worksheet(sheet_id).get_all_values()
        # Sanitize the filename and build the output path
        sanitized_filename = self.sanitize_filename(file_info['name'])
        output_file = Path(output_dir) / f"{sanitized_filename}_{sheet_id}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        # Convert the data to a DataFrame
        sheet_data_df = self.create_dataframe(sheet_data)
        # Normalize the headers
        headers_normalizer = HeadersNormalizer(sheet_data_df)
        sheet_data_df.columns = headers_normalizer.get_normalized_headers()
        # Write the data to a CSV file
        sheet_data_df.to_csv(output_file, index=False)
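
# A minimal usage sketch of DataDownloader (the credentials path and folder ID
# below are placeholders, not values from a real project):
#
#   downloader = DataDownloader('secrets/credentials.json', '<top-folder-id>')
#   for sheet_file in downloader.scan_drive():
#       downloader.download_sheet_data(sheet_file, '/tmp', sheet_id=0)
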
def load_exclusion_patterns(exclusion_patterns_file):
    """Load the exclusion patterns from a JSON file.

    Args:
        exclusion_patterns_file (str): Path to the exclusion patterns JSON file.

    Returns:
        list: The loaded exclusion patterns.
    """
    with open(exclusion_patterns_file) as f:
        return json.load(f)


def load_categories(categories_file):
    """Load the categories from a JSON file.

    Args:
        categories_file (str): Path to the categories JSON file.

    Returns:
        list: The loaded categories.
    """
    with open(categories_file) as f:
        return json.load(f)
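
# Both configuration files are assumed to hold flat JSON arrays of strings,
# since main() iterates over them directly. For example:
#
#   exclusion_patterns.json: ["draft", "backup", "old"]
#   categories.json:         ["sales", "inventory", "hr"]
#
# Exclusion patterns are matched as substrings of the file name; categories are
# matched case-insensitively against the start of the file name.
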
def main():
    """Download data from Google Drive using the Google Drive API."""
    # Create an argument parser to get the credentials file and the top folder ID
    parser = argparse.ArgumentParser(description="Download data from Google Drive")
    # Arguments for the parser
    parser.add_argument('--credentials_file', type=str, default='secrets/credentials.json',
                        help='The credentials file for the Google Drive API')
    parser.add_argument('--top_folder_id', type=str, default='1X8WNjmAzcPoe8xnCDXxrHmFoYWE1ToYz',
                        help='The ID of the top folder in Google Drive')
    parser.add_argument('--output_dir', type=str, default='/tmp',
                        help='The directory where the downloaded data will be stored')
    parser.add_argument('--sheet_index', type=int, default=1,
                        help='The zero-based index of the worksheet to download from each Google Sheet file')
    parser.add_argument('--organise_by_prefix', type=str2bool, default=True,
                        help='Organise the downloaded files by their prefix into folders')
    parser.add_argument('--exclusion_patterns_file', type=str, default='data/conf/exclusion_patterns.json',
                        help='The file containing substring patterns used to ignore files')
    parser.add_argument('--categories_file', type=str, default='data/conf/categories.json',
                        help='The file containing the categories used to organise the files')

    # Parse the arguments from the command line
    args = parser.parse_args()

    # Load the exclusion patterns from the JSON file
    exclusion_patterns = load_exclusion_patterns(args.exclusion_patterns_file)

    # Create a DataDownloader object and scan Google Drive for Google Sheets files
    scanner = DataDownloader(args.credentials_file, args.top_folder_id)
    scanner.scan_drive_to_json(str(Path(args.output_dir) / 'drive_contents.json'))

    # Load the categories from the JSON file
    categories = load_categories(args.categories_file)

    # Iterate over the files and download the data from the Google Sheets files
    for file in tqdm(scanner.scan_drive()):
        if any(pattern in file['name'] for pattern in exclusion_patterns):
            print(f"Not downloading any data from {file['name']} based on the "
                  f"exclusion patterns in {args.exclusion_patterns_file}")
            continue
        try:
            if (args.organise_by_prefix and '_' in file['name']
                    and file['mimeType'] == 'application/vnd.google-apps.spreadsheet'):
                # Find the prefix for the file based on the categories in the JSON file
                prefix = None
                for c in categories:
                    if file['name'].lower().startswith(c.lower()):
                        prefix = c.lower()
                        # Exit the loop once a prefix is found for the file
                        break
                if prefix is None:
                    raise ValueError(f"Could not find a category for file {file['name']}")
                # Create a directory for the prefix if it does not exist
                storage_dir = Path(args.output_dir) / prefix
                storage_dir.mkdir(parents=True, exist_ok=True)
                scanner.download_sheet_data(file, storage_dir, args.sheet_index)
            else:
                scanner.download_sheet_data(file, args.output_dir, args.sheet_index)
        except Exception as e:
            print(f"Error downloading sheet {args.sheet_index} data from file {file['name']}: {e}")


if __name__ == "__main__":
    main()
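
# Example invocation (the script name, paths, and folder ID below are assumed,
# not taken from the original project):
#
#   python download_google_sheets.py \
#       --credentials_file secrets/credentials.json \
#       --top_folder_id <top-folder-id> \
#       --output_dir /tmp/sheets \
#       --sheet_index 0 \
#       --organise_by_prefix yes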