"""Distributed Data Parallel Inference for Hugging Face Transformers."""
from typing import Optional, Union
import torch
from accelerate import Accelerator
from accelerate.utils import gather_object
from tqdm import tqdm
from transformers import (
    PreTrainedModel,
    PreTrainedTokenizer,
    PreTrainedTokenizerFast,
    AutoModelForCausalLM,
    AutoTokenizer,
)


def load_model_and_tokenizer(
    pretrained_model_name_or_path: str,
    accelerator: Accelerator,
    *args,
    tokenizer_name: Optional[str] = None,
    **kwargs,
) -> tuple[PreTrainedModel, Union[PreTrainedTokenizer, PreTrainedTokenizerFast]]:
    """Load the model and tokenizer from pretrained_model_name_or_path."""
    model = AutoModelForCausalLM.from_pretrained(
        pretrained_model_name_or_path,
        *args,
        torch_dtype="auto",
        low_cpu_mem_usage=True,
        device_map={
            "": accelerator.process_index
        },  # put the entire model on the GPU indexed by this process's rank
        **kwargs,
    ).eval()
    tokenizer = AutoTokenizer.from_pretrained(
        tokenizer_name or pretrained_model_name_or_path, *args, **kwargs
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return model, tokenizer


def generate_batch_step(
    batch: list[str],
    model: PreTrainedModel,
    tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
    num_generations_per_prompt: int = 1,
    **generation_kwargs,
) -> list[list[str]]:
    """Generate completions for a batch of input prompts.

    Returns
    -------
    completions : list[list[str]]
        A list of lists of generated completions for each prompt,
        shape (batch_size, num_generations_per_prompt)
    """
    padding_side_default = tokenizer.padding_side
    tokenizer.padding_side = "left"  # left-pad so every generated token comes after the prompt
    inputs = tokenizer(
        batch,
        return_tensors="pt",
        padding=True,
        pad_to_multiple_of=8,
        truncation=False,
    ).to(model.device)
    generated_ids = []
    with torch.inference_mode(), torch.backends.cuda.sdp_kernel():
        for _ in range(num_generations_per_prompt):
            outputs = model.generate(**inputs, **generation_kwargs)
            generated_ids.extend(
                output[len(prompt_ids) :]  # strip the (left-padded) prompt tokens
                for prompt_ids, output in zip(inputs["input_ids"], outputs)
            )
    completions_all: list[str] = tokenizer.batch_decode(
        generated_ids, skip_special_tokens=True
    )
    # reshape to (batch_size, num_generations_per_prompt)
    batch_size = len(batch)
    tokenizer.padding_side = padding_side_default
    return [
        [completions_all[j * batch_size + i] for j in range(num_generations_per_prompt)]
        for i in range(batch_size)
    ]


def generate_ddp(
    prompts: list[str],
    model: PreTrainedModel,
    tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
    accelerator: Accelerator,
    batch_size: int = 32,
    num_generations_per_prompt: int = 1,
    **generation_kwargs,
) -> list[list[str]]:
    """Generate completions for a list of prompts with distributed data parallelism.

    Returns
    -------
    completions : list[list[str]]
        A list of lists of generated completions for each prompt,
        shape (len(prompts), num_generations_per_prompt)
    """
    prompts_batch_list = [
        prompts[i : i + batch_size] for i in range(0, len(prompts), batch_size)
    ]
    completions_per_process = []
    with accelerator.split_between_processes(
        prompts_batch_list, apply_padding=True
    ) as batches_split:
        for batch in tqdm(
            batches_split, desc=f"Generating completions on device {accelerator.device}"
        ):
            outputs: list[list[str]] = generate_batch_step(
                batch,
                model,
                tokenizer,
                num_generations_per_prompt,
                **generation_kwargs,
            )
            completions_per_process.extend(outputs)
    completions = gather_object(completions_per_process)
    # Drop duplicates produced by apply_padding in split_between_processes
    return completions[: len(prompts)]


def test():
    accelerator = Accelerator()
    model, tokenizer = load_model_and_tokenizer(
        tokenizer_name="meta-llama/Llama-2-7b-hf",
        pretrained_model_name_or_path="apple/OpenELM-270M-Instruct",
        accelerator=accelerator,
        trust_remote_code=True,
        token="your_token",  # placeholder Hugging Face access token
    )
    outputs = generate_ddp(
        ["The president of USA is: ", "Good morning, "] * 16,
        model,
        tokenizer,
        accelerator,
        num_generations_per_prompt=2,
        do_sample=True,
        penalty_alpha=0.5,
        max_new_tokens=10,
        top_k=10,
        num_beams=2,
    )
    assert len(outputs) == 32  # one entry per prompt (2 prompts * 16)
    assert len(outputs[0]) == 2  # num_generations_per_prompt completions per prompt
    assert isinstance(outputs[0][0], str)
    assert outputs[0][0] != outputs[1][0]
    assert outputs[0][0] != outputs[0][1]
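

# Optional entry point (not part of the original gist): with this guard the file can be
# passed directly to `accelerate launch`, in which case it runs the smoke test above.
if __name__ == "__main__":
    test()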
LiutongZhou commented Nov 8, 2024

Import these functions into your main script and run

accelerate launch --config_file {config_ddp.yaml} {main_script.py} --args

with config_ddp.yaml generated from https://huggingface.co/docs/accelerate/en/usage_guides/explore using the multi-node multi-GPU configuration. A minimal sketch of such a main script follows below.
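
The sketch below shows one way such a main script could look. The module name ddp_generate (for the gist file above), the gpt2 checkpoint, the prompts, and the generation settings are placeholder assumptions, not part of the original gist.

# main_script.py (hypothetical) -- wires the gist's functions together
from accelerate import Accelerator

from ddp_generate import generate_ddp, load_model_and_tokenizer  # assumed module name for the gist file


def main():
    accelerator = Accelerator()
    # gpt2 is used only because it is small and public; any causal LM id works here
    model, tokenizer = load_model_and_tokenizer("gpt2", accelerator)
    prompts = ["The capital of France is", "Once upon a time"] * 8
    completions = generate_ddp(
        prompts,
        model,
        tokenizer,
        accelerator,
        batch_size=4,
        num_generations_per_prompt=1,
        do_sample=True,
        max_new_tokens=20,
    )
    # gather_object returns the full list on every rank; print from the main process
    # only, to avoid duplicate output
    if accelerator.is_main_process:
        for prompt, (completion,) in zip(prompts, completions):
            print(f"{prompt!r} -> {completion!r}")


if __name__ == "__main__":
    main()

Each process loads its own full copy of the model onto one GPU and works through its shard of the prompt batches; only the prompts are split, not the model.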

Pay attention: each node needs its own copy of config_ddp.yaml, specifying that node's global machine rank plus the shared coordination settings (a complete example config follows below):

machine_rank: 0                # this node's rank: 0 on the main node, 1 on the next, ...
main_process_ip: 192.168.20.1
main_process_port: 8080
num_machines: 2                # number of nodes
num_processes: 16              # total number of GPUs across all nodes
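
For reference, a complete config_ddp.yaml for the main node (machine_rank 0) of such a 2-node, 16-GPU setup might look roughly like the following. The IP address and port are the illustrative values from above; the remaining fields are the stock answers the `accelerate config` questionnaire produces for a plain multi-GPU (DDP) run, not values prescribed by the gist:

compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
gpu_ids: all
machine_rank: 0
main_process_ip: 192.168.20.1
main_process_port: 8080
main_training_function: main
mixed_precision: 'no'
num_machines: 2
num_processes: 16
rdzv_backend: static
same_network: true
use_cpu: false

On the second node, only machine_rank changes (to 1); the other settings stay identical.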
