Created
March 11, 2024 04:26
-
-
Save hlb/9018c268488ee97d7502d7c36245df7e to your computer and use it in GitHub Desktop.
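Process THSRC (Taiwan High Speed Rail) ticket receipts: rename the PDF attachments found in Gmail and upload them to Google Drive.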
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
import os | |
# OAuth scopes requested for the Gmail and Drive APIs.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.modify',
    'https://www.googleapis.com/auth/drive',
]


def get_credentials():
    """
    Gets valid user credentials, reusing the cached token when possible.

    The cached token is read from 'token.json' if that file exists. When the
    cached credentials are missing or invalid they are either refreshed (if
    expired and a refresh token is available) or obtained through the
    installed-app flow configured by 'credentials.json', and the result is
    written back to 'token.json'.

    Returns:
        The credentials object to pass to the Google API client.
    """
    token_path = 'token.json'

    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Cached credentials are still usable: nothing to refresh or persist.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        # No usable cached credentials: run the local-server login flow.
        flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)

    # Persist the new or refreshed credentials for the next run.
    with open(token_path, 'w') as token_file:
        token_file.write(creds.to_json())
    return creds
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import base64 | |
import logging | |
import os | |
import re | |
import configparser | |
import fitz # PyMuPDF | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaIoBaseUpload | |
from google_auth import get_credentials # External authentication module | |
# Read configuration file.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means 'config.ini' is missing or
# unreadable. Fail here with a clear message instead of a bare KeyError below.
config = configparser.ConfigParser()
if not config.read('config.ini'):
    raise FileNotFoundError("Configuration file 'config.ini' was not found or could not be read")

# Constants taken from the configuration file.
ORIGINAL_LABEL_NAME = config['Labels']['original_label_name']  # label of messages to process
NEW_LABEL_NAME = config['Labels']['new_label_name']  # label applied after processing
GDRIVE_FOLDER_ID = config['GoogleDrive']['gdrive_folder_id']  # parent folder for uploads

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Gmail and Drive API scopes.
# NOTE(review): not referenced by the code visible in this module; credentials
# come from google_auth.get_credentials(). Kept in case other code imports it.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/drive']
def find_label_id(service, label_name):
    """
    Looks up a Gmail label by its exact name and returns the label's ID.

    Args:
        service: The Gmail API service instance.
        label_name: The name of the label to find.

    Returns:
        The ID of the first label whose name equals label_name, or None when
        no label matches or the lookup raises (the error is logged).
    """
    try:
        listing = service.users().labels().list(userId='me').execute()
        matching_ids = (
            entry['id']
            for entry in listing.get('labels', [])
            if entry['name'] == label_name
        )
        # The generator is consumed inside the try so malformed entries are
        # reported through the same error path as API failures.
        return next(matching_ids, None)
    except Exception as error:
        logging.error(f'Error finding label ID for {label_name}: {error}')
        return None
def extract_start_end_locations(text, pattern):
    """
    Pulls the start and end location names out of a text.

    The first and third capture groups of the pattern's first match are
    reduced to their CJK characters (U+4E00 through U+9FFF); anything else in
    those groups is dropped.

    Args:
        text: The text to be searched.
        pattern: The regex pattern; its groups 1 and 3 hold the two locations.

    Returns:
        A tuple (start_location, end_location), or
        ("Start not found", "End not found") when the pattern does not match.
    """
    found = re.search(pattern, text)
    if found is None:
        return "Start not found", "End not found"

    def cjk_only(fragment):
        # Concatenate every run of CJK characters found in the fragment.
        return ''.join(re.findall(r'[\u4e00-\u9fff]+', fragment))

    return cjk_only(found.group(1)), cjk_only(found.group(3))
def extract_ticket_details_from_pdf(file_data):
    """
    Extracts ticket details from a PDF file, including date, itinerary, fare, and start/end locations.

    Only the text of the first page is examined, and only the part between
    the "票款 Fare" marker and the "本單據係為旅客購買乘車票之證明" marker. If a
    marker is absent, the corresponding end of the page text is used instead.
    Every line of that region is tested against each pattern; when several
    lines match the same pattern, the last match wins.

    Args:
        file_data (bytes): The binary data of the PDF file.

    Returns:
        tuple: A tuple containing date, itinerary, fare, start location, and
        end location. Each detail that could not be found keeps its
        "... not found" placeholder string.
    """
    # Open the PDF from memory and read the first page. The document is closed
    # in a finally block so it is released even if text extraction raises.
    doc = fitz.open(stream=io.BytesIO(file_data), filetype="pdf")
    try:
        text = doc.load_page(0).get_text()
    finally:
        doc.close()

    # Narrow the text to the region between the two markers. str.find()
    # returns -1 when a marker is missing; using -1 directly as a slice index
    # would silently select the wrong span, so fall back to the text bounds.
    start_index = text.find("票款 Fare")
    if start_index == -1:
        start_index = 0
    # Look for the end marker only after the start so the slice cannot invert.
    end_index = text.find("本單據係為旅客購買乘車票之證明", start_index)
    if end_index == -1:
        end_index = len(text)
    relevant_text = text[start_index:end_index]

    # Initialize extracted details
    date, itinerary, fare, start_location, end_location = ("Date not found", "Itinerary not found",
                                                           "Fare not found", "Start not found",
                                                           "End not found")

    # Define patterns for extracting details (compiled once, outside the loop)
    date_regex = re.compile(r"\d{4}-\d{2}-\d{2}")
    itinerary_pattern = r"([\w\s]+?)(?=\s*\d{2}:\d{2}|\s*[-]{1,3}\s*|\s*$)\s*(\d{2}:\d{2})?\s*[-]{1,3}\s*([\w\s]+?)(?=\s*\d{2}:\d{2}|\s*$)\s*(\d{2}:\d{2})?"
    itinerary_regex = re.compile(itinerary_pattern)
    # Accept optional thousands separators ("NT$ 1,490") so the amount is not
    # truncated at the comma; plain amounts ("NT$ 1490") match as before.
    fare_regex = re.compile(r"NT\$\s*\d+(?:,\d{3})*")

    # Search for patterns in each line to extract information
    for line in relevant_text.split('\n'):
        date_match = date_regex.search(line)
        if date_match:
            date = date_match.group(0)

        itinerary_match = itinerary_regex.search(line)
        if itinerary_match:
            # Collapse internal whitespace runs to single spaces.
            itinerary = ' '.join(itinerary_match.group(0).split())
            start_location, end_location = extract_start_end_locations(line, itinerary_pattern)

        fare_match = fare_regex.search(line)
        if fare_match:
            # Remove all whitespace, e.g. "NT$ 1490" -> "NT$1490".
            fare = ''.join(fare_match.group(0).split())

    return date, itinerary, fare, start_location, end_location
def rename_pdf(file_data, date, start_location, end_location, fare):
    """
    Builds the new filename for a ticket PDF and wraps its bytes in a stream.

    The PDF content itself is not modified; only the name is derived from the
    ticket details.

    Args:
        file_data (bytes): The binary data of the PDF file.
        date (str): The extracted date; its '-' separators are removed.
        start_location (str): The extracted start location.
        end_location (str): The extracted end location.
        fare (str): The extracted fare; only its digit characters are kept.

    Returns:
        tuple: A tuple containing the new filename and a BytesIO stream of the
        PDF, positioned at the beginning.
    """
    compact_date = date.replace('-', '')
    fare_digits = ''.join(ch for ch in fare if ch.isdigit())
    new_filename = f"{compact_date}-高鐵票-{start_location}-{end_location}-{fare_digits}.pdf"

    # A BytesIO built from initial bytes already starts at offset 0.
    return new_filename, io.BytesIO(file_data)
def process_messages(service, drive_service, original_label_id, new_label_id):
    """
    Processes all messages with a specific label from the Gmail account, extracting, renaming, and uploading PDFs.

    All result pages of the message listing are collected before any message
    is processed, because processing removes the label that the listing
    filters on. A failure while processing one message is logged with its
    traceback and does not stop the remaining messages.

    Args:
        service: The Gmail API service instance.
        drive_service: The Google Drive API service instance.
        original_label_id (str): The ID of the original Gmail label.
        new_label_id (str): The ID of the new Gmail label to be applied after processing.
    """
    messages = []
    request = service.users().messages().list(userId='me', labelIds=[original_label_id])
    # list_next() returns None once there is no further page to fetch.
    while request is not None:
        response = request.execute()
        messages.extend(response.get('messages', []))
        request = service.users().messages().list_next(request, response)

    logging.info(f'Number of messages with label "{ORIGINAL_LABEL_NAME}": {len(messages)}')

    for message in messages:
        try:
            process_single_message(service, drive_service, message, original_label_id, new_label_id)
        except Exception:
            # Keep going: one bad message must not block the rest of the batch.
            logging.exception(f'Failed to process message ID: "{message.get("id")}"')
def process_single_message(service, drive_service, message, original_label_id, new_label_id):
    """
    Processes a single message: extracts the PDF attachments, renames them, uploads to Drive, and updates Gmail labels.

    Only top-level payload parts whose filename ends in ".pdf" (in any letter
    case) and that carry an attachment ID are handled. The Gmail labels are
    updated once, after every such attachment has been uploaded; a message
    without any uploaded PDF keeps its labels. If an upload raises, the
    exception propagates and the labels are left unchanged.

    Args:
        service: The Gmail API service instance.
        drive_service: The Google Drive API service instance.
        message (dict): The Gmail message object; its 'id' key is used.
        original_label_id (str): The ID of the original Gmail label.
        new_label_id (str): The ID of the new Gmail label to be applied after processing.
    """
    message_id = message['id']
    msg = service.users().messages().get(userId='me', id=message_id).execute()

    uploaded_any = False
    for part in msg.get('payload', {}).get('parts', []):
        if not part.get('filename', '').lower().endswith('.pdf'):
            continue
        attachment_id = part.get('body', {}).get('attachmentId')
        if not attachment_id:
            continue

        file_data = get_attachment_data(service, message_id, attachment_id)
        new_filename, new_pdf_stream = process_pdf_file(file_data)
        upload_file_to_drive(drive_service, new_filename, new_pdf_stream)
        uploaded_any = True

    # Relabel only after all attachments are safely uploaded, and only once.
    if uploaded_any:
        update_message_label(service, message_id, original_label_id, new_label_id)
def get_attachment_data(service, message_id, attachment_id):
    """
    Downloads one attachment of a Gmail message and decodes it to raw bytes.

    Args:
        service: The Gmail API service instance.
        message_id (str): The ID of the message containing the attachment.
        attachment_id (str): The ID of the attachment.

    Returns:
        bytes: The binary data of the attachment, decoded from the URL-safe
        base64 text in the response's 'data' field.
    """
    request = service.users().messages().attachments().get(
        userId='me',
        messageId=message_id,
        id=attachment_id,
    )
    response = request.execute()
    encoded = response['data'].encode('UTF-8')
    return base64.urlsafe_b64decode(encoded)
def process_pdf_file(file_data):
    """
    Derives the upload name for a ticket PDF from the details found inside it.

    Args:
        file_data (bytes): The binary data of the PDF file.

    Returns:
        tuple: A tuple containing the new filename and a BytesIO stream of the renamed PDF.
    """
    details = extract_ticket_details_from_pdf(file_data)
    # The itinerary is extracted but is not part of the filename.
    date, _itinerary, fare, start_location, end_location = details
    filename, stream = rename_pdf(file_data, date, start_location, end_location, fare)
    return filename, stream
def upload_file_to_drive(drive_service, filename, file_stream):
    """
    Creates a PDF file in the configured Google Drive folder and logs its ID.

    Args:
        drive_service: The Google Drive API service instance.
        filename (str): The name of the file to be uploaded.
        file_stream (io.BytesIO): The file stream to be uploaded.
    """
    metadata = {'name': filename, 'parents': [GDRIVE_FOLDER_ID]}
    pdf_media = MediaIoBaseUpload(file_stream, mimetype='application/pdf')

    # Request only the new file's ID in the response.
    create_request = drive_service.files().create(body=metadata, media_body=pdf_media, fields='id')
    created = create_request.execute()

    uploaded_id = created.get("id")
    logging.info(f'File: "{filename}". Uploaded with ID: {uploaded_id}')
def update_message_label(service, message_id, original_label_id, new_label_id):
    """
    Updates the label of a Gmail message and removes it from the inbox.

    The original label and 'INBOX' are removed and the new label is added in
    a single modify request, so the message cannot end up relabelled but
    still in the inbox because a second request failed.

    Args:
        service: The Gmail API service instance.
        message_id (str): The ID of the message to update.
        original_label_id (str): The ID of the original label to be removed.
        new_label_id (str): The ID of the new label to be applied.
    """
    labels_to_remove = [original_label_id]
    # Avoid listing 'INBOX' twice if it is itself the original label.
    if original_label_id != 'INBOX':
        labels_to_remove.append('INBOX')

    service.users().messages().modify(
        userId='me',
        id=message_id,
        body={'removeLabelIds': labels_to_remove, 'addLabelIds': [new_label_id]},
    ).execute()
    logging.info(f'Message ID: "{message_id}". Label updated: {ORIGINAL_LABEL_NAME} -> {NEW_LABEL_NAME}')
def main():
    """
    Main function to authenticate and process Gmail messages.

    Builds the Gmail and Drive services, resolves the two configured label
    names to IDs, and processes the labelled messages. If either label cannot
    be resolved, an error is logged and nothing is processed. Any exception is
    logged together with its traceback rather than propagated.
    """
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        drive_service = build('drive', 'v3', credentials=creds)

        original_label_id = find_label_id(service, ORIGINAL_LABEL_NAME)
        new_label_id = find_label_id(service, NEW_LABEL_NAME)
        if original_label_id is None or new_label_id is None:
            logging.error("One or both labels not found.")
            return

        process_messages(service, drive_service, original_label_id, new_label_id)
    except Exception as e:
        # logging.exception keeps the traceback, which logging.error discarded.
        logging.exception(f'An error occurred in the main process: {e}')


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
PyMuPDF
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment