Last active
September 27, 2025 00:14
-
-
Save carpedm20/872d556d1bd924be9b74fa4f60224377 to your computer and use it in GitHub Desktop.
head_reducer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# {ai/scripts/head/head_reducer.py} | |
""" | |
Head size adjustment based on the head-to-body ratio.

Pipeline:
    1) Run YOLOv8 pose estimation to estimate the full body height in pixels.
    2) Detect the head box with MediaPipe Face Detection.
    3) Compute the shrink/enlarge factor from the target head count (or ratio).
    4) Apply a smooth elliptical radial warp.
""" | |
import argparse | |
from typing import Optional, Tuple | |
import cv2 | |
import mediapipe as mp | |
import numpy as np | |
from loguru import logger | |
from ultralytics import YOLO | |
# ------------------------------- | |
# Detection utilities | |
# ------------------------------- | |
def detect_face_bbox_mp(
    img_bgr: np.ndarray,
) -> Optional[Tuple[int, int, int, int, float]]:
    """Detect the highest-scoring face with MediaPipe Face Detection.

    Args:
        img_bgr: Image in BGR channel order; it is converted to RGB before
            being handed to MediaPipe.

    Returns:
        ``(x, y, w, h, score)`` in pixels, clipped to the image bounds, or
        ``None`` when MediaPipe reports no detection.
    """
    h, w = img_bgr.shape[:2]
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    with mp.solutions.face_detection.FaceDetection(
        model_selection=1, min_detection_confidence=0.5
    ) as fd:
        res = fd.process(img_rgb)

    if not res.detections:
        logger.warning("No face detected by MediaPipe.")
        return None

    # Detections without a score are ranked last.
    best = max(res.detections, key=lambda d: d.score[0] if d.score else 0.0)
    rel = best.location_data.relative_bounding_box

    # Convert to pixel corners first. Clipping corners (instead of clamping
    # the origin and keeping the raw width/height) shrinks the box by the
    # amount that fell outside the image rather than shifting it.
    x1 = int(rel.xmin * w)
    y1 = int(rel.ymin * h)
    x2 = x1 + int(rel.width * w)
    y2 = y1 + int(rel.height * h)

    x = min(max(0, x1), w - 1)
    y = min(max(0, y1), h - 1)
    # Always keep at least one pixel so downstream ratios never divide by 0.
    bw = max(1, min(x2, w) - x)
    bh = max(1, min(y2, h) - y)

    score = float(best.score[0]) if best.score else 0.0
    logger.info(
        "Face bbox xywh=({}, {}, {}, {}), score={:.3f}", x, y, bw, bh, score
    )
    return x, y, bw, bh, score
def run_yolo_pose(img_bgr: np.ndarray, model: Optional[YOLO] = None):
    """Run YOLOv8 pose estimation on an image.

    Args:
        img_bgr: Image passed straight to the model.
        model: Pose model to use; when ``None`` the ``yolov8n-pose.pt``
            weights are loaded on the fly.

    Returns:
        ``(boxes_xyxy, kpts_xy, kpts_conf)`` as float32 NumPy arrays taken
        from the first result (``kpts_conf`` is ``None`` if the model gives
        no keypoint confidences), or ``(None, None, None)`` when no
        keypoints or no boxes were found.
    """
    nothing_found = (None, None, None)

    def to_float32(tensor):
        # Results come back as tensors; bring them to host memory as float32.
        return tensor.cpu().numpy().astype(np.float32)

    if model is None:
        logger.info("Loading YOLOv8n pose model...")
        model = YOLO("yolov8n-pose.pt")

    first = model(img_bgr, conf=0.35, iou=0.45, verbose=False)[0]

    keypoints = first.keypoints
    if keypoints is None or len(keypoints.xy) == 0:
        logger.warning("YOLO pose found no keypoints.")
        return nothing_found

    detections = first.boxes
    person_boxes = None if detections is None else to_float32(detections.xyxy)
    joints_xy = to_float32(keypoints.xy)
    raw_conf = keypoints.conf
    joints_conf = None if raw_conf is None else to_float32(raw_conf)

    if person_boxes is None or len(person_boxes) == 0:
        logger.warning("YOLO pose found no boxes.")
        return nothing_found

    logger.info("YOLO pose persons detected: {}", len(person_boxes))
    return person_boxes, joints_xy, joints_conf
def choose_person_for_face(face_box, boxes_xyxy):
    """Return the index of the person box with the highest IoU against the face.

    Args:
        face_box: Face rectangle as ``(x, y, w, h)``.
        boxes_xyxy: Person rectangles as ``(x1, y1, x2, y2)`` rows.

    Returns:
        Integer index into ``boxes_xyxy`` of the best-matching person.
    """
    left, top, width, height = face_box
    face_x1, face_y1, face_x2, face_y2 = np.array(
        [left, top, left + width, top + height], dtype=np.float32
    )
    face_area = max(0.0, face_x2 - face_x1) * max(0.0, face_y2 - face_y1)

    scores = []
    for px1, py1, px2, py2 in boxes_xyxy:
        # Intersection rectangle, collapsed to zero when the boxes are disjoint.
        overlap_w = max(0.0, min(face_x2, px2) - max(face_x1, px1))
        overlap_h = max(0.0, min(face_y2, py2) - max(face_y1, py1))
        overlap = overlap_w * overlap_h
        person_area = max(0.0, px2 - px1) * max(0.0, py2 - py1)
        # The epsilon keeps the ratio finite for degenerate (zero-area) boxes.
        scores.append(overlap / (face_area + person_area - overlap + 1e-6))

    winner = int(np.argmax(scores))
    logger.info("Chose person index {} with IoU {:.3f}", winner, scores[winner])
    return winner
def estimate_body_height(face_box, boxes_xyxy, kpts_xy, kpts_conf):
    """Estimate body height in pixels from the head top down to the lowest joint.

    The head top is taken 15% of the face height above the face box (clamped
    to the image top). The bottom is the largest y among the first keypoint
    pair with confidence above 0.25, trying indices (15, 16), then (13, 14),
    then (11, 12) -- ankles, knees, hips in the keypoint layout this script
    assumes -- and finally the bottom edge of the person box.

    Args:
        face_box: Face rectangle as ``(x, y, w, h)``.
        boxes_xyxy: Person boxes, one ``(x1, y1, x2, y2)`` row per person.
        kpts_xy: Keypoint coordinates, indexed ``[person, keypoint, (x, y)]``.
        kpts_conf: Keypoint confidences ``[person, keypoint]`` or ``None``,
            in which case every keypoint is trusted.

    Returns:
        Height as a float. It is not clamped, so it can be non-positive if
        the chosen bottom lies above the head top.
    """
    _, y, _, h = face_box
    head_top = max(0, y - int(0.15 * h))

    idx = choose_person_for_face(face_box, boxes_xyxy)
    box = boxes_xyxy[idx]
    k = kpts_xy[idx]
    conf = (
        kpts_conf[idx]
        if kpts_conf is not None
        else np.ones((k.shape[0],), np.float32)
    )

    def max_y(indices):
        vals = [k[i, 1] for i in indices if i < k.shape[0] and conf[i] > 0.25]
        return max(vals) if vals else None

    # Compare against None explicitly: chaining with `or` would discard a
    # valid coordinate of 0.0 because it is falsy.
    bottom = None
    for group in ((15, 16), (13, 14), (11, 12)):
        bottom = max_y(group)
        if bottom is not None:
            break
    if bottom is None:
        bottom = box[3]

    height = float(bottom - head_top)
    logger.info("Estimated body height = {:.1f}px", height)
    return height
def estimate_head(face_box):
    """Return the head height in pixels: the face box height enlarged by 20%.

    Args:
        face_box: Face rectangle as ``(x, y, w, h)``; only ``h`` is used.
    """
    _x, _y, _w, face_height = face_box
    head_height = face_height * 1.2  # extra allowance for hair above the face
    logger.info("Estimated head height = {:.1f}px", head_height)
    return head_height
# ------------------------------- | |
# Warp utilities | |
# ------------------------------- | |
def elliptical_head_warp(img, center, rx, ry, factor):
    """Scale the content of an ellipse smoothly; pixels outside are untouched.

    Inside the ellipse the normalised radius follows the forward profile
    ``r_out = (1 - factor) * r_in**2 + factor * r_in``. Content near the
    centre is therefore scaled by ``factor`` while the ellipse boundary
    (``r = 1``) stays fixed, so the warp blends into the surroundings.
    ``cv2.remap`` needs the inverse mapping, so the quadratic is solved for
    ``r_in`` at every output pixel.

    Args:
        img: Image array; only its first two dimensions (height, width) are
            used to build the sampling grid.
        center: ``(cx, cy)`` ellipse centre in pixels.
        rx: Horizontal semi-axis in pixels.
        ry: Vertical semi-axis in pixels.
        factor: Scale at the centre; ``< 1`` shrinks, ``> 1`` enlarges.
            Must lie in ``[0.5, 2.0]``.

    Returns:
        The warped image produced by ``cv2.remap``.

    Raises:
        AssertionError: If ``factor`` is outside ``[0.5, 2.0]``.
    """
    assert 0.5 <= factor <= 2.0, "factor must be in [0.5, 2.0]"
    h, w = img.shape[:2]
    cx, cy = center

    yy, xx = np.meshgrid(
        np.arange(h, dtype=np.float32), np.arange(w, dtype=np.float32), indexing="ij"
    )
    dx, dy = xx - cx, yy - cy
    # Normalised elliptical radius; the epsilon guards against zero semi-axes.
    dnorm = np.sqrt((dx / (rx + 1e-8)) ** 2 + (dy / (ry + 1e-8)) ** 2)
    inside = dnorm <= 1.0

    # Start from the identity mapping and only rewrite pixels in the ellipse.
    map_x, map_y = xx.copy(), yy.copy()
    if np.any(inside):
        r_out = dnorm[inside]
        a, b = 1.0 - factor, factor

        # Rounding can push the discriminant slightly below zero when factor
        # is close to 2 and r_out close to 1; clamp so sqrt never yields NaN.
        disc = np.maximum(b * b + 4.0 * a * r_out, 0.0)
        # Root of a*r_in^2 + b*r_in - r_out = 0 in its conjugate form. Unlike
        # (-b + sqrt(disc)) / (2a), it has no cancellation when a is tiny and
        # is also valid for a == 0 (factor == 1 gives r_in == r_out).
        r_in = 2.0 * r_out / (b + np.sqrt(disc))
        r_in = np.clip(r_in, 0.0, 1.0)

        scale = np.zeros_like(r_out)
        nz = r_out > 1e-7  # the exact centre maps to itself
        scale[nz] = r_in[nz] / r_out[nz]

        map_x[inside] = cx + dx[inside] * scale
        map_y[inside] = cy + dy[inside] * scale

    warped = cv2.remap(
        img,
        map_x,
        map_y,
        interpolation=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return warped
# ------------------------------- | |
# Main pipeline | |
# ------------------------------- | |
def run_pipeline(
    image_path: str,
    output_path: str,
    target_heads: Optional[int] = None,
    target_head_ratio: float = 0.125,
    shrink_override: Optional[float] = None,
    ellipse_scale_x: float = 1.25,
    ellipse_scale_y: float = 1.6,
    debug: bool = False,
):
    """Resize the head in an image towards a target head-to-body ratio.

    Args:
        image_path: Path of the image to read.
        output_path: Path the warped image is written to.
        target_heads: Desired body height measured in heads; when truthy it
            replaces ``target_head_ratio`` with ``1 / target_heads``.
        target_head_ratio: Desired head height divided by body height.
        shrink_override: If given, used as the warp factor instead of the
            one derived from the measured ratio.
        ellipse_scale_x: Warp ellipse width as a multiple of the face width.
        ellipse_scale_y: Warp ellipse height as a multiple of the face height.
        debug: Draw the face box, the warp ellipse and the metrics on the
            output image.

    Raises:
        FileNotFoundError: If the input image cannot be read.
        RuntimeError: If no face or no person is detected.
        OSError: If the output image cannot be written.
    """
    logger.info("Reading image {}", image_path)
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(image_path)

    face = detect_face_bbox_mp(img)
    if face is None:
        raise RuntimeError("No face detected")
    fx, fy, fw, fh, _ = face

    boxes, kpts, conf = run_yolo_pose(img)
    if boxes is None:
        raise RuntimeError("No person detected")

    height_px = estimate_body_height(face[:4], boxes, kpts, conf)
    head_px = estimate_head(face[:4])
    if height_px <= 0:
        # The ratio below becomes meaningless; keep going but say so.
        logger.warning(
            "Non-positive body height estimate ({:.1f}px); result is unreliable.",
            height_px,
        )
    ratio_now = head_px / max(1e-6, height_px)

    if shrink_override is not None:
        shrink = shrink_override
        reason = "override"
    else:
        if target_heads:
            target_head_ratio = 1.0 / float(target_heads)
            reason = f"target_heads={target_heads}"
        else:
            reason = "target_head_ratio"
        shrink = target_head_ratio / ratio_now

    # The warp only accepts factors in [0.5, 2.0].
    pre_clamp = shrink
    shrink = float(np.clip(shrink, 0.5, 2.0))

    logger.info(
        "Metrics: height={:.1f} head={:.1f} ratio_now={:.3f} target_ratio={:.3f}",
        height_px,
        head_px,
        ratio_now,
        target_head_ratio,
    )
    logger.info(
        "Shrink factor {} = {:.3f} (clamped to {:.3f})", reason, pre_clamp, shrink
    )

    # Ellipse centred on the face box, enlarged to cover the whole head.
    cx, cy = fx + fw * 0.5, fy + fh * 0.5
    rx, ry = fw * ellipse_scale_x * 0.5, fh * ellipse_scale_y * 0.5
    warped = elliptical_head_warp(img, (cx, cy), rx, ry, shrink)

    if debug:
        cv2.rectangle(warped, (fx, fy), (fx + fw, fy + fh), (0, 255, 0), 2)
        cv2.ellipse(
            warped, (int(cx), int(cy)), (int(rx), int(ry)), 0, 0, 360, (255, 0, 0), 2
        )
        txt = f"{height_px:.1f}px/{head_px:.1f}px ratio={ratio_now:.3f} shrink={shrink:.3f}"
        cv2.putText(
            warped,
            txt,
            (10, max(30, fy - 10)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (50, 50, 255),
            2,
        )

    # imwrite signals failure through its return value; do not claim success
    # when nothing was written.
    if not cv2.imwrite(output_path, warped):
        raise OSError(f"Failed to write image to {output_path}")
    logger.info("Saved result to {}", output_path)
def main():
    """Command-line entry point: parse options, set up logging, run the pipeline."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--input", required=True)
    cli.add_argument("--output", required=True)
    cli.add_argument(
        "--target_heads",
        type=int,
        default=None,
        help="Desired body proportion in heads (7,8,9)",
    )
    cli.add_argument("--target_head_ratio", type=float, default=0.125)
    cli.add_argument(
        "--shrink_override",
        type=float,
        default=None,
        help="Manual shrink/enlarge factor (0.5–2.0)",
    )
    cli.add_argument("--ellipse_scale_x", type=float, default=1.25)
    cli.add_argument("--ellipse_scale_y", type=float, default=1.6)
    cli.add_argument("--debug", action="store_true")
    cli.add_argument("--log_level", default="INFO")
    opts = cli.parse_args()

    def echo(message):
        # Messages are printed without an extra trailing newline.
        print(message, end="")

    # Replace the existing handlers with a single print-based sink.
    logger.remove()
    logger.add(echo, level=opts.log_level)

    forwarded = (
        "target_heads",
        "target_head_ratio",
        "shrink_override",
        "ellipse_scale_x",
        "ellipse_scale_y",
        "debug",
    )
    run_pipeline(
        opts.input,
        opts.output,
        **{name: getattr(opts, name) for name in forwarded},
    )
# Example invocation:
#   docker compose run ai \
#       python -m scripts.head.head_reducer \
#       --input ./assets/fullbody/taehoon12.png \
#       --output out.jpg --target_heads 6
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment