Created
September 25, 2023 19:38
-
-
Save vdparikh/c8cc4a4c94e2620c14fbd930154ffbbf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fnmatch
import os

import docx2txt
import pandas as pd
from pdf2image import convert_from_path
from PIL import Image
from presidio_analyzer import (
    AnalyzerEngine,
    BatchAnalyzerEngine,
    Pattern,
    PatternRecognizer,
    RecognizerResult,
)
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import EngineResult
from presidio_image_redactor import ImageAnalyzerEngine, ImageRedactorEngine
# Define your location list
location_list = pd.read_csv("us_cities_states_counties.csv", sep='|').reset_index(drop=True)

# Define patterns and recognizers
zip_pattern = Pattern(name="zip_pattern", regex='(\\b\\d{5}(?:\\-\\d{4})?\\b)', score=0.5)
zip_recognizer = PatternRecognizer(
    supported_entity="ZIPCODE",
    patterns=[zip_pattern],
    context=["zip", "zipcode"],
)

# NOTE(review): two-letter state codes such as "IN" or "OR" may also match
# ordinary words, depending on how the deny list is matched -- confirm.
state_recognizer = PatternRecognizer(
    supported_entity="STATE",
    deny_list=list(location_list['State short'].dropna().unique()),
    context=["state", "address"],
)

city_recognizer = PatternRecognizer(
    supported_entity="CITY",
    deny_list=list(location_list['City'].dropna().unique()),
    context=["city", "address"],
)

password_pattern = Pattern(
    name="password_pattern",
    regex='^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9])(?=.*?[#?!@$%^&*-]).{8,}$',
    score=0.5,
)
password_recognizer = PatternRecognizer(
    supported_entity="PASSWORD",
    patterns=[password_pattern],
    context=["password"],
)

# Initialize the analyzers.
# The custom recognizers are registered once, on the core AnalyzerEngine.
# BatchAnalyzerEngine wraps an AnalyzerEngine instead of exposing a registry
# of its own, so it is built on top of the configured engine; the image
# analyzer is given the same engine so OCR'd text is checked against the
# same recognizers.
analyzer = AnalyzerEngine()
analyzer.registry.add_recognizer(zip_recognizer)
analyzer.registry.add_recognizer(state_recognizer)
analyzer.registry.add_recognizer(city_recognizer)
analyzer.registry.add_recognizer(password_recognizer)

batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
image_analyzer = ImageAnalyzerEngine(analyzer_engine=analyzer)
redactor = ImageRedactorEngine()
# Define a function to process different file types

# Extensions loaded into a DataFrame and analyzed cell by cell.
_TABULAR_EXTENSIONS = ('txt', 'csv', 'xlsx', 'docx')
# Extensions analyzed as images (PDFs are rendered page by page first).
_IMAGE_EXTENSIONS = ('pdf', 'jpg', 'png')


def _load_table(filename, file_extension):
    """Load a text-like file into a DataFrame.

    CSV and XLSX files keep their own columns. TXT and DOCX files become a
    single ``text`` column with one row per non-blank line, so every line
    (including the first) is analyzed.
    """
    if file_extension == 'csv':
        return pd.read_csv(filename, index_col=0).reset_index(drop=True)
    if file_extension == 'xlsx':
        return pd.read_excel(filename, engine='openpyxl')
    if file_extension == 'docx':
        text = docx2txt.process(filename)
    else:  # 'txt'
        with open(filename, encoding='utf-8') as text_file:
            text = text_file.read()
    return pd.DataFrame({'text': [line for line in text.splitlines() if line.strip()]})


def _analyze_table(data, analyze_func):
    """Run ``analyze_func`` over a DataFrame; return (keys, result rows)."""
    data = data.astype(str).replace('nan', pd.NA)
    data_dict = data.to_dict(orient="list")
    analyzer_df = pd.DataFrame(analyze_func(data_dict, language="en"))
    if analyzer_df.empty:
        # No columns were analyzed, so there is no 'key' column to read.
        return [], []
    return list(analyzer_df['key']), list(analyzer_df['recognizer_results'])


def _iter_images(filename, file_extension):
    """Yield ``(key, image)`` pairs for an image file or each page of a PDF.

    PDF pages are also saved as ``made_<filename><page>.jpg`` and that path
    is used as the key; plain images are keyed by their own file name.
    """
    if file_extension == 'pdf':
        for page_number, page in enumerate(convert_from_path(filename)):
            page_path = f'made_{filename}{page_number}.jpg'
            page.save(page_path, 'JPEG')
            yield page_path, page
    else:
        with Image.open(filename) as image:
            yield filename, image


def _analyze_images(filename, file_extension, analyze_func):
    """Run ``analyze_func`` on every image; return (keys, result rows)."""
    keys, rows = [], []
    for key, image in _iter_images(filename, file_extension):
        keys.append(key)
        rows.append(list(analyze_func(image)))
    return keys, rows


def process_files(file_pattern, file_extension, analyze_func):
    """Analyze every file in the current directory matching ``file_pattern``.

    For each match a ``result_<file_extension>_<filename>.csv`` file is
    written with a ``filename`` column, a ``key`` column, and one column per
    position in that key's list of recognizer results.

    Args:
        file_pattern: ``fnmatch``-style pattern applied to names in ``'.'``.
        file_extension: One of ``txt``, ``csv``, ``xlsx``, ``docx``, ``pdf``,
            ``jpg`` or ``png``; any other value processes nothing.
        analyze_func: For ``txt``/``csv``/``xlsx``/``docx`` it is called as
            ``analyze_func(data_dict, language="en")`` and must return items
            exposing ``key`` and ``recognizer_results``. For
            ``pdf``/``jpg``/``png`` it is called as ``analyze_func(image)``
            with a PIL image and must return an iterable of results.
    """
    if file_extension not in _TABULAR_EXTENSIONS + _IMAGE_EXTENSIONS:
        return

    for filename in sorted(os.listdir('.')):
        if not fnmatch.fnmatch(filename, file_pattern):
            continue

        if file_extension in _IMAGE_EXTENSIONS:
            keys, rows = _analyze_images(filename, file_extension, analyze_func)
        else:
            data = _load_table(filename, file_extension)
            keys, rows = _analyze_table(data, analyze_func)

        # One row per key; the name on the index becomes the 'key' column.
        presidio_df = pd.DataFrame(rows, pd.Index(keys, name='key')).reset_index()
        presidio_df.insert(0, 'filename', filename, True)
        presidio_df.to_csv(f"result_{file_extension}_{filename}.csv", index=False)
# Process text files
process_files('s_pii_*.txt', 'txt', batch_analyzer.analyze_dict)
# Process CSV files
process_files('s_pii_*.csv', 'csv', batch_analyzer.analyze_dict)
# Process XLSX files
process_files('s_pii_*.xlsx', 'xlsx', batch_analyzer.analyze_dict)
# Process DOCX files
process_files('s_pii_*.docx', 'docx', batch_analyzer.analyze_dict)
# Process PDF files. Each page is rendered to a JPEG image, so the pages
# are analyzed with the image analyzer rather than the dict analyzer,
# which only handles text values.
process_files('s_pii_*.pdf', 'pdf', image_analyzer.analyze)
# Process image files (JPG and PNG)
for image_extension in ['jpg', 'png']:
    process_files(f's_pii_*.{image_extension}', image_extension, image_analyzer.analyze)
# Merge all results into a single CSV
def merge_results(directory='.', output_name="result_structured.csv"):
    """Concatenate every per-file result CSV in ``directory`` into one CSV.

    Only files named ``result_*.csv`` are merged. The output file itself is
    excluded, so re-running the script does not fold a previous merge back
    into the new one. Files are read in sorted name order.

    Args:
        directory: Folder that holds the per-file result CSVs.
        output_name: File name of the merged CSV, written into ``directory``.

    Returns:
        The merged DataFrame, or ``None`` when there was nothing to merge
        (in which case no output file is written).
    """
    result_files = sorted(
        name
        for name in os.listdir(directory)
        if name.startswith('result_') and name.endswith('.csv') and name != output_name
    )
    if not result_files:
        # pd.concat raises ValueError on an empty list; report and skip instead.
        print(f"No result_*.csv files found in {directory!r}; nothing to merge.")
        return None

    result_dfs = [pd.read_csv(os.path.join(directory, name)) for name in result_files]
    merged = pd.concat(result_dfs, ignore_index=True)
    merged.to_csv(os.path.join(directory, output_name), index=False)
    return merged


final = merge_results()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment