Skip to content

Instantly share code, notes, and snippets.

@UuuNyaa
Last active November 4, 2025 04:29
Show Gist options
  • Save UuuNyaa/ec1fccf727988259844e03a4ca2a4909 to your computer and use it in GitHub Desktop.
Save UuuNyaa/ec1fccf727988259844e03a4ca2a4909 to your computer and use it in GitHub Desktop.
Batch processing for stable-diffusion-webui API with sd-webui-controlnet
import base64
import collections.abc
import copy
import glob
import io
import json
import math
import os
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar

import numpy as np
import requests
from PIL import Image, ImageFilter
def update_list(target: List, update: List) -> List:
    """Recursively merge *update* into *target* element-wise, in place.

    *target* is padded with ``None`` when *update* is longer; dict/list
    elements are merged recursively, everything else is overwritten.
    Returns the mutated *target*.
    """
    if len(target) < len(update):
        target.extend([None] * (len(update) - len(target)))
    # Iterate over update, not target: target may be the longer list, and
    # the original range(len(target)) raised IndexError on update[i].
    for i in range(len(update)):
        if isinstance(update[i], collections.abc.Mapping):
            # Fall back to a fresh {} when the slot is a None placeholder
            # (the original crashed inside update_dict on None).
            target[i] = update_dict(target[i] if isinstance(target[i], dict) else {}, update[i])
        elif isinstance(update[i], List):
            target[i] = update_list(target[i] if isinstance(target[i], list) else [], update[i])
        else:
            target[i] = update[i]
    return target
def update_dict(target: Dict, update: Dict) -> Dict:
    """Recursively merge *update* into *target* in place and return it.

    Nested mappings are merged key-by-key, lists are merged via
    update_list, and all other values replace the existing entry.
    """
    for k, v in update.items():
        if isinstance(v, collections.abc.Mapping):
            target[k] = update_dict(target.get(k, {}), v)
        elif isinstance(v, List):
            # .get with a default: the original target[k] raised KeyError
            # when the key was absent from target.
            target[k] = update_list(target.get(k, []), v)
        else:
            target[k] = v
    return target
def snip_long_str(obj: Any) -> Any:
    """Return a copy of *obj* with every string truncated to 64 characters.

    Mappings are rebuilt key/value-recursively, other iterables become
    lists of snipped elements, and non-iterable values pass through
    unchanged. Used to keep debug prints of request payloads readable.
    """
    if isinstance(obj, collections.abc.Mapping):
        return {snip_long_str(key): snip_long_str(value) for key, value in obj.items()}
    if isinstance(obj, str):
        return obj[:64] if len(obj) > 64 else obj
    if hasattr(obj, '__iter__'):
        return [snip_long_str(element) for element in obj]
    return obj
def update_parameters(target: Dict, update: Dict):
    """Deep-merge the *update* payload into *target* (alias for update_dict)."""
    return update_dict(target, update)
def b64encode_image(image: Image.Image) -> str:
    """Serialize *image* as PNG and return the base64 payload as a str."""
    buffer = io.BytesIO()
    image.save(buffer, format='png')
    encoded = base64.b64encode(buffer.getvalue())
    return encoded.decode('utf-8')
def b64decode_image(b64_image: str) -> Image.Image:
    """Decode a base64 PNG payload (as returned by the API) into a PIL image."""
    raw_bytes = base64.b64decode(b64_image)
    return Image.open(io.BytesIO(raw_bytes))
T = TypeVar('T')


def sliding_window(elements: List[T], window_size: int) -> Iterable[List[T]]:
    """Yield every contiguous run of *window_size* elements from *elements*.

    If the input is shorter than (or equal to) the window, the whole list
    is yielded once.  BUG FIX: the original used ``return elements`` inside
    the generator for the short case, which yields NOTHING (the value only
    lands on StopIteration), so short inputs were silently dropped.
    """
    element_length = len(elements)
    if element_length <= window_size:
        yield elements
        return
    for start in range(element_length - window_size + 1):
        yield elements[start:start + window_size]
@lru_cache(maxsize=16)
def load_image(image_path: str, size: Tuple[int, int]) -> Image.Image:
    """Load *image_path*, resize it to *size* (width, height), convert to RGB."""
    # lru_cache: overlapping sliding windows revisit the same frames, so each
    # (path, size) pair is decoded at most once while it stays in the cache.
    # NOTE(review): cached Image objects are shared across callers — the
    # returned image must not be mutated in place, or the cache is poisoned.
    return Image.open(image_path).resize(size).convert('RGB')
def blend_images(images: Iterable[Image.Image], factors: Iterable[float]) -> Image.Image:
    """Blend *images* into one image as a weighted sum using *factors*.

    Zero-weight images are skipped, and a weight of ~1.0 short-circuits to
    that image directly (assumes factors are normalized to sum to 1, as the
    callers in this file do).
    """
    accumulator: Optional[np.ndarray] = None
    for img, weight in zip(images, factors):
        if math.isclose(weight, 0.0):
            continue
        if math.isclose(weight, 1.0):
            # Unit weight: this image IS the whole blend.
            return img
        contribution = weight * np.asarray(img)
        accumulator = contribution if accumulator is None else accumulator + contribution
    return Image.fromarray(accumulator.astype(np.uint8))
# Baseline img2img request payload for the stable-diffusion-webui API.
# main() merges per-run overrides and per-frame images into a copy of this
# dict before every POST.
DEFAULT_PARAMETERS = {
    'init_images': [],  # base64-encoded PNG(s); filled in per frame by main()
    'denoising_strength': 0.7,
    'prompt': "",
    'negative_prompt': "",
    'alwayson_scripts': {
        # sd-webui-controlnet extension arguments (one dict per ControlNet unit)
        'ControlNet': {
            'args': [{
                'input_image': None,  # base64-encoded PNG; filled in per frame
                'mask': None,
                'module': 'none',
                'model': 'None',
                'weight': 1,
                'resize_mode': 'Scale to Fit (Inner Fit)',
                'lowvram': False,
                'processor_res': 64,
                'threshold_a': 64,
                'threshold_b': 64,
                'guidance': 1,
                'guidance_start': 0,
                'guidance_end': 1,
                'guessmode': False,
            }],
        }
    },
    'seed': 0,
    'sampler_index': "DPM++ SDE Karras",
    'batch_size': 1,  # main() overwrites this with len(output_image_brend_factors)
    'n_iter': 1,
    'steps': 15,
    'cfg_scale': 7,
    'width': 512,
    'height': 512,
}
def main(
    init_image_paths: List[str],
    init_image_blend_window_factors: List[float],
    controlnet_input_image_paths: List[str],
    controlnet_input_image_blend_window_factors: List[float],
    output_path: str,
    output_image_brend_factors: List[float],
    overwrite: bool = False,
    mask_image_paths: Optional[List[str]] = None,
    mask_image_blend_window_factors: Optional[List[float]] = None,
    override_parameters: Optional[Dict[str, Any]] = None,
    max_workers=1,
    api_url='http://127.0.0.1:7860/sdapi/v1/img2img',
):
    """Batch-run img2img over sliding windows of frames and save the results.

    For each window of init/ControlNet(/mask) frames, the frames are blended
    into single images, POSTed to the webui API, and the returned batch is
    blended by *output_image_brend_factors* into one PNG named after the
    window's center init frame.

    Raises ValueError when the init/controlnet/mask path lists differ in
    length.  Results are written to *output_path*; existing files are skipped
    unless *overwrite* is set.
    """
    if len(init_image_paths) != len(controlnet_input_image_paths):
        raise ValueError("Size mismatch: len(init_image_paths) != len(controlnet_input_image_paths)")
    has_mask = mask_image_paths is not None
    if has_mask and len(init_image_paths) != len(mask_image_paths):
        raise ValueError("Size mismatch: len(init_image_paths) != len(mask_image_paths)")
    if mask_image_blend_window_factors is None:
        mask_image_blend_window_factors = [1]
    # makedirs + exist_ok also creates intermediate directories and avoids
    # the check-then-mkdir race of the original exists()/mkdir() pair.
    os.makedirs(output_path, exist_ok=True)
    if override_parameters is None:
        override_parameters = {}
    # deepcopy: DEFAULT_PARAMETERS holds nested dicts/lists; the original
    # shallow .copy() let update_parameters mutate the module-level default.
    parameters = update_parameters(copy.deepcopy(DEFAULT_PARAMETERS), override_parameters)
    print(parameters)
    image_size = (parameters['width'], parameters['height'])
    batch_size = len(output_image_brend_factors)
    # Normalize every factor list into a weighted average.  Note that
    # `list /= np.sum(list)` rebinds each local to an ndarray.
    output_image_brend_factors /= np.sum(output_image_brend_factors)
    init_image_blend_window_factors /= np.sum(init_image_blend_window_factors)
    init_image_blend_window_size = len(init_image_blend_window_factors)
    init_image_blend_window_center_index = init_image_blend_window_size // 2
    mask_image_blend_window_factors /= np.sum(mask_image_blend_window_factors)
    mask_image_blend_window_size = len(mask_image_blend_window_factors)
    controlnet_input_image_blend_window_factors /= np.sum(controlnet_input_image_blend_window_factors)
    controlnet_input_image_blend_window_size = len(controlnet_input_image_blend_window_factors)

    def load_and_blend_window(image_paths: List[str], blend_window_factors) -> Image.Image:
        # Blend one window of frames into a single averaged image.
        return blend_images((load_image(x, image_size) for x in image_paths), blend_window_factors)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        def process(init_image_window: List[str], controlnet_input_image_window: List[str], mask_image_window: List[str]):
            # The output file is named after the window's CENTER init frame.
            init_image_filename, _extension = os.path.splitext(os.path.basename(init_image_window[init_image_blend_window_center_index]))
            output_image_path = os.path.join(output_path, f"{init_image_filename}.png")
            if not overwrite and os.path.exists(output_image_path):
                print(f"skip: {init_image_filename}")
                return
            init_blend_image = load_and_blend_window(init_image_window, init_image_blend_window_factors)
            mask_blend_image = load_and_blend_window(mask_image_window, mask_image_blend_window_factors) if has_mask else None
            controlnet_input_blend_image = load_and_blend_window(controlnet_input_image_window, controlnet_input_image_blend_window_factors)
            print(f"request: {init_image_filename}")
            # Per-request deep copy: the original mutated the shared
            # `parameters` dict from every worker, racing when max_workers > 1.
            request_parameters = update_parameters(copy.deepcopy(parameters), {
                'init_images': [b64encode_image(init_blend_image)],
                'mask': b64encode_image(mask_blend_image) if has_mask else None,
                'alwayson_scripts': {
                    'ControlNet': {
                        'args': [{
                            'input_image': b64encode_image(controlnet_input_blend_image),
                        }]
                    }
                },
                'batch_size': batch_size,
            })
            # json= serializes and sets the Content-Type header (the original
            # data=json.dumps(...) sent the body without one).
            response = requests.post(api_url, json=request_parameters)
            if not response.ok:
                print(f"ERROR: {response.status_code}, {response.text}")
                return
            output_image = blend_images((b64decode_image(b64_image) for b64_image in response.json()['images']), output_image_brend_factors)
            # Keep the filtered result: PIL filters return NEW images; the
            # original discarded this value and saved the unfiltered blend.
            output_image = output_image.filter(ImageFilter.EDGE_ENHANCE).filter(ImageFilter.SMOOTH)
            output_image.save(output_image_path)

        if has_mask:
            for init_image_window, controlnet_input_image_window, mask_image_window in zip(
                sliding_window(init_image_paths, init_image_blend_window_size),
                sliding_window(controlnet_input_image_paths, controlnet_input_image_blend_window_size),
                sliding_window(mask_image_paths, mask_image_blend_window_size),
            ):
                executor.submit(process, init_image_window, controlnet_input_image_window, mask_image_window)
        else:
            for init_image_window, controlnet_input_image_window in zip(
                sliding_window(init_image_paths, init_image_blend_window_size),
                sliding_window(controlnet_input_image_paths, controlnet_input_image_blend_window_size),
            ):
                executor.submit(process, init_image_window, controlnet_input_image_window, [])
if __name__ == '__main__':
    # Example invocation: frame sequences rendered to render-color/ (init
    # images) and render-line/ (ControlNet inputs), results written to
    # blend-output/.  All blend windows are [1] — i.e. no temporal blending,
    # one frame in, one frame out.
    main(
        init_image_paths=sorted(glob.glob("render-color/*.png")),
        controlnet_input_image_paths=sorted(glob.glob("render-line/*.png")),
        init_image_blend_window_factors=[1],
        controlnet_input_image_blend_window_factors=[1],
        output_image_brend_factors=[1],  # batch_size 1: single output per request
        output_path="blend-output/",
        # overwrite=True,
        override_parameters={
            'prompt': "(masterpiece, best quality:1.3), (beautiful 1girl:1.2), (kemono friends), 16yo, pink eyes, aqua very long hair , heart shaped pupils, white dress, happy, reflection floor, simple white background",
            'negative_prompt': "EasyNegative, nsfw, (worst quality, low quality:1.4), letters, numbers, (lip, nose, tooth, rouge, lipstick, eyeshadow:1.4), (blush:1.2), (jpeg artifacts:1.4), (1boy, abs, muscular, rib:1.0), greyscale, monochrome, dusty sunbeams, trembling, motion lines, motion blur, emphasis lines, text, title, logo, signature",
            'width': int(512*1),   # the *1 factor is a convenient upscale knob
            'height': int(512*1),
            'steps': 15,
            'denoising_strength': 0.4,
            'cfg_scale': 8,
            'alwayson_scripts': {
                'ControlNet': {
                    'args': [{
                        'processor_res': int(512*1),
                        'weight': 1.0,
                        'guidance_start': 0.0,
                        'guidance_end': 1.0,
                        'module': 'none',
                        'model': 'control_sd15_canny [fef5e48e]',
                    }]
                }
            },
        },
    )
@linelenil
Copy link

linelenil commented Jan 16, 2024

Woah, thanks so much for this code mate, it's really super helpful!!
I had a question tho, how about adding a "reference" controlnet on top of that?
I'm super weak in terms of APIs and I don't really understand how you feed in the input image sequence and the canny image sequence so well, so I have a hard time replicating the reference one

I tried something like that :

'ControlNet': {
                    'args': [
                    {
                        'processor_res': int(512*1),
                        'weight': 1.0,
                        'guidance_start': 0.0,
                        'guidance_end': 1.0,
                        'module': 'none',
                        'model': 'control_sd15_canny [fef5e48e]',
                    },
                        'processor_res': int(512*1),
                        'weight': 1.0,
                        'guidance_start': 0.0,
                        'guidance_end': 1.0,
                        'module': 'reference',
                        "image": {
                            "image": encode_file_to_base64(r"C:/path/to/reference_image.png"),
                            "mask": None
                        },
                    },
                    ]
                }

But obviously it doesn't work
Any chance of getting your help, good sir? :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment