"""Distributed Data Parallel Inference for Hugging Face Transformers.""" | |
from typing import Union | |
import torch | |
from accelerate import Accelerator | |
from accelerate.utils import gather_object | |
from tqdm import tqdm | |
from transformers import ( | |
PreTrainedModel, | |
PreTrainedTokenizer, | |
PreTrainedTokenizerFast, | |
AutoModelForCausalLM, | |
AutoTokenizer, | |
) | |

def load_model_and_tokenizer(
    pretrained_model_name_or_path: str,
    accelerator: Accelerator,
    *args,
    tokenizer_name: Optional[str] = None,
    **kwargs,
) -> tuple[PreTrainedModel, Union[PreTrainedTokenizer, PreTrainedTokenizerFast]]:
    """Load the model and tokenizer from pretrained_model_name_or_path."""
    model = AutoModelForCausalLM.from_pretrained(
        pretrained_model_name_or_path,
        *args,
        torch_dtype="auto",
        low_cpu_mem_usage=True,
        device_map={
            "": accelerator.process_index
        },  # put the entire model on this process's GPU
        **kwargs,
    ).eval()
    tokenizer = AutoTokenizer.from_pretrained(
        tokenizer_name or pretrained_model_name_or_path, *args, **kwargs
    )
    if tokenizer.pad_token is None:
        # decoder-only models often ship without a pad token; reuse EOS for batching
        tokenizer.pad_token = tokenizer.eos_token
    return model, tokenizer

def generate_batch_step(
    batch: list[str],
    model: PreTrainedModel,
    tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
    num_generations_per_prompt: int = 1,
    **generation_kwargs,
) -> list[list[str]]:
    """Generate completions for a batch of input prompts.

    Returns
    -------
    completions : list[list[str]]
        A list of lists of generated completions for each prompt,
        shape (batch_size, num_generations_per_prompt)
    """
    # decoder-only models need left padding so generation continues right after each prompt
    padding_side_default = tokenizer.padding_side
    tokenizer.padding_side = "left"
    inputs = tokenizer(
        batch,
        return_tensors="pt",
        padding=True,
        pad_to_multiple_of=8,
        truncation=False,
    ).to(model.device)
    generated_ids = []
    # sdp_kernel() enables PyTorch's fused scaled-dot-product attention kernels
    # (deprecated in newer PyTorch in favor of torch.nn.attention.sdpa_kernel)
    with torch.inference_mode(), torch.backends.cuda.sdp_kernel():
        for _ in range(num_generations_per_prompt):
            outputs = model.generate(**inputs, **generation_kwargs)
            # strip the (left-padded) prompt tokens, keeping only newly generated ids
            generated_ids.extend(
                output[len(prompt_ids) :]
                for prompt_ids, output in zip(inputs["input_ids"], outputs)
            )
    completions_all: list[str] = tokenizer.batch_decode(
        generated_ids, skip_special_tokens=True
    )
    # reshape the flat list to (batch_size, num_generations_per_prompt):
    # generation j for prompt i sits at index j * batch_size + i
    batch_size = len(batch)
    tokenizer.padding_side = padding_side_default
    return [
        [completions_all[j * batch_size + i] for j in range(num_generations_per_prompt)]
        for i in range(batch_size)
    ]

def generate_ddp(
    prompts: list[str],
    model: PreTrainedModel,
    tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
    accelerator: Accelerator,
    batch_size: int = 32,
    num_generations_per_prompt: int = 1,
    **generation_kwargs,
) -> list[list[str]]:
    """Generate completions for a list of prompts using distributed data parallelism.

    Returns
    -------
    completions : list[list[str]]
        A list of lists of generated completions for each prompt,
        shape (len(prompts), num_generations_per_prompt)
    """
    # split the prompts into batches, then shard the batches across processes
    prompts_batch_list = [
        prompts[i : i + batch_size] for i in range(0, len(prompts), batch_size)
    ]
    completions_per_process = []
    with accelerator.split_between_processes(
        prompts_batch_list, apply_padding=True
    ) as batches_split:
        for batch in tqdm(
            batches_split, desc=f"Generating completions on device {accelerator.device}"
        ):
            outputs: list[list[str]] = generate_batch_step(
                batch,
                model,
                tokenizer,
                num_generations_per_prompt,
                **generation_kwargs,
            )
            completions_per_process.extend(outputs)
    # collect the per-prompt completion lists from all processes, in prompt order
    completions = gather_object(completions_per_process)
    # drop the duplicates produced by apply_padding in split_between_processes
    return completions[: len(prompts)]

def test():
    accelerator = Accelerator()
    model, tokenizer = load_model_and_tokenizer(
        tokenizer_name="meta-llama/Llama-2-7b-hf",
        pretrained_model_name_or_path="apple/OpenELM-270M-Instruct",
        accelerator=accelerator,
        trust_remote_code=True,
        token="your_token",
    )
    outputs = generate_ddp(
        ["The president of USA is: ", "Good morning, "] * 16,  # 32 prompts in total
        model,
        tokenizer,
        accelerator,
        num_generations_per_prompt=2,
        do_sample=True,
        penalty_alpha=0.5,
        max_new_tokens=10,
        top_k=10,
        num_beams=2,
    )
    assert len(outputs) == 32  # one entry per prompt
    assert len(outputs[0]) == 2  # num_generations_per_prompt completions per prompt
    assert isinstance(outputs[0][0], str)
    assert outputs[0][0] != outputs[1][0]
    assert outputs[0][0] != outputs[0][1]
Import these helpers into your main script and launch it with:

accelerate launch --config_file {config_ddp.yaml} {main_script.py} --args
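A minimal main script might look like the sketch below; the module name ddp_inference, the file name main_script.py, and the model/generation arguments are illustrative assumptions, not part of the gist.

"""main_script.py -- illustrative sketch; adapt names and arguments to your setup."""
from accelerate import Accelerator

# assumes the gist above was saved as ddp_inference.py next to this script
from ddp_inference import generate_ddp, load_model_and_tokenizer


def main():
    accelerator = Accelerator()  # one process per GPU under `accelerate launch`
    model, tokenizer = load_model_and_tokenizer(
        "apple/OpenELM-270M-Instruct",
        accelerator,
        tokenizer_name="meta-llama/Llama-2-7b-hf",
        trust_remote_code=True,
    )
    prompts = ["The president of USA is: ", "Good morning, "] * 16
    completions = generate_ddp(
        prompts,
        model,
        tokenizer,
        accelerator,
        batch_size=8,
        num_generations_per_prompt=2,
        do_sample=True,
        max_new_tokens=10,
    )
    # gather_object is an all-gather, so every rank holds the full result; print once
    if accelerator.is_main_process:
        for prompt, generations in zip(prompts, completions):
            print(repr(prompt), generations)


if __name__ == "__main__":
    main()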
Here config_ddp.yaml is generated from https://huggingface.co/docs/accelerate/en/usage_guides/explore using the multi-node, multi-GPU option. Pay attention to specify the machine rank (the node's global rank) for each node, since that field differs from node to node.
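For orientation only, a sketch of such a config with made-up values (two nodes, eight GPUs each) is shown below; the exact field set depends on your Accelerate version, so prefer generating it with accelerate config or the page above rather than copying this.

compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
num_machines: 2             # total number of nodes
num_processes: 16           # total number of GPUs across all nodes
machine_rank: 0             # 0 on the main node, 1 on the next node, and so on
main_process_ip: 10.0.0.1   # reachable address of the rank-0 node (example value)
main_process_port: 29500
gpu_ids: all
mixed_precision: 'no'
rdzv_backend: static
same_network: true
main_training_function: main
use_cpu: false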