Skip to content

Instantly share code, notes, and snippets.

@hlb
Created March 11, 2024 04:26
Show Gist options
  • Save hlb/9018c268488ee97d7502d7c36245df7e to your computer and use it in GitHub Desktop.
Save hlb/9018c268488ee97d7502d7c36245df7e to your computer and use it in GitHub Desktop.
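Process THSRC (Taiwan High Speed Rail) receipts: rename the PDF attachments from labelled Gmail messages and upload them to Google Drive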
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import os
# OAuth scopes requested by get_credentials(): Gmail "modify" access and Google Drive access.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/drive']
def get_credentials():
    """Return usable Google credentials, refreshing or re-authorizing as needed.

    Cached credentials are read from ``token.json`` when that file exists.
    If they are missing or not valid, they are refreshed (when expired and a
    refresh token is available) or obtained anew through the installed-app
    flow configured by ``credentials.json``. Whenever new or refreshed
    credentials are produced they are written back to ``token.json``.

    Returns:
        The credentials object to pass to the Google API client builders.
    """
    token_path = 'token.json'

    cached = None
    if os.path.exists(token_path):
        cached = Credentials.from_authorized_user_file(token_path, SCOPES)

    # Fast path: the stored token is still good, nothing to persist.
    if cached and cached.valid:
        return cached

    if cached and cached.expired and cached.refresh_token:
        cached.refresh(Request())
        fresh = cached
    else:
        # No reusable token: run the interactive local-server consent flow.
        auth_flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
        fresh = auth_flow.run_local_server(port=0)

    # Persist so the next run can skip the interactive step.
    with open(token_path, 'w') as token_file:
        token_file.write(fresh.to_json())
    return fresh
import io
import base64
import logging
import os
import re
import configparser
import fitz # PyMuPDF
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from google_auth import get_credentials # External authentication module
# Load settings from config.ini in the current working directory.
# NOTE: ConfigParser.read() silently skips a missing file, so an absent
# config.ini surfaces below as a KeyError on the first section lookup.
config = configparser.ConfigParser()
config.read('config.ini')
# Settings pulled from the [Labels] and [GoogleDrive] sections:
# the Gmail label to read from, the label applied after processing,
# and the Drive folder that receives the uploaded PDFs.
ORIGINAL_LABEL_NAME = config['Labels']['original_label_name']
NEW_LABEL_NAME = config['Labels']['new_label_name']
GDRIVE_FOLDER_ID = config['GoogleDrive']['gdrive_folder_id']
# Send INFO-and-above records to the root logger with a timestamped format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Gmail and Drive API scopes. Authorization itself is delegated to
# google_auth.get_credentials(); presumably this list must stay in sync with
# the scopes requested there - TODO confirm.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify', 'https://www.googleapis.com/auth/drive']
def find_label_id(service, label_name):
    """Look up the ID of the Gmail label whose name equals ``label_name``.

    Args:
        service: The Gmail API service instance.
        label_name: Exact (case-sensitive) label name to search for.

    Returns:
        The matching label's ID, or None when no label has that name or the
        lookup raised (in which case the error is logged).
    """
    try:
        listing = service.users().labels().list(userId='me').execute()
        # First label with a matching name wins; None when nothing matches.
        return next(
            (entry['id'] for entry in listing.get('labels', []) if entry['name'] == label_name),
            None,
        )
    except Exception as error:
        logging.error(f'Error finding label ID for {label_name}: {error}')
        return None
def extract_start_end_locations(text, pattern):
    """Pull the departure and arrival station names out of ``text``.

    ``pattern`` is searched in ``text``; its capture groups 1 and 3 are taken
    as the start and end segments, and only the CJK ideographs
    (U+4E00-U+9FFF) within each segment are kept.

    Args:
        text: The text to be searched.
        pattern: Regex with at least three groups; group 1 holds the start
            segment and group 3 the end segment.

    Returns:
        A ``(start_location, end_location)`` tuple, or
        ``("Start not found", "End not found")`` when the pattern does not match.
    """
    found = re.search(pattern, text)
    if found is None:
        return "Start not found", "End not found"

    cjk_run = r'[\u4e00-\u9fff]+'
    # Drop romanised names, digits and whitespace; keep only the ideographs.
    origin = ''.join(re.findall(cjk_run, found.group(1)))
    destination = ''.join(re.findall(cjk_run, found.group(3)))
    return origin, destination
def extract_ticket_details_from_pdf(file_data):
    """Extract date, itinerary, fare and start/end stations from a ticket PDF.

    Only the text of the first page is inspected. The search is narrowed to
    the region between the "票款 Fare" heading and the
    "本單據係為旅客購買乘車票之證明" footer; when either marker is absent the
    corresponding page boundary is used instead.

    Args:
        file_data (bytes): The binary data of the PDF file.

    Returns:
        tuple: ``(date, itinerary, fare, start_location, end_location)``.
        Any detail that cannot be located is returned as its
        "... not found" placeholder string.
    """
    # Open the PDF from memory; the context manager closes the document even
    # if page loading or text extraction raises.
    with fitz.open(stream=io.BytesIO(file_data), filetype="pdf") as doc:
        text = doc.load_page(0).get_text()

    # str.find() returns -1 on a miss, which would silently act as a
    # "from the end" slice bound, so clamp both bounds explicitly.
    start_index = text.find("票款 Fare")
    if start_index == -1:
        start_index = 0
    end_index = text.find("本單據係為旅客購買乘車票之證明", start_index)
    if end_index == -1:
        end_index = len(text)
    lines = text[start_index:end_index].split('\n')

    # Placeholders returned for any detail that is never matched.
    date = "Date not found"
    itinerary = "Itinerary not found"
    fare = "Fare not found"
    start_location = "Start not found"
    end_location = "End not found"

    # Patterns for the individual details.
    date_pattern = r"\d{4}-\d{2}-\d{2}"
    itinerary_pattern = r"([\w\s]+?)(?=\s*\d{2}:\d{2}|\s*[-]{1,3}\s*|\s*$)\s*(\d{2}:\d{2})?\s*[-]{1,3}\s*([\w\s]+?)(?=\s*\d{2}:\d{2}|\s*$)\s*(\d{2}:\d{2})?"
    fare_pattern = r"NT\$\s*\d+"

    # Every line is tested against every pattern; a later matching line
    # overwrites the value taken from an earlier one.
    for line in lines:
        date_match = re.search(date_pattern, line)
        if date_match:
            date = date_match.group(0)

        itinerary_match = re.search(itinerary_pattern, line)
        if itinerary_match:
            # Collapse internal runs of whitespace to single spaces.
            itinerary = ' '.join(itinerary_match.group(0).split())
            start_location, end_location = extract_start_end_locations(line, itinerary_pattern)

        fare_match = re.search(fare_pattern, line)
        if fare_match:
            # Remove any whitespace between "NT$" and the amount.
            fare = ''.join(fare_match.group(0).split())

    return date, itinerary, fare, start_location, end_location
def rename_pdf(file_data, date, start_location, end_location, fare):
    """Build the archive filename for a ticket PDF and wrap its bytes in a stream.

    The PDF content itself is not modified; only a new name is derived from
    the ticket details.

    Args:
        file_data (bytes): The binary data of the PDF file.
        date (str): The extracted date; hyphens are stripped for the filename.
        start_location (str): The extracted start location.
        end_location (str): The extracted end location.
        fare (str): The extracted fare; only its digits are kept.

    Returns:
        tuple: ``(new_filename, stream)`` where ``stream`` is an ``io.BytesIO``
        positioned at offset 0 and holding ``file_data``.
    """
    compact_date = date.replace('-', '')
    fare_digits = ''.join(ch for ch in fare if ch.isdigit())
    target_name = f"{compact_date}-高鐵票-{start_location}-{end_location}-{fare_digits}.pdf"
    # A BytesIO seeded with the data starts at position 0, ready for upload.
    return target_name, io.BytesIO(file_data)
def process_messages(service, drive_service, original_label_id, new_label_id):
    """Process every Gmail message carrying the original label.

    The message list is fetched page by page until the API reports no further
    page, so labels holding more messages than fit in one response are
    handled completely. All IDs are collected before any message is
    processed, because processing removes the label being listed.

    Args:
        service: The Gmail API service instance.
        drive_service: The Google Drive API service instance.
        original_label_id (str): The ID of the original Gmail label.
        new_label_id (str): The ID of the new Gmail label to be applied after processing.
    """
    messages_api = service.users().messages()
    messages = []
    request = messages_api.list(userId='me', labelIds=[original_label_id])
    while request is not None:
        results = request.execute()
        messages.extend(results.get('messages', []))
        # list_next() returns None once there is no nextPageToken.
        request = messages_api.list_next(request, results)

    logging.info(f'Number of messages with label "{ORIGINAL_LABEL_NAME}": {len(messages)}')
    for message in messages:
        process_single_message(service, drive_service, message, original_label_id, new_label_id)
def _iter_message_parts(parts):
    """Yield each MIME part in ``parts``, descending into nested multiparts."""
    for part in parts:
        yield part
        yield from _iter_message_parts(part.get('parts', []))


def process_single_message(service, drive_service, message, original_label_id, new_label_id):
    """Upload a message's PDF attachments to Drive, then relabel the message.

    Every part of the message (including parts nested inside multipart
    containers) whose filename ends in ``.pdf``, in any letter case, and that
    has an attachment ID is downloaded, renamed from its ticket details and
    uploaded. The Gmail labels are updated once, after all attachments were
    handled, and only if at least one PDF was uploaded; a failure while
    handling any attachment therefore leaves the message under its original
    label.

    Args:
        service: The Gmail API service instance.
        drive_service: The Google Drive API service instance.
        message (dict): The Gmail message object; only its ``'id'`` is read.
        original_label_id (str): The ID of the original Gmail label.
        new_label_id (str): The ID of the new Gmail label to be applied after processing.
    """
    message_id = message['id']
    msg = service.users().messages().get(userId='me', id=message_id).execute()

    uploaded_any = False
    for part in _iter_message_parts(msg.get('payload', {}).get('parts', [])):
        filename = part.get('filename') or ''
        if not filename.lower().endswith('.pdf'):
            continue
        attachment_id = part.get('body', {}).get('attachmentId')
        if not attachment_id:
            continue
        file_data = get_attachment_data(service, message_id, attachment_id)
        new_filename, new_pdf_stream = process_pdf_file(file_data)
        upload_file_to_drive(drive_service, new_filename, new_pdf_stream)
        uploaded_any = True

    # Relabel once per message rather than once per attachment.
    if uploaded_any:
        update_message_label(service, message_id, original_label_id, new_label_id)
def get_attachment_data(service, message_id, attachment_id):
    """Download one Gmail attachment and return its decoded bytes.

    Args:
        service: The Gmail API service instance.
        message_id (str): The ID of the message containing the attachment.
        attachment_id (str): The ID of the attachment.

    Returns:
        bytes: The attachment content, decoded from the URL-safe base64
        string found under the response's ``'data'`` key.
    """
    request = service.users().messages().attachments().get(
        userId='me', messageId=message_id, id=attachment_id
    )
    encoded = request.execute()['data']
    return base64.urlsafe_b64decode(encoded.encode('UTF-8'))
def process_pdf_file(file_data):
    """Derive the upload name and stream for a ticket PDF.

    Ticket details are extracted from the PDF and fed to ``rename_pdf``; the
    extracted itinerary string is not used for the filename.

    Args:
        file_data (bytes): The binary data of the PDF file.

    Returns:
        tuple: A tuple containing the new filename and a BytesIO stream of the PDF.
    """
    details = extract_ticket_details_from_pdf(file_data)
    date, _itinerary, fare, start_location, end_location = details
    renamed = rename_pdf(file_data, date, start_location, end_location, fare)
    new_filename, new_pdf_stream = renamed
    return new_filename, new_pdf_stream
def upload_file_to_drive(drive_service, filename, file_stream):
    """Create a PDF file in the configured Google Drive folder.

    The file is placed in the folder identified by ``GDRIVE_FOLDER_ID`` and
    the ID of the created file is logged.

    Args:
        drive_service: The Google Drive API service instance.
        filename (str): The name to give the uploaded file.
        file_stream (io.BytesIO): The stream holding the PDF content.
    """
    metadata = {'name': filename, 'parents': [GDRIVE_FOLDER_ID]}
    upload_body = MediaIoBaseUpload(file_stream, mimetype='application/pdf')
    create_request = drive_service.files().create(
        body=metadata, media_body=upload_body, fields='id'
    )
    created = create_request.execute()
    # Only the 'id' field was requested from the API.
    logging.info(f'File: "{filename}". Uploaded with ID: {created.get("id")}')
def update_message_label(service, message_id, original_label_id, new_label_id):
    """Swap a message's label and take it out of the inbox in one request.

    The original label and ``INBOX`` are removed and the new label is added
    through a single ``modify`` call, so the message cannot end up relabelled
    yet still in the inbox if a follow-up request were to fail.

    Args:
        service: The Gmail API service instance.
        message_id (str): The ID of the message to update.
        original_label_id (str): The ID of the original label to be removed.
        new_label_id (str): The ID of the new label to be applied.
    """
    labels_to_remove = [original_label_id]
    # Avoid listing INBOX twice when it is itself the original label.
    if original_label_id != 'INBOX':
        labels_to_remove.append('INBOX')

    service.users().messages().modify(
        userId='me',
        id=message_id,
        body={'removeLabelIds': labels_to_remove, 'addLabelIds': [new_label_id]},
    ).execute()
    logging.info(f'Message ID: "{message_id}". Label updated: {ORIGINAL_LABEL_NAME} -> {NEW_LABEL_NAME}')
def main():
    """Authenticate, resolve the configured labels and process the messages.

    Any exception raised along the way is logged together with its traceback
    and then suppressed, so the function always returns None.
    """
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        drive_service = build('drive', 'v3', credentials=creds)

        original_label_id = find_label_id(service, ORIGINAL_LABEL_NAME)
        new_label_id = find_label_id(service, NEW_LABEL_NAME)

        # Report exactly which configured label could not be resolved.
        missing = [
            name
            for name, label_id in (
                (ORIGINAL_LABEL_NAME, original_label_id),
                (NEW_LABEL_NAME, new_label_id),
            )
            if label_id is None
        ]
        if missing:
            logging.error("One or both labels not found: %s", ', '.join(missing))
            return

        process_messages(service, drive_service, original_label_id, new_label_id)
    except Exception as e:
        # Top-level boundary: keep the traceback so failures can be diagnosed.
        logging.exception('An error occurred in the main process: %s', e)


if __name__ == '__main__':
    main()
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
PyMuPDF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment