Last active
April 23, 2026 11:55
-
-
Save fyxme/0758fc3e44ee79e1b2d8f8eb30ee1c96 to your computer and use it in GitHub Desktop.
mitmproxy script to replace media files with placeholders
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| mitmproxy addon: replace image / gif / video / audio responses with | |
| placeholders that match the original asset's pixel dimensions, and cache | |
| per-URL so repeat requests are short-circuited before hitting the origin. | |
| Run with: | |
| mitmdump -s placeholder.py | |
| mitmproxy -s placeholder.py | |
| mitmweb -s placeholder.py | |
| Requires Pillow for size-matched images: | |
| pip install Pillow | |
| Optional overrides: | |
| --set placeholder_image=/path/img.png (base image; stretched to match) | |
| --set placeholder_gif=/path/anim.gif | |
| --set placeholder_video=/path/clip.mp4 | |
| --set placeholder_audio=/path/beep.mp3 | |
| --set placeholder_cache_size=1024 (max cached URLs; LRU evicted) | |
| If Pillow is missing or the original's dimensions can't be read, the raw | |
| fallback/custom file bytes are served unchanged. | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import logging | |
| import mimetypes | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| from typing import NamedTuple, Optional | |
| from mitmproxy import ctx, http | |
# Pillow is an optional dependency: HAVE_PIL records whether the import
# succeeded so the rest of the module can check it before touching Image.
try:
    from PIL import Image
except ImportError:
    HAVE_PIL = False
else:
    HAVE_PIL = True
# Logger named after this module.
log = logging.getLogger(__name__)

# Built-in image placeholder: a 1x1 gray PNG, stored base64-encoded.
DEFAULT_PNG = base64.b64decode(b"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+A8AAQUBAScY42YAAAAASUVORK5CYII=")

# Built-in GIF placeholder: a 1x1 transparent GIF, stored base64-encoded.
DEFAULT_GIF = base64.b64decode(b"R0lGODlhAQABAIAAAP///wAAACH5BAEAAAAALAAAAAABAAEAAAICRAEAOw==")
class CachedResponse(NamedTuple):
    """Immutable record of the placeholder that was produced for one URL."""

    # HTTP status code to answer with.
    status: int
    # Value for the Content-Type header.
    content_type: str
    # Response payload.
    body: bytes
class Placeholder:
    """mitmproxy addon that replaces media responses with placeholders.

    Responses whose media type is ``image/*``, ``video/*`` or ``audio/*`` are
    rewritten in the ``response`` hook. The placeholder produced for a URL is
    remembered in an LRU-ordered dict so that the ``request`` hook can answer
    later requests for the same URL without contacting the origin.
    """

    def __init__(self) -> None:
        # URL -> placeholder produced for it. The least recently used entry
        # sits at the front and is the first one evicted by _remember().
        self._cache: "OrderedDict[str, CachedResponse]" = OrderedDict()

    def load(self, loader) -> None:
        """Register the addon's ``placeholder_*`` options."""
        loader.add_option(
            name="placeholder_image", typespec=str, default="",
            help="Image file used as base for image/* responses (resized to match original).",
        )
        loader.add_option(
            name="placeholder_gif", typespec=str, default="",
            help="Gif file used as base for image/gif responses.",
        )
        loader.add_option(
            name="placeholder_video", typespec=str, default="",
            help="Video file for video/* responses. Empty = 204 No Content.",
        )
        loader.add_option(
            name="placeholder_audio", typespec=str, default="",
            help="Audio file for audio/* responses. Empty = 204 No Content.",
        )
        loader.add_option(
            name="placeholder_cache_size", typespec=int, default=1024,
            help="Max number of cached placeholder responses (LRU).",
        )

    # Short-circuit cached URLs before they touch the network.
    def request(self, flow: "http.HTTPFlow") -> None:
        """Answer the request from the cache when its URL has been seen before."""
        key = flow.request.url
        cached = self._cache.get(key)
        if cached is None:
            return
        self._cache.move_to_end(key)  # mark as most recently used
        if cached.status == 204:
            flow.response = http.Response.make(
                204, b"", {"cache-control": "no-store"},
            )
        else:
            flow.response = http.Response.make(
                cached.status, cached.body,
                {
                    "content-type": cached.content_type,
                    "content-length": str(len(cached.body)),
                    "cache-control": "no-store",
                },
            )

    def response(self, flow: "http.HTTPFlow") -> None:
        """Replace a media response with its placeholder and cache the result.

        Non-media responses are left untouched.
        """
        if flow.response is None:
            return
        key = flow.request.url

        cached = self._cache.get(key)
        if cached is not None:
            # Two kinds of flow end up here: one the request hook already
            # answered from cache (re-applying the same entry changes nothing
            # that matters), and one that was still in flight when another
            # flow for the same URL populated the cache. The latter carries
            # the real media and must be rewritten too.
            self._serve(flow, cached)
            return

        entry = self._placeholder_for(flow)
        if entry is None:
            return
        self._serve(flow, entry)
        self._remember(key, entry)

    # ---- response classification ----
    def _media_type(self, flow: "http.HTTPFlow") -> str:
        """Return the lower-cased media type of the response ('' if unknown).

        Parameters after ``;`` in the Content-Type header are dropped. When
        the header is missing or empty the type is guessed from the URL.
        """
        header = flow.response.headers.get("content-type", "")
        ctype = header.split(";", 1)[0].strip().lower()
        if not ctype:
            guess, _ = mimetypes.guess_type(flow.request.pretty_url)
            ctype = (guess or "").lower()
        return ctype

    def _original_body(self, flow: "http.HTTPFlow") -> bytes:
        """Return the decoded origin body, or ``b""`` if it cannot be decoded.

        The body is only needed to read image dimensions, so a broken
        content-encoding simply means "dimensions unknown" rather than an
        error that would let the real media through.
        """
        try:
            return flow.response.content or b""
        except ValueError as e:
            log.debug("can't decode original body: %s", e)
            return b""

    def _placeholder_for(self, flow: "http.HTTPFlow") -> "Optional[CachedResponse]":
        """Build the placeholder for this response; ``None`` if it is not media."""
        ctype = self._media_type(flow)
        if ctype == "image/gif":
            body, out_ct = self._image_placeholder(
                self._original_body(flow), ctx.options.placeholder_gif,
                DEFAULT_GIF, "GIF",
            )
            return CachedResponse(200, out_ct, body)
        if ctype.startswith("image/"):
            body, out_ct = self._image_placeholder(
                self._original_body(flow), ctx.options.placeholder_image,
                DEFAULT_PNG, "PNG",
            )
            return CachedResponse(200, out_ct, body)
        if ctype.startswith("video/"):
            return self._media_entry(ctx.options.placeholder_video, "video/mp4")
        if ctype.startswith("audio/"):
            return self._media_entry(ctx.options.placeholder_audio, "audio/mpeg")
        return None

    def _media_entry(self, path: str, fallback_ct: str) -> "CachedResponse":
        """Build the video/audio placeholder: the configured file, else 204."""
        if path:
            body, out_ct = self._load_media(path, b"", fallback_ct)
            if body:
                return CachedResponse(200, out_ct, body)
        # No file configured, or it is missing/empty: a zero-byte 200 would
        # not be a valid media file, so answer "No Content" instead.
        return CachedResponse(204, "", b"")

    def _serve(self, flow: "http.HTTPFlow", entry: "CachedResponse") -> None:
        """Rewrite ``flow.response`` in place so that it carries ``entry``."""
        if entry.status == 204:
            self._no_content(flow)
        else:
            self._apply(flow, entry.body, entry.content_type)

    # ---- image generation ----
    def _image_placeholder(
        self,
        orig: bytes,
        custom_path: str,
        fallback: bytes,
        fmt: str,
    ) -> tuple[bytes, str]:
        """Return ``(body, content_type)`` of an image placeholder.

        Args:
            orig: Bytes of the original image; only its dimensions are used.
            custom_path: Optional path of a user-supplied base image.
            fallback: Built-in image bytes used when no custom image is usable.
            fmt: Output format, ``"PNG"`` or ``"GIF"``.

        Raises:
            KeyError: If ``fmt`` is neither ``"PNG"`` nor ``"GIF"``.
        """
        out_ctype = {"PNG": "image/png", "GIF": "image/gif"}[fmt]
        dims = self._dims(orig)
        # Can't read dims or no Pillow: serve the raw placeholder bytes.
        if dims is None or not HAVE_PIL:
            if custom_path:
                p = Path(custom_path).expanduser()
                if p.is_file():
                    guess, _ = mimetypes.guess_type(str(p))
                    return p.read_bytes(), guess or out_ctype
            return fallback, out_ctype
        base = self._open_base(custom_path, fallback)
        if base is None:
            return fallback, out_ctype
        try:
            resized = base.resize(dims)
            if fmt == "GIF":
                resized = resized.convert("P")
            else:
                resized = resized.convert("RGBA")
            buf = io.BytesIO()
            resized.save(buf, format=fmt)
            return buf.getvalue(), out_ctype
        except Exception as e:
            # Best effort: any Pillow failure degrades to the built-in bytes.
            log.warning("resize failed (%s); falling back to raw bytes", e)
            return fallback, out_ctype

    def _open_base(self, custom_path: str, fallback: bytes) -> Optional["Image.Image"]:
        """Return an in-memory copy of the base image, or ``None`` on failure.

        The custom file is tried first, then the built-in ``fallback`` bytes.
        Each source is opened in a ``with`` block and copied, so the source
        image (and the file behind it) is closed before returning.
        """
        if custom_path:
            p = Path(custom_path).expanduser()
            if p.is_file():
                try:
                    with Image.open(p) as im:
                        return im.copy()
                except Exception as e:
                    log.warning("can't open %s: %s", custom_path, e)
        try:
            with Image.open(io.BytesIO(fallback)) as im:
                return im.copy()
        except Exception as e:
            log.warning("can't open fallback image: %s", e)
            return None

    def _dims(self, data: bytes) -> Optional[tuple[int, int]]:
        """Return the image size Pillow reports for ``data``, else ``None``.

        ``None`` is returned when Pillow is unavailable, ``data`` is empty, or
        Pillow cannot parse it.
        """
        if not HAVE_PIL or not data:
            return None
        try:
            with Image.open(io.BytesIO(data)) as im:
                return im.size
        except Exception:
            # The data comes from the origin server; any parse failure just
            # means the dimensions are unknown.
            return None

    # ---- media / response helpers ----
    def _load_media(self, path: str, fallback: bytes, fallback_ct: str) -> tuple[bytes, str]:
        """Read a placeholder file and return ``(bytes, content_type)``.

        The content type is guessed from the file name, falling back to
        ``fallback_ct``. If the file does not exist a warning is logged and
        ``(fallback, fallback_ct)`` is returned.
        """
        p = Path(path).expanduser()
        if p.is_file():
            guess, _ = mimetypes.guess_type(str(p))
            return p.read_bytes(), guess or fallback_ct
        log.warning("placeholder file not found: %s", path)
        return fallback, fallback_ct

    def _apply(self, flow: "http.HTTPFlow", body: bytes, ctype: str) -> None:
        """Turn ``flow.response`` into a ``200 OK`` carrying ``body``."""
        resp = flow.response
        resp.status_code = 200
        resp.reason = "OK"
        # These headers described the original payload, not the placeholder.
        for h in ("content-encoding", "transfer-encoding", "content-range"):
            resp.headers.pop(h, None)
        resp.content = body
        resp.headers["content-type"] = ctype
        resp.headers["content-length"] = str(len(body))
        resp.headers["cache-control"] = "no-store"

    def _no_content(self, flow: "http.HTTPFlow") -> None:
        """Turn ``flow.response`` into an empty ``204 No Content``."""
        resp = flow.response
        resp.status_code = 204
        resp.reason = "No Content"
        resp.content = b""
        # Drop every header that describes a body; there is none any more.
        for h in ("content-type", "content-encoding",
                  "transfer-encoding", "content-range", "content-length"):
            resp.headers.pop(h, None)
        resp.headers["cache-control"] = "no-store"

    # ---- cache ----
    def _remember(self, key: str, entry: "CachedResponse") -> None:
        """Store ``entry`` as most recently used and evict down to the limit.

        A ``placeholder_cache_size`` of 0 (or less) keeps nothing cached.
        """
        self._cache[key] = entry
        self._cache.move_to_end(key)
        limit = max(0, int(ctx.options.placeholder_cache_size))
        while len(self._cache) > limit:
            self._cache.popitem(last=False)
# Addon instances exported by this script.
addons = [
    Placeholder(),
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment