""" | |
Code to demonstrate if async device transfers in `DiffusionPipeline` could be nice. | |
Some numbers: | |
A100 (80GB): | |
* Flux.1 Dev: 1.40x | |
* Flux.1 Dev ControlNet: 1.44x | |
* Stable Diffusion 3: 1.04x | |
* SDXL: 1.05x | |
Colab: | |
* SDXL: 1.22x (T4) | |
* SDXL ControlNet: 1.2x (L4) | |
Thanks to o1-mini for pairing 🍓! | |
""" | |
import torch | |
import torch.nn as nn | |
from importlib import import_module | |
import json | |
from huggingface_hub import hf_hub_download | |
from typing import List | |
from diffusers import ( | |
DiffusionPipeline, | |
ControlNetModel, | |
StableDiffusionXLControlNetPipeline, | |
FluxControlNetPipeline, | |
FluxControlNetModel, | |
) | |
import fire | |
CONTROLNET_MAPPING = { | |
ControlNetModel: StableDiffusionXLControlNetPipeline, | |
FluxControlNetModel: FluxControlNetPipeline, | |
} | |
class DiffusionPipelineAsync: | |
def __init__(self, model_id="black-forest-labs/FLUX.1-dev", controlnet_id=None): | |
if controlnet_id is None: | |
self.pipeline = DiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.bfloat16) | |
else: | |
config_path = hf_hub_download(controlnet_id, filename="config.json") | |
with open(config_path, "r") as f: | |
controlnet_cls_name = json.load(f)["_class_name"] | |
contolnet_cls = getattr(import_module("diffusers"), controlnet_cls_name) | |
controlnet = contolnet_cls.from_pretrained(controlnet_id, torch_dtype=torch.bfloat16) | |
self.pipeline = CONTROLNET_MAPPING[contolnet_cls].from_pretrained( | |
model_id, controlnet=controlnet, torch_dtype=torch.bfloat16 | |
) | |
self.models = [module for _, module in self.pipeline.components.items() if isinstance(module, torch.nn.Module)] | |
def _move_module_to_device(self, module: nn.Module, device: torch.device, stream: torch.cuda.Stream): | |
""" | |
Moves a single module to the specified device using the given CUDA stream. | |
""" | |
with torch.cuda.stream(stream): | |
module.to(device, non_blocking=True) | |
# Note: Synchronization is handled outside to allow overlap | |
def to_cuda_async(self, device: torch.device): | |
""" | |
Moves all child modules to the specified CUDA device using asynchronous transfers | |
and multiple CUDA streams to accelerate the process. | |
""" | |
if device.type != "cuda": | |
raise ValueError("to_cuda_async should be used with a CUDA device.") | |
# Ensure CUDA is available | |
if not torch.cuda.is_available(): | |
raise RuntimeError("CUDA is not available.") | |
# List of modules to move | |
modules: List[nn.Module] = self.models | |
# Create a CUDA stream for each module | |
streams = [torch.cuda.Stream(device=device) for _ in modules] | |
# Schedule each module transfer on its respective stream | |
for module, stream in zip(modules, streams): | |
self._move_module_to_device(module, device, stream) | |
# Synchronize all streams to ensure all transfers are complete | |
for stream in streams: | |
stream.synchronize() | |
return self | |
def to_sequential(self, device: torch.device): | |
""" | |
Moves all child modules to the specified device sequentially. | |
""" | |
for model in self.models: | |
model.to(device) | |
return self | |
def to_device(self, device: torch.device, use_fast_transfer: bool = False): | |
""" | |
Configurable method to move all child modules to the specified device. | |
Users can choose to use accelerated transfer with CUDA streams or standard sequential transfer. | |
Parameters: | |
- device (torch.device): The target device. | |
- use_fast_transfer (bool): If True and device is CUDA, use accelerated transfer. Otherwise, use sequential. | |
""" | |
if use_fast_transfer and device.type == "cuda": | |
return self.to_cuda_async(device) | |
else: | |
return self.to_sequential(device) | |
def time_transfer(pipeline: DiffusionPipeline, device: torch.device, use_fast_transfer: bool): | |
torch.cuda.synchronize() # Ensure all previous CUDA operations are complete | |
start_event = torch.cuda.Event(enable_timing=True) | |
end_event = torch.cuda.Event(enable_timing=True) | |
start_event.record() | |
pipeline.to_device(device, use_fast_transfer=use_fast_transfer) | |
end_event.record() | |
# Wait for the events to be recorded | |
torch.cuda.synchronize() | |
elapsed_time_ms = start_event.elapsed_time(end_event) # Time in milliseconds | |
return elapsed_time_ms / 1000 # Convert to seconds | |
def main(model_id: str = "black-forest-labs/FLUX.1-dev", controlnet_id: str = None): | |
# Check if CUDA is available | |
if not torch.cuda.is_available(): | |
print("CUDA is not available on this system. Exiting the demonstration.") | |
return | |
cuda_device = torch.device("cuda") | |
cpu_device = torch.device("cpu") | |
# Clear all CUDA cache. | |
torch.cuda.empty_cache() | |
# Initialize the pipeline | |
pipeline = DiffusionPipelineAsync(model_id=model_id, controlnet_id=controlnet_id) | |
# Warm-up transfers (optional but recommended for more consistent timing) | |
print("Performing warm-up transfers...") | |
pipeline.to_device(cuda_device, use_fast_transfer=True) | |
pipeline.to_device(cuda_device, use_fast_transfer=False) | |
# Ensure models are initially on CPU | |
print("Ensuring models are on CPU...") | |
pipeline.to_sequential(cpu_device) | |
# Measure time for accelerated transfer | |
print("\nStarting accelerated CUDA transfer...") | |
accelerated_time = time_transfer(pipeline, cuda_device, use_fast_transfer=True) | |
print(f"Accelerated CUDA transfer completed in {accelerated_time:.4f} seconds.") | |
# Verify that models are on CUDA | |
print("Verifying that all models are on CUDA after accelerated transfer...") | |
for module in pipeline.models: | |
for param in module.parameters(): | |
assert param.is_cuda, f"{module.__class__.__name__} is not on CUDA." | |
print("All models are successfully on CUDA after accelerated transfer.") | |
# Move models back to CPU for fair comparison | |
print("\nMoving models back to CPU for standard transfer...") | |
pipeline.to_sequential(cpu_device) | |
# Measure time for standard sequential transfer | |
print("Starting standard sequential CUDA transfer...") | |
sequential_time = time_transfer(pipeline, cuda_device, use_fast_transfer=False) | |
print(f"Standard sequential CUDA transfer completed in {sequential_time:.4f} seconds.") | |
# Verify that models are on CUDA | |
print("Verifying that all models are on CUDA after standard transfer...") | |
for module in pipeline.models: | |
for param in module.parameters(): | |
assert param.is_cuda, f"{module.__class__.__name__} is not on CUDA." | |
print("All models are successfully on CUDA after accelerated transfer.") | |
# Compare the two methods | |
print("\nTransfer Time Comparison:") | |
print(f"Accelerated CUDA Transfer Time: {accelerated_time:.4f} seconds") | |
print(f"Standard Sequential Transfer Time: {sequential_time:.4f} seconds") | |
if accelerated_time > 0: | |
speedup = sequential_time / accelerated_time | |
print(f"Speedup for ({model_id}): {speedup:.2f}x faster using accelerated transfer.") | |
else: | |
print(f"Accelerated transfer time is too small to calculate speedup for {model_id}.") | |
if __name__ == "__main__": | |
fire.Fire(main) |
Thanks so much for your feedback, I appreciate it!
From what I see, using pinned memory without any streams could potentially be more beneficial in this case? I will do some more benchmarking on this.
Now I see a potential issue with the timing. The Event.record() call might not be capturing the data transfer in your custom data transfer stream. By default it uses torch.cuda.current_stream(). Perhaps you can double check this?
Could you expand a bit more on this? What would be the right way to use record() in this setup, or, more generally, to time this?
See https://pytorch.org/docs/stable/generated/torch.cuda.Event.html#torch.cuda.Event.record

I personally don't use torch.cuda.Event for timing, so I'm not sure about its interactions with CUDA streams and other stuff. Won't simply using time.perf_counter() suffice in this case (with appropriate torch.cuda.synchronize() calls)? Since precise measurement is not that important, and it may also make sense to time it from the host/CPU's perspective.
Thanks.
Since precise measurement is not that important, and it may also make sense to time it from the host/CPU's perspective.

I think time_transfer() does it from the perspective of how pipeline.to() would be called by the user. So, I think the positioning of the timing code is okay? I will do it with time.perf_counter() after making the code work with pinned memory.
The current position of the timing code is fine, but the (potential) problem is with the CUDA event timing. I'm not sure it is timing what you expect: you are doing the data transfer in your own custom CUDA stream, but the CUDA event is recording in the default stream, which doesn't do anything.
You're right. I will change it.
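To make that concrete, here is a minimal sketch of the two timing options discussed above: recording the CUDA events on the stream that actually performs the copy (Event.record() accepts a stream argument), or timing from the host with time.perf_counter() bracketed by torch.cuda.synchronize() calls. The tensor and stream below are purely illustrative.

```python
import time
import torch

x_cpu = torch.randn(1024, 1024).pin_memory()  # illustrative tensor
copy_stream = torch.cuda.Stream()

# Option 1: record the events on the stream that does the transfer.
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
start.record(copy_stream)
with torch.cuda.stream(copy_stream):
    x_gpu = x_cpu.to("cuda", non_blocking=True)
end.record(copy_stream)
end.synchronize()
print(f"event timing: {start.elapsed_time(end):.3f} ms")

# Option 2: time from the host, bracketing the transfer with explicit synchronization.
torch.cuda.synchronize()
t0 = time.perf_counter()
with torch.cuda.stream(copy_stream):
    y_gpu = x_cpu.to("cuda", non_blocking=True)
torch.cuda.synchronize()
print(f"host timing: {(time.perf_counter() - t0) * 1000:.3f} ms")
```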
A CUDA stream is only useful when you want to overlap data transfer with compute (and possibly run two different computations at the same time). CPU->CUDA transfers can only happen one at a time (regardless of non_blocking, pinned memory, or whatnot), so there is no point in using CUDA streams if you don't overlap data transfer and compute.
Hey @gau-nernst! Sorry for the ping here but I wanted to know more about what you said here. In your CPU offload gist, we see benefits of CUDA streams because there's a data transfer happening for the next module/layer while the current layer is running a computation, IIUC?
In your CPU offload gist, we see benefits of CUDA streams because there's a data transfer happening for the next module/layer while the current layer is running a computation, IIUC?
Yes, that is correct
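For context, here is a rough sketch of that per-layer overlap pattern (an illustration of the idea only, not the actual implementation from the offload gist; the layers and sizes are made up): while layer i runs on the default stream, the weights of layer i+1 are copied host-to-device on a side stream, and pinned CPU memory is what lets that copy run asynchronously.

```python
import torch
import torch.nn as nn

# Hypothetical stack of layers held in pinned CPU memory (sizes are made up).
layers = [nn.Linear(4096, 4096, dtype=torch.bfloat16) for _ in range(8)]
for layer in layers:
    for p in layer.parameters():
        p.data = p.data.pin_memory()

copy_stream = torch.cuda.Stream()
x = torch.randn(16, 4096, dtype=torch.bfloat16, device="cuda")

def prefetch(layer):
    # Issue the host-to-device copy on the side stream; returns immediately.
    with torch.cuda.stream(copy_stream):
        layer.to("cuda", non_blocking=True)

prefetch(layers[0])
for i, layer in enumerate(layers):
    # The compute stream waits until this layer's weights have arrived...
    torch.cuda.current_stream().wait_stream(copy_stream)
    # ...then the next layer's copy overlaps with this layer's compute.
    if i + 1 < len(layers):
        prefetch(layers[i + 1])
    x = layer(x)
torch.cuda.synchronize()
# (A production version would also offload finished layers back to CPU and
#  handle the caching allocator's stream semantics.)
```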
@gau-nernst apologies for the delay, but I managed to get some numbers from my experiments with pin_memory().

The first experiment I did was to simply call pin_memory() on the params of the models of FluxPipeline before calling .to("cuda") on the pipe.
Code
from diffusers import DiffusionPipeline
from torch.utils.benchmark import Timer
import gc
import torch
import matplotlib.pyplot as plt


def timer(cmd):
    median = (
        Timer(cmd, globals=globals())
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median


def load_pipeline(pin_memory, enable_model_cpu_offload=False):
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)
    if pin_memory:
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters():
                    p.data = p.data.cpu().pin_memory()
    if enable_model_cpu_offload:
        pipeline.enable_model_cpu_offload()
    else:
        pipeline = pipeline.to("cuda")
    return pipeline


def main():
    # Runtimes:
    pageable_to_device = timer("load_pipeline(pin_memory=False)")
    pin_mem_to_device = timer("load_pipeline(pin_memory=True)")

    # Ratios:
    r1 = pin_mem_to_device / pageable_to_device

    # Create a figure with the results
    fig, ax = plt.subplots()
    xlabels = [0, 1]
    bar_labels = [
        "pageable_tensor.to(device) (1x)",
        f"pageable_tensor.pin_memory().to(device) ({r1:4.2f}x)",
    ]
    values = [pageable_to_device, pin_mem_to_device]
    colors = ["tab:blue", "tab:red"]
    ax.bar(xlabels, values, label=bar_labels, color=colors)
    ax.set_ylabel("Runtime (ms)")
    ax.set_title("Device casting runtime (pin-memory)")
    ax.set_xticks([])
    ax.legend()

    # plt.show()
    plt.savefig("pin_memory_comparison.png", bbox_inches="tight", dpi=200)

    # Clear stuff
    _ = gc.collect()


if __name__ == "__main__":
    main()
And I have these numbers on an 80GB A100:
load_pipeline(pin_memory=False): 6990.8780 ms
load_pipeline(pin_memory=True): 9491.9515 ms
And then I did the same, but this time with enable_model_cpu_offload(). enable_model_cpu_offload() loads a whole model onto the GPU only when it's needed for computation, while the rest of the models stay on the CPU.
Code
from diffusers import DiffusionPipeline
from torch.utils.benchmark import Timer
import gc
import torch
import matplotlib.pyplot as plt


def timer(cmd):
    median = (
        Timer(cmd, globals=globals())
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median


def load_and_run_pipeline(pin_memory, enable_model_cpu_offload=False):
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)
    if pin_memory:
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters():
                    p.data = p.data.cpu().pin_memory()
        for name, component in pipeline.components.items():
            if isinstance(component, torch.nn.Module):
                for p in component.parameters():
                    assert p.data.is_pinned()
    if enable_model_cpu_offload:
        pipeline.enable_model_cpu_offload()
    else:
        pipeline = pipeline.to("cuda")

    prompt = "A cat holding a sign that says hello world"
    image = pipeline(
        prompt,
        height=1024,
        width=1024,
        guidance_scale=3.5,
        num_inference_steps=50,
        max_sequence_length=512,
        generator=torch.Generator("cpu").manual_seed(0),
    ).images[0]


def main():
    # Runtimes:
    pageable_to_device = timer("load_and_run_pipeline(pin_memory=False, enable_model_cpu_offload=True)")
    pin_mem_to_device = timer("load_and_run_pipeline(pin_memory=True, enable_model_cpu_offload=True)")

    # Ratios:
    r1 = pin_mem_to_device / pageable_to_device

    # Create a figure with the results
    fig, ax = plt.subplots()
    xlabels = [0, 1]
    bar_labels = [
        "pageable_tensor.to(device) (1x)",
        f"pageable_tensor.pin_memory().to(device) ({r1:4.2f}x)",
    ]
    values = [pageable_to_device, pin_mem_to_device]
    colors = ["tab:blue", "tab:red"]
    ax.bar(xlabels, values, label=bar_labels, color=colors)
    ax.set_ylabel("Runtime (ms)")
    ax.set_title("Device casting runtime (pin-memory)")
    ax.set_xticks([])
    ax.legend()

    # plt.show()
    plt.savefig("mco_comparison.png", bbox_inches="tight", dpi=200)

    # Clear stuff
    _ = gc.collect()


if __name__ == "__main__":
    main()
Numbers from a 24GB 4090Ti:
load_and_run_pipeline(pin_memory=False, enable_model_cpu_offload=True): 76443.4974 ms
load_and_run_pipeline(pin_memory=True, enable_model_cpu_offload=True): 85482.2263 ms
Are these expected results? Is the slowdown when using pin_memory() because we're actually asking Python to execute an operation that CUDA will perform anyway before copying the data from host to device?

(4090 because that is when enable_model_cpu_offload()'s use is the most relevant, IMO.)
My references for the timing code come from https://pytorch.org/tutorials/intermediate/pinmem_nonblock.html.
- I think you should exclude .pin_memory() in the timing. Otherwise it will measure the extra time for the memory copy from pageable to page-locked CPU memory, which is a fixed cost.
- Does pipeline.enable_model_cpu_offload() and pipeline.to("cuda") call non_blocking=True internally?
- In my original snippet, I use per-layer offload. Thus, the CPU->GPU transfer for the next layer can start while the GPU is busy computing the current layer. This is specifically for GPUs that cannot fit the FLUX transformer (MM-DiT, 23.8 GB). For your case, since you use per-model offload, and your GPU can fit the MM-DiT model, overlapping compute with data transfer won't help much: inference time is dominated by the repeated denoising steps, and the full MM-DiT model is already in memory.
I think you should exclude .pin_memory() in the timing. Otherwise it will measure the extra time for memory copy from pageable->page-locked CPU memory, which is a fixed cost.
Makes sense. I took it out.
Does pipeline.enable_model_cpu_offload() and pipeline.to("cuda") call non_blocking=True internally?
We don't. However, I experimented with non_blocking=True while doing the direct to("cuda") placement, and here are the results:
pipeline.to('cuda'): 22.5006 ms
pinned_pipeline.to('cuda', non_blocking=False): 22.7757 ms
pinned_pipeline_non_blocking.to('cuda', non_blocking=True): 22.3797 ms
In my original snippet, I use per-layer offload. Thus, CPU->GPU transfer for the next layer can start while the GPU is busy computing for current layer. This is specifically for GPUs that cannot fit the FLUX transformer (MM-DiT 23.8 GB). For your case, since you use per-model offload, and your GPU can fit the MM-DiT model, overlapping compute with data transfer won't help much: inference time is dominated by the repeated denoising steps, and the full MM-DiT model is already in memory.
Yes, this makes sense. However, my reasoning was that if the model is memory-pinned, there would still be an advantage in terms of transfer when casting it to the GPU, but I am likely wrong here.
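As a quick aside, a single-tensor microbenchmark along these lines can separate the two costs (the tensor size and the timing helper here are arbitrary): pin_memory() itself is a one-off host-side copy from pageable to page-locked memory, while the subsequent host-to-device copy from pinned memory is typically the faster part.

```python
import time
import torch

def timed(fn):
    # Host-side timing bracketed by explicit synchronization.
    torch.cuda.synchronize()
    t0 = time.perf_counter()
    fn()
    torch.cuda.synchronize()
    return (time.perf_counter() - t0) * 1000

x = torch.randn(512, 1024, 1024)  # ~2 GB of fp32; the size is arbitrary
print(f"pageable -> cuda: {timed(lambda: x.to('cuda')):.1f} ms")

t0 = time.perf_counter()
x_pinned = x.pin_memory()  # one-off pageable -> page-locked copy on the host
print(f"pin_memory() alone: {(time.perf_counter() - t0) * 1000:.1f} ms")

print(f"pinned -> cuda: {timed(lambda: x_pinned.to('cuda', non_blocking=True)):.1f} ms")
```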
My updated code:
Code
from diffusers import DiffusionPipeline
from torch.utils.benchmark import Timer
import gc
import torch
import matplotlib.pyplot as plt


def timer(cmd, local_dict):
    median = (
        Timer(cmd, globals=local_dict)
        .adaptive_autorange(min_run_time=1.0, max_run_time=20.0)
        .median
        * 1000
    )
    print(f"{cmd}: {median: 4.4f} ms")
    return median


def main():
    pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)

    # Runtimes:
    pageable_to_device = timer("pipeline.to('cuda')", locals())

    del pipeline
    torch.cuda.empty_cache()
    gc.collect()

    # pinning memory is a fixed cost.
    pinned_pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16)
    for name, component in pinned_pipeline.components.items():
        if isinstance(component, torch.nn.Module):
            for p in component.parameters():
                p.data = p.data.cpu().pin_memory()
    pin_mem_to_device = timer("pinned_pipeline.to('cuda', non_blocking=False)", locals())

    del pinned_pipeline
    torch.cuda.empty_cache()
    gc.collect()

    # pinning memory is a fixed cost.
    pinned_pipeline_non_blocking = DiffusionPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16
    )
    for name, component in pinned_pipeline_non_blocking.components.items():
        if isinstance(component, torch.nn.Module):
            for p in component.parameters():
                p.data = p.data.cpu().pin_memory()
    pin_mem_to_device_non_blocking = timer("pinned_pipeline_non_blocking.to('cuda', non_blocking=True)", locals())

    # Clear stuff
    _ = gc.collect()


if __name__ == "__main__":
    main()
Hi @sayakpaul! Commenting here so we can see it directly. Some observations after a brief look:
- .to(cuda, non_blocking=True) only works when the CPU model is in pinned memory. See https://pytorch.org/docs/stable/generated/torch.Tensor.cuda.html. So the speedup for some models is a bit strange. Maybe there is some problem with the timing (or I misunderstood how non_blocking memory transfer works 🤔). Another good article: https://pytorch.org/tutorials/intermediate/pinmem_nonblock.html (and also https://docs.nvidia.com/cuda/cuda-runtime-api/api-sync-behavior.html). So perhaps the speedup is simply due to fewer explicit CUDA synchronization calls.
- The Event.record() call might not be capturing the data transfer in your custom data transfer stream. By default it uses torch.cuda.current_stream(). Perhaps you can double check this?
- To reiterate some generic pointers about CUDA data transfer and CUDA streams: with .cuda(), whether blocking or not, you will need to copy data to CPU memory first anyway (though I saw that you did some warmup, so after warmup the weights should be residing in CPU memory, though not pinned). In my original gist, I didn't show my "pinned_memory only" implementation, but that implementation didn't use CUDA streams; it simply uses pinned memory to speed up data transfer (a rough sketch of the idea follows after this list).
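A pinned-memory-only transfer (no extra streams) might look roughly like the sketch below; this is only an illustration of that description, not the implementation from the original gist, and the model here is a placeholder.

```python
import torch
import torch.nn as nn

def pin_module(module: nn.Module):
    # One-off cost: copy parameters from pageable into page-locked (pinned) host memory.
    for p in module.parameters():
        p.data = p.data.cpu().pin_memory()

def to_cuda_pinned(module: nn.Module):
    # With pinned sources, non_blocking=True issues asynchronous copies on the current
    # stream; a single synchronize at the end waits for all of them to land.
    module.to("cuda", non_blocking=True)
    torch.cuda.synchronize()

model = nn.Sequential(nn.Linear(4096, 4096), nn.GELU(), nn.Linear(4096, 4096))  # placeholder model
pin_module(model)
to_cuda_pinned(model)
```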
Also, I'm no expert in CUDA and this kind of stuff. So double-check everything I say 😄