Skip to content

Instantly share code, notes, and snippets.

@rahimnathwani
Created January 30, 2025 04:28
Show Gist options
  • Save rahimnathwani/25c8d7689c21413b26b7a0e5de4a5bde to your computer and use it in GitHub Desktop.
Save rahimnathwani/25c8d7689c21413b26b7a0e5de4a5bde to your computer and use it in GitHub Desktop.
Label facial images using deepface
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12.2"
# dependencies = [
# "deepface",
# "pandas==2.2.3",
# "tqdm==4.67.1",
# "opencv-contrib-python==4.10.0.84",
# "tensorflow==2.18.0",
# "keras==3.7.0",
# "gdown==5.2.0",
# "tf-keras"
# ]
# ///
import os
import sys
from deepface import DeepFace
import pandas as pd
from decimal import Decimal
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from itertools import product
import time
import pickle
import argparse
# Face-detector backends to run. Every image is analyzed once per entry, and
# the entry is handed to DeepFace.analyze as ``detector_backend``.
# The commented-out names are further detectors that are currently disabled.
backends = [
    "opencv",
    "yunet",
    "retinaface",
    # "ssd",
    # "dlib",
    # "mtcnn",
    # "fastmtcnn",
    # "mediapipe",
    # "yolov8",
    # "centerface",
]

# Upper bound on how many image files from the directory listing are processed.
MAX_FILE_COUNT = 100_000_000

# File-name suffixes that mark a directory entry as an image to analyze.
PERMITTED_EXTENSIONS = [".jpg", ".png"]

# NOTE(review): not referenced anywhere in the visible part of this file.
alignment_modes = [True, False]
def flatten_dict(d, parent_key="", sep="_"):
    """Collapse a nested dict into a single-level dict.

    Keys of nested dicts are joined to their parent's key with ``sep``; a
    top-level key (empty ``parent_key``) is kept exactly as it is. Values
    that are not dicts are copied over unchanged. A nested dict with no
    entries contributes nothing to the output.

    Args:
        d: The (possibly nested) dict to flatten.
        parent_key: Prefix applied to every key of ``d``; empty at top level.
        sep: Separator placed between a prefix and a key.

    Returns:
        A new flat dict; ``d`` itself is not modified.
    """
    flat = {}
    for name, value in d.items():
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if not isinstance(value, dict):
            flat[path] = value
            continue
        # Recurse with the joined key as the prefix for the nested entries.
        flat.update(flatten_dict(value, path, sep=sep))
    return flat
def format_dict(l):
    """Render the first dict of a list as newline-separated ``key: value`` text.

    The first element of ``l`` is flattened with ``flatten_dict`` and each
    resulting entry becomes one ``key: value`` line.

    Args:
        l: Expected to be a list whose first element is a (possibly nested)
            dict. Only that first element is rendered.

    Returns:
        The formatted text, or ``""`` when ``l`` is not a list or is an
        empty list (previously an empty list raised ``IndexError``).
    """
    # Treat "no list" and "empty list" alike: there is nothing to format.
    if not isinstance(l, list) or not l:
        return ""
    flat = flatten_dict(l[0])
    return "\n".join(f"{k}: {v}" for k, v in flat.items())
def save_results_to_csv_and_pickle(results, output_dir, seq=0):
    """Flatten, round and persist one batch of results as CSV and pickle.

    Every value of ``results`` is flattened with ``flatten_dict``; float and
    ``Decimal`` cells are then rounded to two decimals (as plain floats).
    Two files are written, both named
    ``{output_dir}_results_{timestamp}_{seq}`` with a ``.csv`` and a ``.pkl``
    suffix respectively. Note that ``output_dir`` is used as a file-name
    prefix, not joined as a parent directory.

    Args:
        results: Mapping of row label to a (possibly nested) dict of values.
        output_dir: Prefix for the output file names.
        seq: Sequence number embedded in the file names.

    Returns:
        The path of the pickle file.
    """
    flattened = {name: flatten_dict(entry) for name, entry in results.items()}

    # Round in place; only existing keys are reassigned, so iterating is safe.
    for _name, row in tqdm(flattened.items()):
        for column in row:
            cell = row[column]
            if isinstance(cell, (float, Decimal)):
                row[column] = round(float(cell), 2)

    # Add timestamp to filename to prevent overwriting
    timestamp = int(time.time())

    csv_path = f"{output_dir}_results_{timestamp}_{seq}.csv"
    table = pd.DataFrame(flattened).T  # one row per top-level key
    table.to_csv(csv_path)

    pickle_path = f"{output_dir}_results_{timestamp}_{seq}.pkl"
    with open(pickle_path, "wb") as handle:
        pickle.dump(flattened, handle)
    return pickle_path
def analyze_image(args, directory=None):
    """Run DeepFace analysis on one image with one detector backend.

    The original implementation built the image path from a module-level
    ``DIR`` that this file never defines; the resulting ``NameError`` was
    swallowed by the broad ``except`` so every call returned an empty
    result. The directory can now be passed explicitly, and the path is
    built outside the ``try`` so only analysis failures are suppressed.

    Args:
        args: A ``(file, backend)`` pair: the image file name and the
            detector backend name.
        directory: Directory containing ``file``. When omitted, a
            module-level ``DIR`` is used if one exists; otherwise ``file``
            is used as the image path as given.

    Returns:
        ``(file, backend, result)`` where ``result`` is the analysis of the
        first face returned, or ``{}`` if the analysis raised.
    """
    file, backend = args
    if directory is None:
        # Backward compatibility with callers that set a global DIR.
        directory = globals().get("DIR")
    img_path = file if directory is None else f"{directory}/{file}"
    try:
        faces = DeepFace.analyze(img_path=img_path, detector_backend=backend, silent=True)
        result = faces[0]  # only looking at the first face returned
    except Exception:
        # Deliberate best effort: a failed analysis is recorded as an empty
        # result instead of aborting the whole run.
        result = {}
    return (file, backend, result)
def main(directory):
    """Analyze every permitted image in ``directory`` with every backend.

    Each (file, backend) combination is submitted to a two-worker thread
    pool. Results are grouped per file as ``{file: {backend: result}}`` and
    written out with ``save_results_to_csv_and_pickle`` every ``batch_size``
    completed jobs, plus once more at the end for whatever is left.

    A file is only included in a periodic save once results for all of its
    backends have arrived. Previously a file with partial results could be
    saved and marked as done, so its remaining backend results were never
    written to disk.

    Args:
        directory: Path of the directory whose images are analyzed. It is
            also passed on as the output prefix for the saved files.
    """
    suffixes = tuple(PERMITTED_EXTENSIONS)
    files = [f for f in os.listdir(directory) if f.endswith(suffixes)][:MAX_FILE_COUNT]

    # Create list of all combinations
    jobs = list(product(files, backends))
    # Number of distinct backend results that make a file complete.
    backends_per_file = len(set(backends))

    results = {}
    # Files that have already been written to disk.
    saved_files = set()
    batch_size = 1000

    def analyze_image_with_dir(args):
        """Analyze one (file, backend) pair; return ``{}`` as result on failure."""
        file, backend = args
        try:
            result = DeepFace.analyze(
                img_path=f"{directory}/{file}", detector_backend=backend, silent=True
            )[
                0
            ]  # only looking at the first face returned
        except Exception:
            # Best effort: one unreadable image must not abort the run.
            result = {}
        return (file, backend, result)

    def save_unsaved(seq, include_partial=False):
        """Write not-yet-saved files; skip incomplete ones unless asked."""
        batch = {
            name: per_backend
            for name, per_backend in results.items()
            if name not in saved_files
            and (include_partial or len(per_backend) >= backends_per_file)
        }
        if batch:
            save_results_to_csv_and_pickle(batch, directory, seq)
            saved_files.update(batch)

    # Process in parallel with progress bar
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(analyze_image_with_dir, job) for job in jobs]
        for i, future in enumerate(tqdm(as_completed(futures), total=len(jobs))):
            file, backend, result = future.result()
            results.setdefault(file, {})[backend] = result
            # Checkpoint when a batch of jobs has finished.
            if (i + 1) % batch_size == 0:
                save_unsaved(i)

    # Save any remaining results at the end
    save_unsaved(len(jobs), include_partial=True)
if __name__ == "__main__":
    # Command-line entry point: takes one positional argument, the directory
    # of images, and runs the analysis on it. A path that is not a directory
    # is reported on stderr and the script exits with status 1.
    cli = argparse.ArgumentParser(description="Analyze faces in images using DeepFace.")
    cli.add_argument("directory", help="Directory containing images to analyze")
    args = cli.parse_args()
    if os.path.isdir(args.directory):
        main(args.directory)
    else:
        print(f"Error: '{args.directory}' is not a valid directory", file=sys.stderr)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment