@sayakpaul
Last active December 9, 2024 07:35
"""
Code to demonstrate if async device transfers in `DiffusionPipeline` could be nice.
Some numbers:
A100 (80GB):
* Flux.1 Dev: 1.40x
* Flux.1 Dev ControlNet: 1.44x
* Stable Diffusion 3: 1.04x
* SDXL: 1.05x
Colab:
* SDXL: 1.22x (T4)
* SDXL ControlNet: 1.2x (L4)
Thanks to o1-mini for pairing 🍓!
"""
import torch
import torch.nn as nn
from importlib import import_module
import json
from huggingface_hub import hf_hub_download
from typing import List
from diffusers import (
    DiffusionPipeline,
    ControlNetModel,
    StableDiffusionXLControlNetPipeline,
    FluxControlNetPipeline,
    FluxControlNetModel,
)
import fire

CONTROLNET_MAPPING = {
    ControlNetModel: StableDiffusionXLControlNetPipeline,
    FluxControlNetModel: FluxControlNetPipeline,
}


class DiffusionPipelineAsync:
    def __init__(self, model_id="black-forest-labs/FLUX.1-dev", controlnet_id=None):
        if controlnet_id is None:
            self.pipeline = DiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.bfloat16)
        else:
            config_path = hf_hub_download(controlnet_id, filename="config.json")
            with open(config_path, "r") as f:
                controlnet_cls_name = json.load(f)["_class_name"]
            controlnet_cls = getattr(import_module("diffusers"), controlnet_cls_name)
            controlnet = controlnet_cls.from_pretrained(controlnet_id, torch_dtype=torch.bfloat16)
            self.pipeline = CONTROLNET_MAPPING[controlnet_cls].from_pretrained(
                model_id, controlnet=controlnet, torch_dtype=torch.bfloat16
            )
        self.models = [
            module for _, module in self.pipeline.components.items() if isinstance(module, torch.nn.Module)
        ]
    def _move_module_to_device(self, module: nn.Module, device: torch.device, stream: torch.cuda.Stream):
        """
        Moves a single module to the specified device using the given CUDA stream.
        """
        with torch.cuda.stream(stream):
            module.to(device, non_blocking=True)
        # Note: Synchronization is handled outside to allow overlap

    def to_cuda_async(self, device: torch.device):
        """
        Moves all child modules to the specified CUDA device using asynchronous transfers
        and multiple CUDA streams to accelerate the process.
        """
        if device.type != "cuda":
            raise ValueError("to_cuda_async should be used with a CUDA device.")
        # Ensure CUDA is available
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available.")

        # List of modules to move
        modules: List[nn.Module] = self.models
        # Create a CUDA stream for each module
        streams = [torch.cuda.Stream(device=device) for _ in modules]

        # Schedule each module transfer on its respective stream
        for module, stream in zip(modules, streams):
            self._move_module_to_device(module, device, stream)

        # Synchronize all streams to ensure all transfers are complete
        for stream in streams:
            stream.synchronize()
        return self

    def to_sequential(self, device: torch.device):
        """
        Moves all child modules to the specified device sequentially.
        """
        for model in self.models:
            model.to(device)
        return self

    def to_device(self, device: torch.device, use_fast_transfer: bool = False):
        """
        Configurable method to move all child modules to the specified device.
        Users can choose to use accelerated transfer with CUDA streams or standard sequential transfer.

        Parameters:
        - device (torch.device): The target device.
        - use_fast_transfer (bool): If True and device is CUDA, use accelerated transfer. Otherwise, use sequential.
        """
        if use_fast_transfer and device.type == "cuda":
            return self.to_cuda_async(device)
        else:
            return self.to_sequential(device)


def time_transfer(pipeline: DiffusionPipeline, device: torch.device, use_fast_transfer: bool):
    torch.cuda.synchronize()  # Ensure all previous CUDA operations are complete
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)

    start_event.record()
    pipeline.to_device(device, use_fast_transfer=use_fast_transfer)
    end_event.record()

    # Wait for the events to be recorded
    torch.cuda.synchronize()
    elapsed_time_ms = start_event.elapsed_time(end_event)  # Time in milliseconds
    return elapsed_time_ms / 1000  # Convert to seconds


def main(model_id: str = "black-forest-labs/FLUX.1-dev", controlnet_id: str = None):
    # Check if CUDA is available
    if not torch.cuda.is_available():
        print("CUDA is not available on this system. Exiting the demonstration.")
        return

    cuda_device = torch.device("cuda")
    cpu_device = torch.device("cpu")

    # Clear all CUDA cache.
    torch.cuda.empty_cache()

    # Initialize the pipeline
    pipeline = DiffusionPipelineAsync(model_id=model_id, controlnet_id=controlnet_id)

    # Warm-up transfers (optional but recommended for more consistent timing)
    print("Performing warm-up transfers...")
    pipeline.to_device(cuda_device, use_fast_transfer=True)
    pipeline.to_device(cuda_device, use_fast_transfer=False)

    # Ensure models are initially on CPU
    print("Ensuring models are on CPU...")
    pipeline.to_sequential(cpu_device)

    # Measure time for accelerated transfer
    print("\nStarting accelerated CUDA transfer...")
    accelerated_time = time_transfer(pipeline, cuda_device, use_fast_transfer=True)
    print(f"Accelerated CUDA transfer completed in {accelerated_time:.4f} seconds.")

    # Verify that models are on CUDA
    print("Verifying that all models are on CUDA after accelerated transfer...")
    for module in pipeline.models:
        for param in module.parameters():
            assert param.is_cuda, f"{module.__class__.__name__} is not on CUDA."
    print("All models are successfully on CUDA after accelerated transfer.")

    # Move models back to CPU for fair comparison
    print("\nMoving models back to CPU for standard transfer...")
    pipeline.to_sequential(cpu_device)

    # Measure time for standard sequential transfer
    print("Starting standard sequential CUDA transfer...")
    sequential_time = time_transfer(pipeline, cuda_device, use_fast_transfer=False)
    print(f"Standard sequential CUDA transfer completed in {sequential_time:.4f} seconds.")

    # Verify that models are on CUDA
    print("Verifying that all models are on CUDA after standard transfer...")
    for module in pipeline.models:
        for param in module.parameters():
            assert param.is_cuda, f"{module.__class__.__name__} is not on CUDA."
    print("All models are successfully on CUDA after standard transfer.")

    # Compare the two methods
    print("\nTransfer Time Comparison:")
    print(f"Accelerated CUDA Transfer Time: {accelerated_time:.4f} seconds")
    print(f"Standard Sequential Transfer Time: {sequential_time:.4f} seconds")
    if accelerated_time > 0:
        speedup = sequential_time / accelerated_time
        print(f"Speedup for ({model_id}): {speedup:.2f}x faster using accelerated transfer.")
    else:
        print(f"Accelerated transfer time is too small to calculate speedup for {model_id}.")


if __name__ == "__main__":
    fire.Fire(main)
@gau-nernst

gau-nernst commented Nov 21, 2024

Hi @sayakpaul! Commenting here so we can see it directly. Some observations after a brief look:

  1. .to(cuda, non_blocking=True) only works when the CPU model is in pinned memory. See https://pytorch.org/docs/stable/generated/torch.Tensor.cuda.html. So the speedup for some models is a bit strange. Maybe there is some problem with the timing (or I misunderstood how non_blocking memory transfers work 🤔). Another good article is https://pytorch.org/tutorials/intermediate/pinmem_nonblock.html (and also https://docs.nvidia.com/cuda/cuda-runtime-api/api-sync-behavior.html). So perhaps the speedup is simply due to fewer explicit CUDA synchronization calls.
  2. I think you don't need 1 CUDA stream for each model. Data transfer is serialized anyway. Just 1 CUDA stream is enough. And actually, if you don't overlap compute with data transfer, I think using a separate CUDA stream in this case doesn't do anything?
  3. Now I see a potential issue with the timing. The Event.record() call might not be capturing the data transfer in your custom data transfer stream. By default it uses torch.cuda.current_stream(). Perhaps you can double check this?

To reiterate some generic pointers about CUDA data transfers and CUDA streams:

  • CPU->CUDA transfer is fast (and non-blocking with respect to the host) only if the data resides in pinned CPU memory (see the sketch right after this list). For diffusers, I think when you initialize a pipeline, the weights are memory-mapped from .safetensors files. So when doing .cuda(), whether blocking or not, you will need to copy the data to CPU memory first anyway (though I saw that you did some warmup, so after warmup the weights should be residing in CPU memory, though not pinned).
  • A CUDA stream is only useful when you want to overlap data transfer with compute (and possibly run two different computations at the same time). CPU->CUDA transfers can only move one piece of data at a time (regardless of non_blocking, pinned memory, or whatnot), so there is no point in using a CUDA stream if you don't overlap data transfer and compute.
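
To make the pinned-memory point concrete, here is a minimal, self-contained sketch (an editorial addition, not code from either gist; the tensor size and helper name are arbitrary) comparing a pageable host tensor with a pinned one when copying to the GPU with non_blocking=True:

import time
import torch

def host_to_device_seconds(host_tensor: torch.Tensor) -> float:
    """Time a host->device copy from the host's perspective."""
    torch.cuda.synchronize()
    start = time.perf_counter()
    host_tensor.to("cuda", non_blocking=True)
    torch.cuda.synchronize()  # wait until the (possibly async) copy has actually finished
    return time.perf_counter() - start

if __name__ == "__main__":
    torch.empty(1, device="cuda")  # warm up the CUDA context before timing
    pageable = torch.randn(1024, 1024, 256)             # ~1 GiB in regular pageable memory
    pinned = torch.randn(1024, 1024, 256).pin_memory()  # same size in page-locked memory
    print(f"pageable -> cuda: {host_to_device_seconds(pageable):.3f} s")
    print(f"pinned   -> cuda: {host_to_device_seconds(pinned):.3f} s")

On most machines the pinned copy should be measurably faster, and it is the only case where the copy can genuinely proceed without blocking the host.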

In my original gist, I didn't show my "pinned_memory only" implementation, but that implementation didn't use a CUDA stream. It simply uses pinned memory to speed up data transfer.

Also, I'm no expert in CUDA and this kind of stuff. So double-check everything I say 😄

@sayakpaul

Thanks so much for your feedback, I appreciate it!

From what I see, using pinned memory without any streams could potentially be more beneficial in this case? I will do some more benchmarking on this.

Now I see a potential issue with the timing. The Event.record() call might not be capturing the data transfer in your custom data transfer stream. By default it uses torch.cuda.current_stream(). Perhaps you can double check this?

Could you expand a bit more on this? What would be the right way to use record in this setup or more generally time this?

@gau-nernst

See https://pytorch.org/docs/stable/generated/torch.cuda.Event.html#torch.cuda.Event.record

I personally don't use torch.cuda.Event for timing, so I'm not sure about its interactions with CUDA streams and other stuff. Won't simply using time.perf_counter() suffice in this case (with appropriate torch.cuda.synchronize() calls)? Precise measurement is not that important here, and it may also make sense to time it from the host/CPU's perspective.
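
For reference, a host-side timing pattern along those lines might look like this (a sketch that assumes the pipeline object exposes the to_device() method from the script above; the function name is made up):

import time
import torch

def time_transfer_host_side(pipeline, device, use_fast_transfer=False):
    """Time the transfer from the host's perspective, bracketed by explicit syncs."""
    torch.cuda.synchronize()            # make sure nothing from earlier is still in flight
    start = time.perf_counter()
    pipeline.to_device(device, use_fast_transfer=use_fast_transfer)
    torch.cuda.synchronize()            # wait for all (possibly async) copies to complete
    return time.perf_counter() - start  # seconds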

@sayakpaul

Thanks.

Since precise measurement is not that important, and it may also make sense to time it from the host/CPU's perspective.

I think time_transfer() does it from the perspective of how pipeline.to() would be called by the user. So, I think the positioning of the timing code is okay? I will do it with time.perf_counter() after making the code work with pinned memory.

@gau-nernst

The current position of the timing code is fine, but the (potential) problem is with the CUDA event timing. I'm not sure it is timing what you expect: you are doing the data transfer in your own custom CUDA stream, but the CUDA event is recorded in the default stream, which isn't doing anything.
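
One way to address that (a sketch, not the original code) is to record the events on the same stream that performs the copies, e.g. by passing the stream to Event.record():

import torch

stream = torch.cuda.Stream()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)

x = torch.randn(1024, 1024).pin_memory()  # placeholder data; the point is where record() happens

start_event.record(stream)                # record on the transfer stream, not the default one
with torch.cuda.stream(stream):
    y = x.to("cuda", non_blocking=True)
end_event.record(stream)

end_event.synchronize()                   # block until the copy and the end event are done
print(f"transfer took {start_event.elapsed_time(end_event):.3f} ms")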

@sayakpaul

You're right. I will change it.

@sayakpaul

CUDA stream is only useful when you want to overlap data transfer with compute (and possibly do 2 different computes at the same time). CPU->CUDA transfer can only be done for 1 data at a time (regardless of non_blocking, pinned_memory or whatnot), so there is no point in using CUDA stream if you don't overlap data transfer and compute.

Hey @gau-nernst! Sorry for the ping here but I wanted to know more about what you said here. In your CPU offload gist, we see benefits of CUDA streams because there's a data transfer happening for the next module/layer while the current layer is running a computation, IIUC?

@gau-nernst

@sayakpaul

In your CPU offload gist, we see benefits of CUDA streams because there's a data transfer happening for the next module/layer while the current layer is running a computation, IIUC?

Yes, that is correct
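
For readers skimming the thread, a rough sketch of that per-layer prefetch pattern (assumed names and a simplified loop, not the code from either gist) looks like this: while layer i runs on the default stream, layer i+1's weights are copied from pinned CPU memory on a side stream.

import torch
import torch.nn as nn

def run_with_layer_prefetch(layers: nn.ModuleList, x: torch.Tensor) -> torch.Tensor:
    """Run layers sequentially on the GPU while prefetching the next layer's weights.
    Assumes x is already on the GPU and each layer takes a single tensor input."""
    copy_stream = torch.cuda.Stream()
    # Keep weights on the CPU in pinned memory so the copies can be truly asynchronous
    # (parameters only, for brevity).
    for layer in layers:
        for p in layer.parameters():
            p.data = p.data.cpu().pin_memory()

    layers[0].to("cuda", non_blocking=True)
    for i, layer in enumerate(layers):
        # Start copying the next layer on the side stream while this one computes.
        if i + 1 < len(layers):
            with torch.cuda.stream(copy_stream):
                layers[i + 1].to("cuda", non_blocking=True)
        x = layer(x)
        # The default stream must see the finished copy before using the next layer.
        torch.cuda.current_stream().wait_stream(copy_stream)
        layer.to("cpu")  # offload the layer we just used to free VRAM
    return x

As noted later in the thread, this only pays off when the weights don't fit in VRAM and the transfers can actually hide behind compute.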

@sayakpaul

@gau-nernst apologies for the delay, but I managed to get some numbers from my experiments on using pin_memory().

The first experiment I did was to simply call pin_memory() on the parameters of the FluxPipeline models before calling .to("cuda") on the pipeline.

Code
from diffusers import DiffusionPipeline 
from torch.utils.benchmark import Timer
import gc
import torch 
import matplotlib.pyplot as plt

def timer(cmd):
    median = (
        Timer(cmd, globals=globals())
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median


def load_pipeline(pin_memory, enable_model_cpu_offload=False):
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)

    if pin_memory:
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters():
                    p.data = p.data.cpu().pin_memory()
    if enable_model_cpu_offload:
        pipeline.enable_model_cpu_offload()
    else:
        pipeline = pipeline.to("cuda")
    
    return pipeline


def main():
    # Runtimes:
    pageable_to_device = timer("load_pipeline(pin_memory=False)")
    pin_mem_to_device = timer("load_pipeline(pin_memory=True)")
    
    # Ratios:
    r1 = pin_mem_to_device / pageable_to_device

    # Create a figure with the results
    fig, ax = plt.subplots()

    xlabels = [0, 1]
    bar_labels = [
        "pageable_tensor.to(device) (1x)",
        f"pageable_tensor.pin_memory().to(device) ({r1:4.2f}x)"
    ]
    values = [pageable_to_device, pin_mem_to_device]
    colors = ["tab:blue", "tab:red"]
    ax.bar(xlabels, values, label=bar_labels, color=colors)

    ax.set_ylabel("Runtime (ms)")
    ax.set_title("Device casting runtime (pin-memory)")
    ax.set_xticks([])
    ax.legend()

    # plt.show()
    plt.savefig("pin_memory_comparison.png", bbox_inches="tight", dpi=200)

    # Clear stuff
    _ = gc.collect()

if __name__ == "__main__":
    main()

And I have these numbers on an 80GB A100:

load_pipeline(pin_memory=False):  6990.8780 ms
load_pipeline(pin_memory=True):  9491.9515 ms

And then I did the same, but this time with enable_model_cpu_offload(). enable_model_cpu_offload() basically loads a model onto the GPU only when it's needed to perform computation, while the rest of the models remain on the CPU.

Code
from diffusers import DiffusionPipeline 
from torch.utils.benchmark import Timer
import gc
import torch 
import matplotlib.pyplot as plt

def timer(cmd):
    median = (
        Timer(cmd, globals=globals())
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median


def load_and_run_pipeline(pin_memory, enable_model_cpu_offload=False):
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)

    if pin_memory:
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters():
                    p.data = p.data.cpu().pin_memory()
    
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters(): 
                    assert p.data.is_pinned()
    
    if enable_model_cpu_offload:
        pipeline.enable_model_cpu_offload()
    else:
        pipeline = pipeline.to("cuda")
    
    prompt = "A cat holding a sign that says hello world"
    image = pipeline(
        prompt,
        height=1024,
        width=1024,
        guidance_scale=3.5,
        num_inference_steps=50,
        max_sequence_length=512,
        generator=torch.Generator("cpu").manual_seed(0)
    ).images[0]


def main():
    # Runtimes:
    pageable_to_device = timer("load_and_run_pipeline(pin_memory=False, enable_model_cpu_offload=True)")
    pin_mem_to_device = timer("load_and_run_pipeline(pin_memory=True, enable_model_cpu_offload=True)")
    
    # Ratios:
    r1 = pin_mem_to_device / pageable_to_device

    # Create a figure with the results
    fig, ax = plt.subplots()

    xlabels = [0, 1]
    bar_labels = [
        "pageable_tensor.to(device) (1x)",
        f"pageable_tensor.pin_memory().to(device) ({r1:4.2f}x)"
    ]
    values = [pageable_to_device, pin_mem_to_device]
    colors = ["tab:blue", "tab:red"]
    ax.bar(xlabels, values, label=bar_labels, color=colors)

    ax.set_ylabel("Runtime (ms)")
    ax.set_title("Device casting runtime (pin-memory)")
    ax.set_xticks([])
    ax.legend()

    # plt.show()
    plt.savefig("mco_comparison.png", bbox_inches="tight", dpi=200)

    # Clear stuff
    _ = gc.collect()

if __name__ == "__main__":
    main()

Numbers from a 24GB 4090:

load_and_run_pipeline(pin_memory=False, enable_model_cpu_offload=True):  76443.4974 ms
load_and_run_pipeline(pin_memory=True, enable_model_cpu_offload=True):  85482.2263 ms

Are these expected results? Is the slowdown when using pin_memory() because we’re actually asking Python to execute an operation that CUDA will perform anyway before copying the data from host to device?

(A 4090 because that is where enable_model_cpu_offload()'s use is the most relevant, IMO.)

My references for the timing code come from https://pytorch.org/tutorials/intermediate/pinmem_nonblock.html.

@gau-nernst

gau-nernst commented Dec 9, 2024

  1. I think you should exclude .pin_memory() from the timing. Otherwise it will measure the extra time for the memory copy from pageable to page-locked CPU memory, which is a fixed cost.
  2. Do pipeline.enable_model_cpu_offload() and pipeline.to("cuda") call non_blocking=True internally?
  3. In my original snippet, I use per-layer offload. Thus, the CPU->GPU transfer for the next layer can start while the GPU is busy computing the current layer. This is specifically for GPUs that cannot fit the FLUX transformer (MM-DiT, 23.8 GB). For your case, since you use per-model offload and your GPU can fit the MM-DiT model, overlapping compute with data transfer won't help much: inference time is dominated by the repeated denoising steps, and the full MM-DiT model is already in memory.

@sayakpaul

I think you should exclude .pin_memory() from the timing. Otherwise it will measure the extra time for the memory copy from pageable to page-locked CPU memory, which is a fixed cost.

Makes sense. I took it out.

Do pipeline.enable_model_cpu_offload() and pipeline.to("cuda") call non_blocking=True internally?

We don't. However, I experimented with non_blocking=True while doing the direct to("cuda") placement, and here are the results:

pipeline.to('cuda'):  22.5006 ms
pinned_pipeline.to('cuda', non_blocking=False):  22.7757 ms
pinned_pipeline_non_blocking.to('cuda', non_blocking=True):  22.3797 ms

In my original snippet, I use per-layer offload. Thus, the CPU->GPU transfer for the next layer can start while the GPU is busy computing the current layer. This is specifically for GPUs that cannot fit the FLUX transformer (MM-DiT, 23.8 GB). For your case, since you use per-model offload and your GPU can fit the MM-DiT model, overlapping compute with data transfer won't help much: inference time is dominated by the repeated denoising steps, and the full MM-DiT model is already in memory.

Yes, this makes sense. However, my reasoning was that if the model is memory-pinned, there would still be an advantage in transfer speed when casting it to the GPU, but I am likely wrong here.

My updated code:

Code
from diffusers import DiffusionPipeline 
from torch.utils.benchmark import Timer
import gc
import torch 
import matplotlib.pyplot as plt

def timer(cmd, local_dict):
    median = (
        Timer(cmd, globals=local_dict)
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median



def main():
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)

    # Runtimes:
    pageable_to_device = timer("pipeline.to('cuda')", locals())
    del pipeline 
    torch.cuda.empty_cache()
    gc.collect()

    # pinning memory is a fixed cost.
    pinned_pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)
    for name, component in pinned_pipeline.components.items():
        if isinstance(component, torch.nn.Module):
            for p in component.parameters():
                p.data = p.data.cpu().pin_memory()
    pin_mem_to_device = timer("pinned_pipeline.to('cuda', non_blocking=False)", locals())
    del pinned_pipeline 
    torch.cuda.empty_cache()
    gc.collect()

    # pinning memory is a fixed cost.
    pinned_pipeline_non_blocking = DiffusionPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16
    )
    for name, component in pinned_pipeline_non_blocking.components.items():
        if isinstance(component, torch.nn.Module):
            for p in component.parameters():
                p.data = p.data.cpu().pin_memory()
    pin_mem_to_device_non_blocking = timer("pinned_pipeline_non_blocking.to('cuda', non_blocking=True)", locals())

    # Clear stuff
    _ = gc.collect()

if __name__ == "__main__":
    main()
