Created
January 30, 2025 04:28
-
-
Save rahimnathwani/25c8d7689c21413b26b7a0e5de4a5bde to your computer and use it in GitHub Desktop.
Label facial images using deepface
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12.2"
# dependencies = [
#     "deepface",
#     "pandas==2.2.3",
#     "tqdm==4.67.1",
#     "opencv-contrib-python==4.10.0.84",
#     "tensorflow==2.18.0",
#     "keras==3.7.0",
#     "gdown==5.2.0",
#     "tf-keras"
# ]
# ///
import argparse
import logging
import numbers
import os
import pickle
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from decimal import Decimal
from itertools import product

import pandas as pd
from deepface import DeepFace
from tqdm import tqdm
# Detector backends handed to DeepFace.analyze(detector_backend=...).
# Every image is analysed once per enabled entry; the commented-out names are
# kept so they can be re-enabled by uncommenting them.
backends = [
    "opencv",
    "yunet",
    "retinaface",
    # "ssd",
    # "dlib",
    # "mtcnn",
    # "fastmtcnn",
    # "mediapipe",
    # "yolov8",
    # "centerface",
]

# Upper bound on how many image files are taken from the input directory.
MAX_FILE_COUNT = 100_000_000

# File-name suffixes that mark a directory entry as an image to analyse.
# ".jpeg" is listed alongside ".jpg" so JPEG files using the long suffix are
# not silently skipped.
PERMITTED_EXTENSIONS = [".jpg", ".jpeg", ".png"]

# NOTE(review): not referenced by any code visible in this file; kept so that
# anything importing it keeps working.
alignment_modes = [True, False]
def flatten_dict(d, parent_key="", sep="_"):
    """Collapse a nested dict into a single level with joined keys.

    Nested keys are joined to their ancestors with *sep*, so
    ``{"a": {"b": 1}}`` becomes ``{"a_b": 1}``. Values that are not ``dict``
    instances are copied through unchanged. A nested dict that is empty
    contributes no keys at all.

    Args:
        d: The (possibly nested) dict to flatten.
        parent_key: Prefix for every produced key. A falsy value (the
            default ``""``) means "no prefix".
        sep: Separator placed between a prefix and the key below it.

    Returns:
        A new flat dict; *d* itself is not modified.

    >>> flatten_dict({"a": {"b": 1, "c": {"d": 2}}, "e": 3})
    {'a_b': 1, 'a_c_d': 2, 'e': 3}
    """
    flat = {}

    def _walk(mapping, prefix, has_prefix):
        for key, value in mapping.items():
            full_key = f"{prefix}{sep}{key}" if has_prefix else key
            if isinstance(value, dict):
                # Below the top level a prefix always exists, even when the
                # key is falsy (0, ""). Testing the prefix's truthiness here
                # would drop it and let nested keys collide with outer ones.
                _walk(value, full_key, True)
            else:
                flat[full_key] = value

    _walk(d, parent_key, bool(parent_key))
    return flat
def format_dict(l):
    """Render the first dict of a list as ``key: value`` lines.

    Args:
        l: Expected to be a list whose first element is a (possibly nested)
            dict. Only that first element is rendered.

    Returns:
        One ``"key: value"`` line per flattened entry, joined with newlines.
        An empty string is returned when *l* is not a list, is empty, or its
        first element is not a dict, so callers never get an IndexError or
        AttributeError from malformed input.
    """
    if not isinstance(l, list) or not l:
        return ""
    first = l[0]
    if not isinstance(first, dict):
        return ""
    flat = flatten_dict(first)
    return "\n".join(f"{k}: {v}" for k, v in flat.items())
def save_results_to_csv_and_pickle(results, output_dir, seq=0):
    """Flatten, round and persist one batch of results as CSV and pickle.

    Each value of *results* is flattened with ``flatten_dict`` and every
    non-integer real number in it is rounded to two decimals. The batch is
    then written twice: as a CSV with one row per top-level key, and as a
    pickle of the flattened dict.

    Args:
        results: Mapping of a row label (the image file name when called from
            ``main``) to a nested dict of results. Not modified.
        output_dir: Used as a plain string *prefix* for the output names, not
            joined as a directory. Files are named
            ``{output_dir}_results_{timestamp}_{seq}.csv`` / ``.pkl``, so for
            a path without a trailing separator they land next to it.
        seq: Sequence number embedded in the file names so that batches
            written within the same second do not overwrite each other.

    Returns:
        The path of the pickle file (the CSV path is not returned).
    """
    flattened = {label: flatten_dict(nested) for label, nested in results.items()}

    for row in flattened.values():
        for key, value in row.items():
            # numbers.Real also matches real scalars that are not `float`
            # subclasses (e.g. numpy float32), which a plain
            # isinstance(value, float) check leaves unrounded. Integers and
            # bools are excluded so they are not turned into floats.
            if isinstance(value, numbers.Integral):
                continue
            if isinstance(value, (numbers.Real, Decimal)):
                row[key] = round(float(value), 2)

    # Add timestamp to filename to prevent overwriting
    timestamp = int(time.time())
    stem = f"{output_dir}_results_{timestamp}_{seq}"

    pd.DataFrame(flattened).T.to_csv(f"{stem}.csv")

    output_file = f"{stem}.pkl"
    with open(output_file, "wb") as f:
        pickle.dump(flattened, f)
    return output_file
def analyze_image(args, directory=None):
    """Analyse one image with one detector backend; keep the first face.

    Args:
        args: A ``(file, backend)`` pair: the image file name and the
            DeepFace detector backend to use.
        directory: Folder that contains *file*. When omitted, a module-level
            ``DIR`` is used if one is defined; otherwise *file* is passed to
            DeepFace as given. (No ``DIR`` is defined in the visible code, so
            the old unconditional ``DIR`` lookup raised NameError, which the
            broad ``except`` swallowed, making every call return ``{}``.)

    Returns:
        ``(file, backend, result)`` where *result* is the first element of
        what ``DeepFace.analyze`` returned, or ``{}`` if the call raised.
    """
    file, backend = args
    if directory is None:
        directory = globals().get("DIR")
    img_path = file if directory is None else os.path.join(directory, file)

    try:
        faces = DeepFace.analyze(img_path=img_path, detector_backend=backend, silent=True)
        result = faces[0]  # only looking at the first face returned
    except Exception as exc:
        # Best effort by design: one bad image must not abort a whole run,
        # but the failure is reported instead of vanishing silently.
        logging.getLogger(__name__).warning(
            "analysis failed for %s with backend %s: %s", img_path, backend, exc
        )
        result = {}
    return (file, backend, result)
def main(directory, batch_size=1000, max_workers=2):
    """Analyse every permitted image in *directory* with every backend.

    Each (file, backend) combination is run through ``DeepFace.analyze`` on a
    thread pool, and results are written out in batches with
    ``save_results_to_csv_and_pickle``.

    A file is only written once *all* of its backends have reported.
    Previously a file was marked as saved as soon as any of its backends had
    finished when a batch boundary was hit, so results from backends that
    finished later were never written.

    Args:
        directory: Folder whose image files are analysed; also passed on as
            the output-name prefix for the saved batches.
        batch_size: Number of completed jobs between two saves.
        max_workers: Size of the thread pool.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # Match extensions case-insensitively (".JPG" counts), and sort so the
    # MAX_FILE_COUNT cut-off picks a reproducible subset: os.listdir returns
    # entries in arbitrary order.
    suffixes = tuple(ext.lower() for ext in PERMITTED_EXTENSIONS)
    files = sorted(
        name for name in os.listdir(directory) if name.lower().endswith(suffixes)
    )[:MAX_FILE_COUNT]

    # Create list of all combinations
    jobs = list(product(files, backends))
    backends_per_file = len(backends)

    in_progress = {}  # file -> {backend: result}; some backends still pending
    ready = {}  # files with every backend done, waiting for the next save

    def analyze_image_with_dir(job):
        """Run one (file, backend) job; return ``{}`` as result on failure."""
        file, backend = job
        try:
            faces = DeepFace.analyze(
                img_path=os.path.join(directory, file),
                detector_backend=backend,
                silent=True,
            )
            result = faces[0]  # only looking at the first face returned
        except Exception as exc:
            # Best effort: report and continue so one bad image does not
            # abort the run.
            logging.getLogger(__name__).warning(
                "analysis failed for %s with backend %s: %s", file, backend, exc
            )
            result = {}
        return (file, backend, result)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(analyze_image_with_dir, job) for job in jobs]
        # Use tqdm to show progress
        for i, future in enumerate(tqdm(as_completed(futures), total=len(jobs))):
            file, backend, result = future.result()
            per_file = in_progress.setdefault(file, {})
            per_file[backend] = result
            if len(per_file) == backends_per_file:
                ready[file] = in_progress.pop(file)

            # Save the files completed since the last save when a batch is full
            if (i + 1) % batch_size == 0 and ready:
                save_results_to_csv_and_pickle(ready, directory, i)
                ready = {}

    # Save any remaining results at the end. `in_progress` is normally empty
    # here; it is merged in so nothing is lost if a file never reached the
    # expected backend count (e.g. duplicate names in `backends`).
    final_batch = {**ready, **in_progress}
    if final_batch:
        save_results_to_csv_and_pickle(final_batch, directory, len(jobs))
if __name__ == "__main__":
    # Command-line entry point: one positional argument naming the folder of
    # images to analyse.
    parser = argparse.ArgumentParser(description="Analyze faces in images using DeepFace.")
    parser.add_argument("directory", help="Directory containing images to analyze")
    args = parser.parse_args()

    if os.path.isdir(args.directory):
        main(args.directory)
    else:
        # Refuse to start on a missing or non-directory path; exit status 1.
        print(f"Error: '{args.directory}' is not a valid directory", file=sys.stderr)
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment