Last active
June 29, 2023 08:30
-
-
Save kogens/4eb08e9f8d6442a0ef612beb4b06e218 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Extract images from PDF files and decode any QR codes found. | |
Requirements: | |
> sudo apt install zbar-tools | |
> pip install pandas[xlsxwriter] opencv-python pymupdf qrdet qreader tqdm | |
See also the docs for the QR part | |
https://github.com/Eric-Canas/qrdet | |
https://github.com/Eric-Canas/qreader | |
""" | |
import argparse | |
import os | |
import pathlib | |
import warnings | |
from typing import Generator, Iterable | |
import cv2 # opencv-python | |
import fitz # pymupdf | |
import numpy as np | |
import pandas as pd | |
from qrdet import QRDetector | |
from qreader import QReader | |
from tqdm import tqdm | |
# Ignore warning about torch.meshgrid in QReader module | |
warnings.filterwarnings('ignore', message='torch.meshgrid', category=UserWarning) | |
def extract_images_from_pdf(path, save_images=False): | |
""" Extracts all images in a pdf. If save_images is True, images are saved in the current working directory """ | |
# https://stackoverflow.com/questions/2693820/extract-images-from-pdf-without-resampling-in-python | |
path = pathlib.Path(path) | |
# Load document and loop through pages | |
doc = fitz.Document(path) | |
for page_number in range(len(doc)): | |
# Extract all images in the page | |
for img in doc.get_page_images(page_number): | |
xref = img[0] | |
pix = fitz.Pixmap(doc, xref) | |
if save_images: | |
pix.save(f'{path.stem}_p{page_number}-x{xref}.png') | |
# Convert to numpy.ndarray and yield each image | |
# https://github.com/pymupdf/PyMuPDF/discussions/1208 | |
yield np.frombuffer(buffer=pix.samples, dtype=np.uint8).reshape((pix.height, pix.width, -1)) | |
def qr_codes_from_image(image: np.ndarray, qrdetector: QRDetector, qreader: QReader): | |
""" Detects locations of QR codes in an image and attempts to decode """ | |
# Ensure image is uint8 numpy array and locate QR codes | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
detections = qrdetector.detect(image=image) | |
# Decode each QR code found | |
for detection in detections: | |
bbox = detection[0] | |
decoded_text = qreader.decode(image, bbox=bbox) | |
yield bbox, decoded_text | |
def path_to_winpath(path: pathlib.Path | str) -> str: | |
""" Convert a path mounted in linux to a windows like path """ | |
path = pathlib.Path(path) | |
if path.drive == '': | |
# Assuming path is mounted as e.g. /media/o or media/q | |
mountpoint = path.parts[2].upper() | |
return f'{mountpoint}:\\' + '\\'.join(path.parts[3:]) | |
else: | |
return str(path) | |
def decode_pdf(path: os.PathLike | str, qrdetector: QRDetector, qreader: QReader) -> pd.DataFrame: | |
""" Detect QR codes in a given PDF and return as Dataframe """ | |
path = pathlib.Path(path) | |
results = pd.DataFrame(columns=['Type', 'Bounding box', 'Decoded text', 'Part number', | |
'Serial number', 'Filename', 'Date modified', 'Date created', | |
'Notes', 'Full path']) | |
try: | |
# Extract images and decode QR codes | |
images = extract_images_from_pdf(path) | |
for image in images: | |
qr_codes = pd.DataFrame(qr_codes_from_image(image, qrdetector, qreader), | |
columns=['Bounding box', 'Decoded text']) | |
# If QR codes are found, add them to result | |
if not qr_codes.empty: | |
results = pd.concat([results, qr_codes]) | |
results['Type'] = 'QR' | |
results[['Part number', 'Serial number']] = results['Decoded text'].str.extract( | |
r'^(\d+-?\d+) (\d{6})$') | |
note = 'QR found but unable to decode' | |
results['Notes'] = results['Decoded text'].apply(lambda x: note if pd.isna(x) else None) | |
except (fitz.fitz.FileDataError, fitz.fitz.EmptyFileError): | |
# If PDF is broken, make a note | |
results['Notes'] = ['Invalid PDF file'] | |
# Add metadata about the pdf file | |
filename = path.name | |
date_modified = pd.to_datetime(path.stat().st_mtime, unit='s') | |
date_created = pd.to_datetime(path.stat().st_ctime, unit='s') | |
if results.empty: | |
metadata = pd.DataFrame( | |
[{'Filename': filename, | |
'Full path': path, | |
'Date modified': date_modified, | |
'Date created': date_created, | |
'Notes': 'No codes found in file'}]) | |
results = pd.concat([results, metadata]) | |
else: | |
results[['Filename', 'Full path', 'Date modified', 'Date created']] = [filename, path, | |
date_modified, | |
date_created] | |
return results | |
def scan_pdfs_in_directory(path: os.PathLike | str, | |
datafile: os.PathLike | str, | |
n_newest: int = None) -> Generator[pd.DataFrame, None, None]: | |
""" Load all PDF files in a given directory and decode any QR codes found in the documents """ | |
path, datafile = pathlib.Path(path), pathlib.Path(datafile) | |
pdf_files = pd.DataFrame() | |
# Load saved data and filter away any files present in the csv | |
pdf_files['Full path'] = list(path.glob('*.pdf')) | |
pdf_files['Filename'] = pdf_files['Full path'].apply(lambda x: x.name) | |
existing_files = [] | |
if datafile.is_file(): | |
print(f'Loading from existing data file:\n "{datafile}"') | |
existing_files = pd.read_csv(datafile, sep=';')['Filename'].unique().tolist() | |
pdf_files = pdf_files[~pdf_files['Filename'].isin(existing_files)] | |
# Get modification and creation date | |
pdf_files['Date modified'] = pdf_files['Full path'].apply(lambda x: pd.to_datetime(x.stat().st_mtime, unit='s')) | |
# Sort newest files first, optional filter on n newest files | |
pdf_files = pdf_files.sort_values(by='Date modified', ascending=False) | |
if n_newest: | |
pdf_files = pdf_files.loc[0:n_newest] | |
if pdf_files.empty: | |
print(f'No new PDF files found ({len(existing_files)} already decoded).') | |
else: | |
print(f'Decoding QR codes in {len(pdf_files["Full path"])} PDF files ({len(existing_files)} already decoded)') | |
# Define the QRDetector and QReader needed for processing and loop through all images. | |
qreader = QReader() | |
qdetector = QRDetector() | |
# Extract images from each PDF file in list and decode any QR codes | |
pbar = tqdm(pdf_files['Full path']) | |
for pdf_path in pbar: | |
pdf_date = pd.to_datetime(pdf_path.stat().st_mtime, unit='s') | |
pbar.set_description(f'Decoding {pdf_path.name} [{pdf_date:%Y-%m-%d %H:%M:%S}]') | |
decoded_pdf = decode_pdf(pdf_path, qdetector, qreader) | |
yield decoded_pdf | |
def append_to_csv(path: os.PathLike | str, decoded_results: Iterable[pd.DataFrame]): | |
""" Append results from each file to a CSV file, only write header if file does not already exist """ | |
path = pathlib.Path(path) | |
print(f'Saving results to:\n "{path}"') | |
cols = ['Type', 'Bounding box', 'Decoded text', 'Part number', | |
'Serial number', 'Filename', 'Date modified', 'Date created', 'Notes', 'Full path'] | |
for result in decoded_results: | |
result = result[cols] | |
write_header = bool(not path.is_file()) | |
result.to_csv(path, | |
mode='a', | |
index=False, | |
sep=';', | |
# date_format='%Y-%m-%d %H:%M:%S', | |
header=write_header) | |
def write_to_excel(path: os.PathLike | str, dataframe: pd.DataFrame, make_links=True, sheet_name='Decoded PDFs', | |
**kwargs): | |
""" Write dataframe to Excel file as a formatted table with autofilters """ | |
path = pathlib.Path(path) | |
dataframe['Full path'] = dataframe['Full path'].apply(path_to_winpath) | |
if make_links: | |
# Enable Excel to understand paths as links. Significantly increases loading time when opening the file. | |
dataframe['Full path'] = 'external:' + dataframe['Full path'] | |
# https://xlsxwriter.readthedocs.io/example_pandas_table.html | |
with pd.ExcelWriter(path, engine='xlsxwriter') as writer: | |
dataframe.to_excel(writer, index=False, sheet_name=sheet_name, startrow=1, header=False, **kwargs) | |
worksheet = writer.sheets[sheet_name] | |
# Add the Excel table structure. Pandas will add the data. | |
column_settings = [{'header': column} for column in dataframe.columns] | |
(max_row, max_col) = dataframe.shape | |
worksheet.add_table(0, 0, max_row, max_col - 1, | |
{'columns': column_settings, | |
'banded_rows': True, | |
'style': 'Table Style Medium 2'}) | |
# Formatting | |
worksheet.freeze_panes(1, 0) | |
worksheet.autofit() | |
worksheet.set_column('G:H', 20) # Extra width for date columns | |
worksheet.set_column('B:B', None, None, {'hidden': 1}) # Hide bbox column | |
# Get statistics for separate datasheet | |
stats_sheet = 'Stats' | |
stats = get_stats(dataframe) | |
stats.to_excel(writer, index=True, sheet_name=stats_sheet, startrow=1, header=False, **kwargs) | |
workbook = writer._book | |
worksheet = writer.sheets['Stats'] | |
table_cols = ['Month'] + stats.columns.to_list() | |
column_settings = [{'header': column} for column in table_cols] | |
(max_row, max_col) = stats.shape | |
worksheet.add_table(0, 0, max_row, max_col, | |
{'columns': column_settings, | |
'banded_rows': True, | |
'style': 'Table Style Medium 2'}) | |
# Formatting | |
worksheet.freeze_panes(1, 0) | |
worksheet.autofit() | |
worksheet.set_column('A:A', 20) # Extra width for date columns | |
# Create a new chart object. In this case an embedded chart. | |
n = stats.shape[0] | |
values = f'={stats_sheet}!${{col}}$2:${{col}}${n + 1}' | |
dates = values.format(col='A') | |
scale_x, scale_y = 1.5, 1.5 | |
# Plot number of files and QR codes found | |
chart1 = workbook.add_chart({'type': 'line'}) | |
chart1.set_x_axis({'num_format': 'yyyy-mm', 'num_font': {'rotation': 45}}) | |
chart1.set_y_axis({'name': 'Number'}) | |
chart1.set_legend({'position': 'top'}) | |
chart1.add_series({'name': f'={stats_sheet}!$B1', | |
'categories': dates, | |
'values': values.format(col='B')}) | |
chart1.add_series({'name': f'={stats_sheet}!$C1', | |
'categories': dates, | |
'values': values.format(col='C')}) | |
worksheet.insert_chart('I2', chart1, {'x_scale': scale_x, 'y_scale': scale_y}) | |
# Plot some ratios of stats | |
chart2 = workbook.add_chart({'type': 'line'}) | |
chart2.set_x_axis({'num_format': 'yyyy-mm', 'num_font': {'rotation': 45}}) | |
chart2.set_y_axis({'name': 'Ratio'}) | |
chart2.set_legend({'position': 'top'}) | |
chart2.add_series({'name': f'={stats_sheet}!$D1', | |
'categories': dates, | |
'values': values.format(col='D')}) | |
chart2.add_series({'name': f'={stats_sheet}!$G1', | |
'categories': dates, | |
'values': values.format(col='G')}) | |
worksheet.insert_chart('I25', chart2, {'x_scale': scale_x, 'y_scale': scale_y}) | |
def get_stats(dataframe: pd.DataFrame) -> pd.DataFrame: | |
""" Aggregate some key numbers on number of files and QR codes """ | |
grouping = [dataframe['Date modified'].dt.year, dataframe['Date modified'].dt.month] | |
n_files = dataframe['Filename'].groupby(grouping).nunique().rename('Files') | |
n_empty_files = (dataframe['Notes'] == 'No codes found in file').groupby(grouping).sum().rename('Empty files') | |
n_empty_files_ratio = (n_empty_files / n_files).rename('Empty files/Files') | |
n_codes = (dataframe['Type'] == 'QR').groupby(grouping).sum().rename('QR codes') | |
n_undecoded_codes = ((dataframe['Type'] == 'QR') & pd.isna(dataframe['Decoded text'])).groupby( | |
grouping).sum().rename('Undecoded QR codes') | |
failed_decoding_ratio = (n_undecoded_codes / n_codes).rename('Undecoded QR codes/QR codes') | |
stats = pd.concat([n_files, n_empty_files, n_empty_files_ratio, n_codes, n_undecoded_codes, failed_decoding_ratio], | |
axis=1) | |
date_index = pd.to_datetime(stats.index.to_flat_index().map(lambda x: f'{x[0]}-{x[1]}')) | |
stats.index = date_index | |
return stats | |
if __name__ == '__main__': | |
argparse_description = 'Decode QR codes from images in scanned PDF files.' | |
parser = argparse.ArgumentParser(description=argparse_description) | |
parser.add_argument('-s', '--scandir', metavar='path_to_pdfs', required=False, | |
help='Path to directory containing PDFs to be scanned') | |
parser.add_argument('-c', '--csvfile', metavar='path_to_csv', default='decoded_pdfs.csv', required=False, | |
help='Path to CSV file for reading and storing decoded data from the PDFs (default: "decoded_pdfs.csv")') | |
parser.add_argument('-x', '--excelfile', metavar='path_to_xlsx', required=False, | |
help='Export results as a Excel file') | |
parser.add_argument('-l', '--makelinks', action='store_true', required=False, | |
help='Save paths to PDF files as clickable links in Excel. ' | |
'Useful but significantly increases loading time when opening the file.') | |
args = parser.parse_args() | |
datafile = pathlib.Path(args.csvfile) | |
if args.scandir: | |
# Extract images from all PDFs in given directory and save to csv | |
pdf_dir = pathlib.Path(args.scandir) | |
print(f'Scanning for pdfs in:\n "{pdf_dir}"\n' | |
f'Saving data to:\n "{datafile}"') | |
decoded_results = scan_pdfs_in_directory(path=pdf_dir, datafile=datafile) | |
append_to_csv(datafile, decoded_results) | |
if args.excelfile: | |
# Load existing data in dataframe and save as formatted Excel file | |
excelfile = pathlib.Path(args.excelfile) | |
print(f'Saving to Excel file:\n "{excelfile}"\n' | |
f'Using existing data:\n "{datafile}"') | |
existing_data = pd.read_csv(datafile, | |
sep=';', | |
parse_dates=['Date modified', 'Date created'], | |
dtype={'Serial number': 'Int64'}) | |
existing_data = existing_data.sort_values(by='Date modified', ascending=False) | |
# Save to a temporary file in case we cannot write to the Excel file | |
tempfile = excelfile.with_suffix(excelfile.suffix + '.bak') | |
try: | |
# Write to temporary Excel file, remove any existing excel file, then rename temp file | |
write_to_excel(tempfile, existing_data, make_links=args.makelinks) | |
excelfile.unlink(missing_ok=True) | |
tempfile.rename(excelfile) | |
except (PermissionError, OSError) as e: | |
warnings.warn(f'Could not write to "{excelfile.name}". Is the file open in Excel? ' | |
f'Saved to temporary file: {tempfile.name}\n' | |
f'{e}') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment