Last active
October 26, 2024 19:26
-
-
Save hansvdam/3715df914ece19ad92598573aed3b76b to your computer and use it in GitHub Desktop.
fotos van bonnentjes hernoemen met AI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import os | |
import openpyxl | |
from langchain.chains.transform import TransformChain | |
from langchain_community.chat_models import ChatOpenAI | |
from langchain_core.messages import HumanMessage | |
from openai.types import image_model | |
from pathlib import Path | |
from openpyxl.styles import PatternFill | |
from openpyxl.utils import get_column_letter | |
from openpyxl.worksheet.hyperlink import Hyperlink | |
from pydantic import BaseModel, Field | |
from langchain_core.output_parsers import PydanticOutputParser | |
import tkinter as tk | |
# Set your OpenAI API key (see https://platform.openai.com/api-keys) | |
os.environ["OPENAI_API_KEY"] = "<your openai key>" | |
def load_image(inputs: dict) -> dict: | |
"""Load image from file and encode it as base64.""" | |
image_path = inputs["image_path"] | |
def encode_image(image_path): | |
with open(image_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode('utf-8') | |
image_base64 = encode_image(image_path) | |
return {"image": image_base64} | |
from langchain_community.document_loaders import PyPDFLoader | |
def load_pdf(inputs: dict) -> dict: | |
"""Load PDF from file and extract text.""" | |
pdf = inputs["image_path"] | |
loader = PyPDFLoader(pdf) | |
documents = loader.load()[0] | |
return {"text": documents.page_content} | |
load_image_chain = TransformChain( | |
input_variables=["image_path"], | |
output_variables=["image"], | |
transform=load_image | |
) | |
load_pdf_chain = TransformChain( | |
input_variables=["image_path"], | |
output_variables=["text"], | |
transform=load_pdf | |
) | |
class ImageInformation(BaseModel): | |
"""Information about an image in Dutch.""" | |
date: str = Field(description="the date in the image, in the format yyyy-mm-dd") | |
invoice_company: str = Field(description="Company that is the beneficiary of the invoice") | |
invoice_amount: float = Field(description="Amount of the invoice") | |
invoice_currency_symbol: str = Field(description="Currency symbol of the invoice, e.g. € or $") | |
invoice_product_kind: str = Field(description="Kind of product bought as a concise high-end (tax-)category") | |
class PDFInformation(ImageInformation): | |
"""Information about a PDF in Dutch.""" | |
date: str = Field(description="the date in the pdf, in the format yyyy-mm-dd") | |
parserIMG = PydanticOutputParser(pydantic_object=ImageInformation) | |
parserPDF = PydanticOutputParser(pydantic_object=PDFInformation) | |
def get_file_information(file_path: str, content_type: str = "image") -> dict: | |
vision_prompt = f"Given the {content_type}, provide information as requested in Dutch:" | |
if content_type == "image": | |
vision_chain = load_image_chain | image_model | parserIMG | |
elif content_type == "pdf": | |
vision_chain = load_pdf_chain | image_model | parserPDF | |
return vision_chain.invoke({ | |
'image_path': f'{file_path}', | |
'prompt': vision_prompt, | |
'content_type': content_type | |
}) | |
def image_model(inputs: dict) -> str | list[str] | dict: | |
"""Invoke model with image and prompt.""" | |
model = ChatOpenAI(temperature=0.0, model="gpt-4o-mini", max_tokens=1024) | |
content_type = inputs.get("content_type", "image") | |
content = [ | |
{"type": "text", "text": inputs["prompt"]}, | |
{"type": "text", | |
"text": parserIMG.get_format_instructions() if content_type == "image" else parserPDF.get_format_instructions()}, | |
] | |
if content_type == "image": | |
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{inputs['image']}"}}) | |
elif content_type == "pdf": | |
content.append({"type": "text", "text": inputs['text']}) | |
msg = model.invoke([HumanMessage(content=content)]) | |
return msg.content | |
def rename_images_in_directory(directory_path: str, progress_callback=None, cancel_check=None): | |
"""Rename all images and PDFs in the given directory and its subdirectories according to {date}-{purpose}.extension and create an Excel report.""" | |
# Create Excel workbook and sheet | |
workbook = openpyxl.Workbook() | |
sheet = workbook.active | |
sheet.title = "Invoices" | |
# Define headers | |
headers = ["Date", "Company", "Amount", "Currency", "Product Kind", "New Filename", "Old Filename"] | |
for col, header in enumerate(headers, start=1): | |
cell = sheet.cell(row=1, column=col, value=header) | |
cell.fill = PatternFill(start_color="00FF00", end_color="00FF00", fill_type="solid") | |
row = 2 | |
if not os.path.exists(directory_path): | |
raise FileNotFoundError(f"Directory {directory_path} does not exist.") | |
files = [] | |
for root, _, filenames in os.walk(directory_path): | |
for filename in filenames: | |
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.pdf')): | |
files.append(os.path.join(root, filename)) | |
total_files = len(files) | |
for index, file_path in enumerate(files): | |
if cancel_check and cancel_check(): | |
print("Operation cancelled.") | |
workbook.close() | |
return "cancelled" # Return "cancelled" status | |
filename = os.path.basename(file_path) | |
content_type = "pdf" if filename.lower().endswith('.pdf') else "image" | |
# Check for cancellation before processing each file | |
if cancel_check and cancel_check(): | |
print("Operation cancelled.") | |
workbook.close() | |
return "cancelled" # Return "cancelled" status | |
info = get_file_information(file_path, content_type) | |
# Check for cancellation after getting file information | |
if cancel_check and cancel_check(): | |
print("Operation cancelled.") | |
workbook.close() | |
return "cancelled" # Return "cancelled" status | |
# Extract date and purpose | |
date = info.date.replace('/', '-').replace('.', '') # Access as attribute | |
product_kind = info.invoice_product_kind.replace(' ', '_').replace('.', '') | |
company = info.invoice_company.replace(' ', '_').replace('.', '') | |
amount = str(info.invoice_amount) | |
currency =info.invoice_currency_symbol | |
# Get file extension | |
file_name, extension = os.path.splitext(filename) | |
# Create new filename | |
if content_type == "pdf": | |
partial_new_filename = f"{date}-{currency}{amount}-{product_kind}-{company}" | |
if file_name.startswith(partial_new_filename): | |
new_filename = f"{file_name}{extension}" | |
else: | |
new_filename = f"{partial_new_filename}-{file_name}{extension}" | |
else: | |
new_filename = f"{date}-{currency}{amount}-{product_kind}-{company}{extension}" | |
# Rename the file | |
new_file_path = os.path.join(os.path.dirname(file_path), new_filename) | |
os.rename(file_path, new_file_path) | |
print(f"Renamed: {filename} -> {new_filename}") | |
# Write data to Excel | |
sheet.cell(row=row, column=1, value=info.date) | |
sheet.cell(row=row, column=2, value=info.invoice_company) | |
sheet.cell(row=row, column=3, value=info.invoice_amount) | |
sheet.cell(row=row, column=4, value=info.invoice_currency_symbol) | |
sheet.cell(row=row, column=5, value=info.invoice_product_kind) | |
sheet.cell(row=row, column=6, value=new_filename) | |
sheet.cell(row=row, column=7, value=filename) | |
# Add hyperlink to the new filename | |
full_file_path = os.path.abspath(new_file_path) | |
cell = sheet.cell(row=row, column=6) # Update column number | |
cell.hyperlink = Hyperlink(ref=cell.coordinate, target=full_file_path, tooltip="Open file") | |
row += 1 | |
if progress_callback: | |
progress = (index + 1) / total_files * 100 | |
progress_callback(progress) | |
# Check for cancellation after updating progress | |
if cancel_check and cancel_check(): | |
print("Operation cancelled.") | |
workbook.close() | |
return "cancelled" # Return "cancelled" status | |
# Adjust column widths | |
for col in range(1, len(headers) + 1): | |
sheet.column_dimensions[get_column_letter(col)].width = 20 | |
# Save the workbook | |
excel_path = os.path.join(directory_path, "invoices.xlsx") | |
workbook.save(excel_path) | |
print(f"Excel report created: {excel_path}") | |
return "completed" # Return "completed" status if finished successfully | |
# Call the function with your directory path | |
rename_images_in_directory("/Users/hansvandam/temp/invoices2") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My updated version: import base64
import os
import openpyxl
from langchain.chains.transform import TransformChain
from langchain_core.messages import HumanMessage
from langchain_community.chat_models.openai import ChatOpenAI # Correct import for ChatOpenAI
from openai.types import image_model
from pathlib import Path
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.hyperlink import Hyperlink
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
Set your OpenAI API key
os.environ["OPENAI_API_KEY"] = "myopenAIkey"
Load Image Function
def load_image(inputs: dict) -> dict:
"""Load image from file and encode it as base64."""
image_path = inputs["image_path"]
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
image_base64 = encode_image(image_path)
return {"image": image_base64}
from langchain_community.document_loaders import PyPDFLoader
def load_pdf(inputs: dict) -> dict:
"""Load PDF from file and extract text."""
pdf = inputs["image_path"]
loader = PyPDFLoader(pdf)
documents = loader.load()[0]
return {"text": documents.page_content}
load_image_chain = TransformChain(
input_variables=["image_path"],
output_variables=["image"],
transform=load_image
)
load_pdf_chain = TransformChain(
input_variables=["image_path"],
output_variables=["text"],
transform=load_pdf
)
class ImageInformation(BaseModel):
"""Information about an image in Dutch."""
date: str = Field(description="the date in the image, in the format yyyy-mm-dd")
invoice_company: str = Field(description="Company that is the beneficiary of the invoice")
invoice_amount: float = Field(description="Amount of the invoice")
invoice_product_kind: str = Field(description="Kind of product bought as a high-end (tax-)category.")
class PDFInformation(ImageInformation):
"""Information about a PDF in Dutch."""
date: str = Field(description="the date in the pdf, in the format yyyy-mm-dd")
parserIMG = PydanticOutputParser(pydantic_object=ImageInformation)
parserPDF = PydanticOutputParser(pydantic_object=PDFInformation)
def get_file_information(file_path: str, content_type: str = "image") -> dict:
vision_prompt = f"Given the {content_type}, provide information as requested in Dutch:"
if content_type == "image":
vision_chain = load_image_chain | image_model | parserIMG
elif content_type == "pdf":
vision_chain = load_pdf_chain | image_model | parserPDF
return vision_chain.invoke({
'image_path': f'{file_path}',
'prompt': vision_prompt,
'content_type': content_type
})
def image_model(inputs: dict) -> str | list[str] | dict:
"""Invoke model with image and prompt."""
model = ChatOpenAI(temperature=0.0, model="gpt-4o", max_tokens=1024)
content_type = inputs.get("content_type", "image")
def rename_images_in_directory(directory_path: str):
"""Rename all images and PDFs in the given directory according to {date}-{purpose}.extension and create an Excel report."""
# Create Excel workbook and sheet
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = "Invoices"
Ensure the path is correct
rename_images_in_directory("C:/Users/Gebruiker/downloads/bonnetje")