""" | |
1. First checkout the trl branch: | |
git clone https://github.com/huggingface/trl.git | |
git checkout debug-dpo | |
2. Install deps with: | |
make dev | |
Then install latest versions of transformers / accelerate / deepspeed | |
pip install transformers==4.39.1 accelerate==0.28.0 deepspeed==0.14.0 | |
See examples/scripts/requirements.txt for exact versions. | |
3. Run with: | |
TRANSFORMERS_VERBOSITY=info ACCELERATE_LOG_LEVEL=info accelerate launch --config_file=examples/accelerate_configs/deepspeed_zero3.yaml examples/scripts/debug_text_gen_dpo.py | |
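For reference, the DeepSpeed section of that accelerate config looks roughly like the sketch
below (key names assumed from a typical accelerate ZeRO-3 config, not copied from the repo):

deepspeed_config:
  gradient_accumulation_steps: 2  # mirrored by gradient_accumulation_steps in TrainingArguments below
  zero_stage: 3
  zero3_init_flag: true
distributed_type: DEEPSPEED
mixed_precision: bf16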
With `gradient_accumulation_steps=1` set in both the `TrainingArguments` and the `examples/accelerate_configs/deepspeed_zero3.yaml` config, it runs fine. But with `gradient_accumulation_steps=2` it fails with the following error:
Traceback (most recent call last):
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 141, in <module>
    main()
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 137, in main
    trainer.train()
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1624, in train
    return inner_training_loop(
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1961, in _inner_training_loop
    tr_loss_step = self.training_step(model, inputs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 2902, in training_step
    loss = self.compute_loss(model, inputs)
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 43, in compute_loss
    with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/fsx/lewis/git/hf/trl/trl/models/utils.py", line 146, in unwrap_model_for_generation
    with deepspeed.zero.GatheredParameters(model.parameters()):
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 2177, in __exit__
    self.params[0].partition(param_list=self.params, has_been_updated=False)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1325, in partition
    self._partition(param_list, has_been_updated=has_been_updated)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1474, in _partition
    self._partition_param(param, has_been_updated=has_been_updated)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1507, in _partition_param
    free_param(param)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 279, in free_param
    assert not param.ds_active_sub_modules, param.ds_summary()
AssertionError: {'id': 0, 'status': 'AVAILABLE', 'numel': 25755648, 'ds_numel': 25755648, 'shape': (50304, 512), 'ds_shape': (50304, 512), 'requires_grad': True, 'grad_shape': None, 'persist': False, 'active_sub_modules': {182}, 'ds_tensor.shape': torch.Size([3219456])}
""" | |
import warnings | |
from contextlib import nullcontext | |
from typing import Any, Dict, Tuple, Union | |
import torch | |
import torch.nn as nn | |
from datasets import Dataset | |
from transformers import ( | |
AutoModelForCausalLM, | |
AutoTokenizer, | |
PreTrainedModel, | |
TrainingArguments, | |
) | |
from trl import DPOTrainer | |
from trl.models.utils import unwrap_model_for_generation | |
class MyDPOTrainer(DPOTrainer): | |
def compute_loss( | |
self, | |
model: Union[PreTrainedModel, nn.Module], | |
inputs: Dict[str, Union[torch.Tensor, Any]], | |
return_outputs=False, | |
) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict[str, torch.Tensor]]]: | |
if not self.use_dpo_data_collator: | |
warnings.warn( | |
"compute_loss is only implemented for DPODataCollatorWithPadding, and you passed a datacollator that is different than " | |
"DPODataCollatorWithPadding - you might see unexpected behavior. Alternatively, you can implement your own prediction_step method if you are using a custom data collator" | |
) | |
with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model: | |
import time | |
start_time = time.time() | |
# prefix = self.tokenizer(["<|user|> "]*inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda() | |
# suffix = self.tokenizer([" <|assistant|>"]*inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda() | |
# print(f"{prefix.shape=} {suffix.shape=} {inputs['prompt_input_ids'].shape=}") | |
# torch.hstack((prefix, inputs["prompt_input_ids"], suffix) | |
generations = unwrapped_model.generate(inputs["prompt_input_ids"], max_new_tokens=30, | |
do_sample=True, | |
temperature=0.2, | |
top_k=50, | |
top_p=0.95, | |
repetition_penalty=1.2, | |
eos_token_id=self.tokenizer.eos_token_id) | |
print(self.tokenizer.batch_decode(generations)) | |
generation_time = torch.tensor([time.time() - start_time]).to(self.accelerator.device) | |
Gather all gen_time and compute mean | |
generation_time_gather = self.accelerator.gather(generation_time) | |
print(f"{self.accelerator.process_index=} Win rate generation time: {generation_time_gather.mean().item():.2f}") | |
if self.accelerator.is_main_process: | |
print( | |
f"Win rate generation time: {generation_time_gather.mean().item():.2f} seconds for {len(generations)} generations" | |
) | |
compute_loss_context_manager = torch.cuda.amp.autocast if self._peft_has_been_casted_to_bf16 else nullcontext | |
with compute_loss_context_manager(): | |
loss, metrics = self.get_batch_loss_metrics(model, inputs, train_eval="train") | |
# Make sure to move the loss to the device the original accumulating loss is at back in the `Trainer` class: | |
loss = loss.to(self.args.device) | |
# force log the metrics | |
self.store_metrics(metrics, train_eval="train") | |
print(f"{loss=}") | |
if return_outputs: | |
return (loss, metrics) | |
return loss | |
def main():
    training_args = TrainingArguments(
        output_dir="scratch/dummy-model",
        per_device_train_batch_size=2,
        max_steps=50,
        remove_unused_columns=False,
        gradient_accumulation_steps=2,  # Runs fine with gradient_accumulation_steps=1
        learning_rate=5e-5,
        evaluation_strategy="steps",
        bf16=True,
    )

    # fmt: off
    dummy_dataset_dict = {
        "prompt": [
            "<|user|> hello, nice to meet you.<|endoftext|> <|assistant|> ",
            "<|user|> how are you<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
        ],
        "chosen": [
            "hi nice to meet you<|endoftext|>",
            "I am fine<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "Python<|endoftext|>",
            "Python<|endoftext|>",
            "$46 as of 10am EST<|endoftext|>",
            "46 as of 10am EST<|endoftext|>",
        ],
        "rejected": [
            "leave me alone<|endoftext|>",
            "I am not fine<|endoftext|>",
            "Whats it to you?<|endoftext|>",
            "I dont have a name<|endoftext|>",
            "Javascript<|endoftext|>",
            "C++<|endoftext|>",
            "what stock price?<|endoftext|>",
            "I don't understand what you mean by \"stock price\"<|endoftext|>",
        ],
    }
    dummy_dataset = Dataset.from_dict(dummy_dataset_dict)

    model_id = "HuggingFaceH4/pythia-70m-sft"
    model_revision = "v0.0"
    model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    ref_model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    tokenizer = AutoTokenizer.from_pretrained(model_id, revision=model_revision)
    tokenizer.pad_token_id = 1

    trainer = MyDPOTrainer(
        model=model,
        ref_model=ref_model,
        beta=0.1,
        loss_type="sigmoid",
        args=training_args,
        tokenizer=tokenizer,
        train_dataset=dummy_dataset,
        eval_dataset=dummy_dataset,
        precompute_ref_log_probs=False,
    )
    trainer.train()


if __name__ == "__main__":
    main()
trl/models/utils.py:
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union

from accelerate.utils import is_deepspeed_available
from transformers import PreTrainedModel, PreTrainedTokenizer

from .modeling_value_head import AutoModelForCausalLMWithValueHead, AutoModelForSeq2SeqLMWithValueHead

import itertools

SUPPORTED_ARCHITECTURES = (
    AutoModelForCausalLMWithValueHead,
    AutoModelForSeq2SeqLMWithValueHead,
)

if is_deepspeed_available():
    import deepspeed

if TYPE_CHECKING:
    from accelerate import Accelerator
    from deepspeed.runtime.engine import DeepSpeedEngine
    from torch.nn.parallel.distributed import DistributedDataParallel

    from .modeling_base import PreTrainedModelWrapper


# TODO: Add Abstract Base Class if more formats are added
@dataclass
class ChatMlSpecialTokens:
    """Dataclass for special tokens used in ChatML, including system, user, assistant, bos, eos, and pad tokens."""

    bos_token: str = "<|im_start|>"
    eos_token: str = "<|im_end|>"
    pad_token: str = "<|im_end|>"

    @property
    def system(self):
        return f"{self.bos_token}system"

    @property
    def user(self):
        return f"{self.bos_token}user"

    @property
    def assistant(self):
        return f"{self.bos_token}assistant"

    @property
    def chat_template(self):
        return (
            "{% for message in messages %}"
            f"{{{{'{self.bos_token}' + message['role'] + '\n' + message['content'] + '{self.eos_token}' + '\n'}}}}"
            "{% endfor %}"
            "{% if add_generation_prompt %}"
            f"{{{{ '{self.assistant}\n' }}}}"
            "{% endif %}"
        )
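
# For illustration (not part of the original file): applying the template above to
# [{"role": "user", "content": "Hi"}] with add_generation_prompt=True renders roughly:
#
#   <|im_start|>user
#   Hi<|im_end|>
#   <|im_start|>assistant
#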
FORMAT_MAPPING = {"chatml": ChatMlSpecialTokens}


def setup_chat_format(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    format: Optional[Literal["chatml"]] = "chatml",
    resize_to_multiple_of: Optional[int] = None,
) -> Tuple[PreTrainedModel, PreTrainedTokenizer]:
    """
    Set up the chat format by adding special tokens to the tokenizer, setting the correct format, and extending the embedding layer of the model based on the new special tokens.

    Args:
        model (`~transformers.PreTrainedModel`): The model to be modified.
        tokenizer (`~transformers.PreTrainedTokenizer`): The tokenizer to be modified.
        format (`Optional[Literal["chatml"]]`): The format to be set. Defaults to "chatml".
        resize_to_multiple_of (`Optional[int]`): Number to resize the embedding layer to. Defaults to None.

    Returns:
        model (`~transformers.PreTrainedModel`): The modified model.
        tokenizer (`~transformers.PreTrainedTokenizer`): The modified tokenizer.
    """
    # check if format is available and retrieve it
    if format not in FORMAT_MAPPING:
        raise ValueError(f"Format {format} not available. Please use one of {FORMAT_MAPPING.keys()}")

    chat_format = FORMAT_MAPPING[format]()

    # set the special tokens and add them to the tokenizer
    tokenizer.eos_token = chat_format.eos_token
    tokenizer.pad_token = chat_format.pad_token
    tokenizer.bos_token = chat_format.bos_token
    tokenizer.add_special_tokens({"additional_special_tokens": [chat_format.bos_token, chat_format.eos_token]})
    # set chat format for tokenizer
    tokenizer.chat_template = chat_format.chat_template

    # resize embedding layer to a multiple of 64, https://x.com/karpathy/status/1621578354024677377
    model.resize_token_embeddings(
        len(tokenizer), pad_to_multiple_of=resize_to_multiple_of if resize_to_multiple_of is not None else None
    )
    # Make sure to update the generation config to use the new eos & bos token
    if getattr(model, "generation_config", None) is not None:
        model.generation_config.bos_token_id = tokenizer.bos_token_id
        model.generation_config.eos_token_id = tokenizer.eos_token_id
        model.generation_config.pad_token_id = tokenizer.pad_token_id

    return model, tokenizer
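
# Example usage sketch (added for illustration; the checkpoint name is a placeholder):
#
#   from transformers import AutoModelForCausalLM, AutoTokenizer
#
#   model = AutoModelForCausalLM.from_pretrained("some-base-model")
#   tokenizer = AutoTokenizer.from_pretrained("some-base-model")
#   model, tokenizer = setup_chat_format(model, tokenizer, format="chatml", resize_to_multiple_of=64)
#   # tokenizer now carries the ChatML special tokens and chat_template, and the model's
#   # embedding matrix is resized to a multiple of 64.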
def remove_hooks(model: "DeepSpeedEngine") -> None:
    """Removes the optimizer hooks from a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer

    for param in iter_params(optimizer_offload.module, recurse=True):
        param.ds_active_sub_modules.clear()

    for hook in optimizer_offload.forward_hooks:
        hook.remove()
    for hook in optimizer_offload.backward_hooks:
        hook.remove()

    optimizer_offload.forward_hooks = []
    optimizer_offload.backward_hooks = []


def get_all_parameters(sub_module, recurse=False):
    return itertools.chain(sub_module.named_parameters(recurse=recurse), sub_module.ds_external_parameters())


def iter_params(module, recurse=False):
    return map(lambda pair: pair[1], get_all_parameters(module, recurse))


def add_hooks(model: "DeepSpeedEngine") -> None:
    """Adds the optimizer hooks from a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer
    optimizer_offload._register_hooks_recursively(optimizer_offload.module)


@contextmanager
def unwrap_model_for_generation(
    model: Union["DistributedDataParallel", "DeepSpeedEngine"], accelerator: "Accelerator", is_peft_model: bool = False
) -> Union["PreTrainedModelWrapper", "DeepSpeedEngine"]:
    """Context manager to unwrap a model for generation.

    For ZeRO-3 models, we gather the weights once to speed up generation.
    """
    unwrapped_model = accelerator.unwrap_model(model)
    if is_peft_model:
        unwrapped_model.pretrained_model.disable_adapter()
    if accelerator.state.deepspeed_plugin is not None and accelerator.state.deepspeed_plugin.zero_stage == 3:
        with deepspeed.zero.GatheredParameters(model.parameters()):
            remove_hooks(model)
            yield model
            add_hooks(model)
    else:
        yield unwrapped_model
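
# Usage sketch (added for illustration, mirroring the call in debug_text_gen_dpo.py above;
# `model` is the Accelerator/DeepSpeed-wrapped training model and `prompt_input_ids` a batch
# of prompt token ids):
#
#   with unwrap_model_for_generation(model, accelerator) as unwrapped_model:
#       generations = unwrapped_model.generate(prompt_input_ids, max_new_tokens=30)
#
# Under ZeRO-3 the full parameters are gathered once for the whole `with` block, the DeepSpeed
# forward/backward hooks are removed while generating, and on exit the hooks are re-registered
# and the parameters are re-partitioned.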