Last active
November 4, 2025 04:29
-
-
Save UuuNyaa/ec1fccf727988259844e03a4ca2a4909 to your computer and use it in GitHub Desktop.
Batch processing for stable-diffusion-webui API with sd-webui-controlnet
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import collections.abc
import copy
import glob
import io
import json
import math
import os
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar

import numpy as np
import requests
from PIL import Image, ImageFilter
def update_list(target: List, update: List) -> List:
    """Recursively merge ``update`` into ``target`` element-by-element, in place.

    Mapping and list elements are merged recursively; any other value in
    ``update`` overwrites the element at the same index.  ``target`` is padded
    with ``None`` when shorter than ``update``; elements of ``target`` beyond
    ``len(update)`` are left untouched.

    Returns the (mutated) ``target`` list.
    """
    if len(target) < len(update):
        target.extend([None] * (len(update) - len(target)))
    # Iterate over the update's indices only: the original looped over
    # range(len(target)) and raised IndexError whenever target was longer.
    for i in range(len(update)):
        if isinstance(update[i], collections.abc.Mapping):
            # A padded slot may hold None; merge into a fresh dict in that case.
            target[i] = update_dict(target[i] if isinstance(target[i], dict) else {}, update[i])
        elif isinstance(update[i], list):
            target[i] = update_list(target[i] if isinstance(target[i], list) else [], update[i])
        else:
            target[i] = update[i]
    return target
def update_dict(target: Dict, update: Dict) -> Dict:
    """Deep-merge ``update`` into ``target`` in place and return ``target``.

    Nested mappings are merged recursively, lists are merged element-wise via
    ``update_list``, and every other value simply overwrites the target key.
    """
    for k, v in update.items():
        if isinstance(v, collections.abc.Mapping):
            target[k] = update_dict(target.get(k, {}), v)
        elif isinstance(v, list):
            # Original used target[k], which raised KeyError when the key was
            # absent from target; default to a fresh list instead.
            target[k] = update_list(target.get(k, []), v)
        else:
            target[k] = v
    return target
def snip_long_str(obj: Any) -> Any:
    """Return a copy of *obj* with every string truncated to 64 characters.

    Mappings are rebuilt key-and-value, other iterables become lists, and
    non-iterable values pass through unchanged.  Used to keep debug prints of
    the request payload readable (base64 images would dominate otherwise).
    """
    if isinstance(obj, collections.abc.Mapping):
        return {snip_long_str(key): snip_long_str(value) for key, value in obj.items()}
    if isinstance(obj, str):
        return obj[:64] if len(obj) > 64 else obj
    if hasattr(obj, '__iter__'):
        return [snip_long_str(element) for element in obj]
    return obj
def update_parameters(target: Dict, update: Dict):
    """Deep-merge *update* into the request payload *target* (in place).

    Thin, intention-revealing alias for ``update_dict``; returns *target*.
    """
    merged = update_dict(target, update)
    return merged
def b64encode_image(image: Image.Image) -> str:
    """Serialize *image* as PNG and return the base64-encoded ASCII string."""
    buffer = io.BytesIO()
    image.save(buffer, format='png')
    encoded = base64.b64encode(buffer.getvalue())
    return encoded.decode('utf-8')
def b64decode_image(b64_image: str) -> Image.Image:
    """Decode a base64 PNG payload back into a PIL image."""
    raw = base64.b64decode(b64_image)
    return Image.open(io.BytesIO(raw))
T = TypeVar('T')


def sliding_window(elements: List[T], window_size: int) -> Iterable[List[T]]:
    """Yield every contiguous slice of *elements* of length *window_size*.

    If the input is shorter than the window, the whole list is yielded once.
    (The original used ``return elements`` inside a generator, which yields
    nothing at all — short inputs were silently dropped by callers.)
    """
    element_length = len(elements)
    if element_length <= window_size:
        # Covers both the too-short and the exact-fit cases in one branch.
        yield elements
        return
    for i in range(element_length - window_size + 1):
        yield elements[i:i + window_size]
@lru_cache(maxsize=16)
def load_image(image_path: str, size: Tuple[int, int]) -> Image.Image:
    """Open *image_path*, resize to *size*, convert to RGB; memoized (16 entries)."""
    image = Image.open(image_path)
    return image.resize(size).convert('RGB')
def blend_images(images: Iterable[Image.Image], factors: Iterable[float]) -> Image.Image:
    """Return the weighted pixel-wise sum of *images* using *factors*.

    A factor of (approximately) 1.0 short-circuits and returns that image
    unchanged — callers pass normalized factor lists, so the remaining weights
    are then 0.  Factors of (approximately) 0.0 are skipped entirely.

    Raises ValueError when no image contributes (empty input or all-zero
    factors); the original crashed with ``AttributeError`` on ``None.astype``
    in that situation.
    """
    accumulator: Optional[np.ndarray] = None
    for image, factor in zip(images, factors):
        if math.isclose(factor, 0.0):
            continue
        if math.isclose(factor, 1.0):
            return image
        # float * uint8-array promotes to float64, so accumulation is lossless.
        weighted = factor * np.asarray(image)
        accumulator = weighted if accumulator is None else accumulator + weighted
    if accumulator is None:
        raise ValueError("blend_images: no image with a non-zero blend factor")
    return Image.fromarray(accumulator.astype(np.uint8))
# Baseline payload for a stable-diffusion-webui /sdapi/v1/img2img request.
# NOTE(review): main() copies this with dict.copy() — a *shallow* copy — and
# then deep-merges into it, so the nested dicts below are shared and mutated
# across calls; confirm that is intended before reusing this constant.
DEFAULT_PARAMETERS = {
    'init_images': [],  # base64-encoded PNG(s); filled in per request
    'denoising_strength': 0.7,
    'prompt': "",
    'negative_prompt': "",
    'alwayson_scripts': {
        'ControlNet': {  # sd-webui-controlnet extension arguments
            'args': [{
                'input_image': None,  # base64-encoded PNG; filled in per request
                'mask': None,
                'module': 'none',  # preprocessor name; 'none' = input already preprocessed
                'model': 'None',
                'weight': 1,
                'resize_mode': 'Scale to Fit (Inner Fit)',
                'lowvram': False,
                'processor_res': 64,
                'threshold_a': 64,
                'threshold_b': 64,
                'guidance': 1,
                'guidance_start': 0,
                'guidance_end': 1,
                'guessmode': False,
            }],
        }
    },
    'seed': 0,  # fixed seed for frame-to-frame consistency
    'sampler_index': "DPM++ SDE Karras",
    'batch_size': 1,  # main() overrides this with len(output_image_brend_factors)
    'n_iter': 1,
    'steps': 15,
    'cfg_scale': 7,
    'width': 512,   # also used as the resize target for all loaded images
    'height': 512,
}
def main(
    init_image_paths: List[str],
    init_image_blend_window_factors: List[float],
    controlnet_input_image_paths: List[str],
    controlnet_input_image_blend_window_factors: List[float],
    output_path: str,
    output_image_brend_factors: List[float],
    overwrite: bool = False,
    mask_image_paths: Optional[List[str]] = None,
    mask_image_blend_window_factors: Optional[List[float]] = None,
    override_parameters: Optional[Dict[str, Any]] = None,
    max_workers=1,
    api_url='http://127.0.0.1:7860/sdapi/v1/img2img',
):
    """Batch-run img2img over a frame sequence with ControlNet guidance.

    For each sliding window over the input frame sequences, blends the window
    into a single init image (and ControlNet / mask image), POSTs an img2img
    request to the webui API, blends the returned batch of images using
    ``output_image_brend_factors``, applies edge-enhance + smooth filters, and
    saves the result named after the window's center init frame.

    Args:
        init_image_paths: img2img init frames, one per output frame.
        init_image_blend_window_factors: per-frame weights for the init window;
            its length is the window size (normalized internally).
        controlnet_input_image_paths: ControlNet input frames (same length as
            init_image_paths).
        controlnet_input_image_blend_window_factors: weights for the ControlNet
            window.
        output_path: directory for output PNGs (created if missing).
        output_image_brend_factors: weights for blending the returned batch;
            its length also sets the request's batch_size.
        overwrite: re-generate frames whose output file already exists.
        mask_image_paths: optional inpaint mask frames.
        mask_image_blend_window_factors: weights for the mask window.
        override_parameters: deep-merged over DEFAULT_PARAMETERS.
        max_workers: concurrent API requests.
        api_url: img2img endpoint.

    Raises:
        ValueError: when the input path lists have mismatched lengths.
    """
    if len(init_image_paths) != len(controlnet_input_image_paths):
        raise ValueError("Size mismatch: len(init_image_paths) != len(controlnet_input_image_paths)")
    has_mask = mask_image_paths is not None
    if has_mask and len(init_image_paths) != len(mask_image_paths):
        raise ValueError("Size mismatch: len(init_image_paths) != len(mask_image_paths)")
    if mask_image_blend_window_factors is None:
        mask_image_blend_window_factors = [1]

    # makedirs (not mkdir) so nested output paths work; exist_ok avoids a race.
    os.makedirs(output_path, exist_ok=True)

    if override_parameters is None:
        override_parameters = {}
    # Deep-copy the defaults: the original used dict.copy() (shallow), so the
    # nested dicts of the module-level DEFAULT_PARAMETERS were mutated.
    parameters = update_parameters(copy.deepcopy(DEFAULT_PARAMETERS), override_parameters)
    print(parameters)
    image_size = (parameters['width'], parameters['height'])
    batch_size = len(output_image_brend_factors)

    # Normalize every factor list so each sums to 1 (numpy's reflected
    # division turns the Python lists into float ndarrays here).
    output_image_brend_factors /= np.sum(output_image_brend_factors)
    init_image_blend_window_factors /= np.sum(init_image_blend_window_factors)
    init_image_blend_window_size = len(init_image_blend_window_factors)
    # Output frames are named after the center frame of the init window.
    init_image_blend_window_center_index = init_image_blend_window_size // 2
    mask_image_blend_window_factors /= np.sum(mask_image_blend_window_factors)
    mask_image_blend_window_size = len(mask_image_blend_window_factors)
    controlnet_input_image_blend_window_factors /= np.sum(controlnet_input_image_blend_window_factors)
    controlnet_input_image_blend_window_size = len(controlnet_input_image_blend_window_factors)

    def load_and_blend_window(image_paths: List[str], blend_window_factors) -> Image.Image:
        # Load (cached), resize to the request size, and blend one window.
        return blend_images((load_image(p, image_size) for p in image_paths), blend_window_factors)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        def process(init_image_window: List[str], controlnet_input_image_window: List[str], mask_image_window: List[str]):
            init_image_filename, _extension = os.path.splitext(os.path.basename(init_image_window[init_image_blend_window_center_index]))
            output_image_path = os.path.join(output_path, f"{init_image_filename}.png")
            if not overwrite and os.path.exists(output_image_path):
                print(f"skip: {init_image_filename}")
                return
            init_blend_image = load_and_blend_window(init_image_window, init_image_blend_window_factors)
            mask_blend_image = load_and_blend_window(mask_image_window, mask_image_blend_window_factors) if has_mask else None
            controlnet_input_blend_image = load_and_blend_window(controlnet_input_image_window, controlnet_input_image_blend_window_factors)
            print(f"request: {init_image_filename}")
            # Build a per-request payload: the original mutated the single
            # shared `parameters` dict from every worker thread, which races
            # (and corrupts requests) whenever max_workers > 1.
            request_parameters = update_parameters(copy.deepcopy(parameters), {
                'init_images': [b64encode_image(init_blend_image)],
                'mask': b64encode_image(mask_blend_image) if has_mask else None,
                'alwayson_scripts': {
                    'ControlNet': {
                        'args': [{
                            'input_image': b64encode_image(controlnet_input_blend_image),
                        }]
                    }
                },
                'batch_size': batch_size,
            })
            # json= also sets the Content-Type header; the original posted a
            # raw data= string without one.
            response = requests.post(api_url, json=request_parameters)
            if not response.ok:
                print(f"ERROR: {response.status_code}, {response.text}")
                return
            output_image = blend_images((b64decode_image(b64_image) for b64_image in response.json()['images']), output_image_brend_factors)
            # Image.filter() returns a NEW image; the original discarded the
            # filtered result and saved the unfiltered blend.
            output_image = output_image.filter(ImageFilter.EDGE_ENHANCE).filter(ImageFilter.SMOOTH)
            output_image.save(output_image_path)

        if has_mask:
            for init_image_window, controlnet_input_image_window, mask_image_window in zip(
                    sliding_window(init_image_paths, init_image_blend_window_size),
                    sliding_window(controlnet_input_image_paths, controlnet_input_image_blend_window_size),
                    sliding_window(mask_image_paths, mask_image_blend_window_size)):
                executor.submit(process, init_image_window, controlnet_input_image_window, mask_image_window)
        else:
            for init_image_window, controlnet_input_image_window in zip(
                    sliding_window(init_image_paths, init_image_blend_window_size),
                    sliding_window(controlnet_input_image_paths, controlnet_input_image_blend_window_size)):
                executor.submit(process, init_image_window, controlnet_input_image_window, [])
if __name__ == '__main__':
    # Example batch run: color renders as init images, line renders as
    # ControlNet inputs, window size 1 everywhere (no temporal blending).
    main(
        init_image_paths=sorted(glob.glob("render-color/*.png")),
        controlnet_input_image_paths=sorted(glob.glob("render-line/*.png")),
        init_image_blend_window_factors=[1],  # single-frame window
        controlnet_input_image_blend_window_factors=[1],
        output_image_brend_factors=[1],  # batch_size 1, no output blending
        output_path="blend-output/",
        # overwrite=True,
        override_parameters={
            'prompt': "(masterpiece, best quality:1.3), (beautiful 1girl:1.2), (kemono friends), 16yo, pink eyes, aqua very long hair , heart shaped pupils, white dress, happy, reflection floor, simple white background",
            'negative_prompt': "EasyNegative, nsfw, (worst quality, low quality:1.4), letters, numbers, (lip, nose, tooth, rouge, lipstick, eyeshadow:1.4), (blush:1.2), (jpeg artifacts:1.4), (1boy, abs, muscular, rib:1.0), greyscale, monochrome, dusty sunbeams, trembling, motion lines, motion blur, emphasis lines, text, title, logo, signature",
            'width': int(512*1),   # scale factor kept explicit for easy tweaking
            'height': int(512*1),
            'steps': 15,
            'denoising_strength': 0.4,  # low strength keeps frames close to the renders
            'cfg_scale': 8,
            'alwayson_scripts': {
                'ControlNet': {
                    'args': [{
                        'processor_res': int(512*1),
                        'weight': 1.0,
                        'guidance_start': 0.0,
                        'guidance_end': 1.0,
                        'module': 'none',  # inputs are pre-rendered line art; no preprocessor
                        'model': 'control_sd15_canny [fef5e48e]',
                    }]
                }
            },
        },
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Woah, thanks so much for this code, mate — it's really super helpful!!
I had a question tho, how about adding a "reference" controlnet on top of that?
I'm super weak in terms of APIs, and I don't really understand how you feed in the input image sequence and the canny image sequence so well, so I have a hard time replicating the reference one
I tried something like that :
But obviously it doesnt work
Any chance to have your help, good sir? :)