@pacman100
Created May 2, 2024 10:53
"""
1. First clone trl and check out the debug branch:
git clone https://github.com/huggingface/trl.git
cd trl
git checkout debug-dpo
2. Install deps with:
make dev
Then install latest versions of transformers / accelerate / deepspeed
pip install transformers==4.39.1 accelerate==0.28.0 deepspeed==0.14.0
See examples/scripts/requirements.txt for exact versions.
3. Run with:
TRANSFORMERS_VERBOSITY=info ACCELERATE_LOG_LEVEL=info accelerate launch --config_file=examples/accelerate_configs/deepspeed_zero3.yaml examples/scripts/debug_text_gen_dpo.py
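For reference, gradient accumulation is configured in two places: the `gradient_accumulation_steps`
argument of `TrainingArguments` in this script and the accelerate/DeepSpeed YAML passed to
`accelerate launch`. A rough sketch of the relevant portion of
`examples/accelerate_configs/deepspeed_zero3.yaml` (the exact file in the repo may differ):

    deepspeed_config:
      gradient_accumulation_steps: 2  # keep in sync with TrainingArguments
      zero3_init_flag: true
      zero_stage: 3
    distributed_type: DEEPSPEED
    mixed_precision: bf16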
If you set `gradient_accumulation_steps=1` in both the `TrainingArguments` and the `examples/accelerate_configs/deepspeed_zero3.yaml` config, it runs fine. But with `gradient_accumulation_steps=2` it fails with the following error:
Traceback (most recent call last):
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 141, in <module>
    main()
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 137, in main
    trainer.train()
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1624, in train
    return inner_training_loop(
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 1961, in _inner_training_loop
    tr_loss_step = self.training_step(model, inputs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/transformers/trainer.py", line 2902, in training_step
    loss = self.compute_loss(model, inputs)
  File "/fsx/lewis/git/hf/trl/examples/scripts/debug_text_gen_dpo.py", line 43, in compute_loss
    with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File "/fsx/lewis/git/hf/trl/trl/models/utils.py", line 146, in unwrap_model_for_generation
    with deepspeed.zero.GatheredParameters(model.parameters()):
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 2177, in __exit__
    self.params[0].partition(param_list=self.params, has_been_updated=False)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1325, in partition
    self._partition(param_list, has_been_updated=has_been_updated)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1474, in _partition
    self._partition_param(param, has_been_updated=has_been_updated)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 1507, in _partition_param
    free_param(param)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 15, in wrapped_fn
    ret_val = func(*args, **kwargs)
  File "/fsx/lewis/miniconda3/envs/trl/lib/python3.10/site-packages/deepspeed/runtime/zero/partition_parameters.py", line 279, in free_param
    assert not param.ds_active_sub_modules, param.ds_summary()
AssertionError: {'id': 0, 'status': 'AVAILABLE', 'numel': 25755648, 'ds_numel': 25755648, 'shape': (50304, 512), 'ds_shape': (50304, 512), 'requires_grad': True, 'grad_shape': None, 'persist': False, 'active_sub_modules': {182}, 'ds_tensor.shape': torch.Size([3219456])}
"""
import warnings
from contextlib import nullcontext
from typing import Any, Dict, Tuple, Union
import torch
import torch.nn as nn
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    PreTrainedModel,
    TrainingArguments,
)
from trl import DPOTrainer
from trl.models.utils import unwrap_model_for_generation


class MyDPOTrainer(DPOTrainer):
    def compute_loss(
        self,
        model: Union[PreTrainedModel, nn.Module],
        inputs: Dict[str, Union[torch.Tensor, Any]],
        return_outputs=False,
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict[str, torch.Tensor]]]:
        if not self.use_dpo_data_collator:
            warnings.warn(
                "compute_loss is only implemented for DPODataCollatorWithPadding, and you passed a datacollator that is different than "
                "DPODataCollatorWithPadding - you might see unexpected behavior. Alternatively, you can implement your own prediction_step method if you are using a custom data collator"
            )

        with unwrap_model_for_generation(model, self.accelerator) as unwrapped_model:
            import time

            start_time = time.time()
            # prefix = self.tokenizer(["<|user|> "]*inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda()
            # suffix = self.tokenizer([" <|assistant|>"]*inputs["prompt_input_ids"].shape[0], return_tensors="pt").input_ids.cuda()
            # print(f"{prefix.shape=} {suffix.shape=} {inputs['prompt_input_ids'].shape=}")
            # torch.hstack((prefix, inputs["prompt_input_ids"], suffix))
            generations = unwrapped_model.generate(
                inputs["prompt_input_ids"],
                max_new_tokens=30,
                do_sample=True,
                temperature=0.2,
                top_k=50,
                top_p=0.95,
                repetition_penalty=1.2,
                eos_token_id=self.tokenizer.eos_token_id,
            )
            print(self.tokenizer.batch_decode(generations))
            generation_time = torch.tensor([time.time() - start_time]).to(self.accelerator.device)
            # Gather all gen_time and compute mean
            generation_time_gather = self.accelerator.gather(generation_time)
            print(f"{self.accelerator.process_index=} Win rate generation time: {generation_time_gather.mean().item():.2f}")
            if self.accelerator.is_main_process:
                print(
                    f"Win rate generation time: {generation_time_gather.mean().item():.2f} seconds for {len(generations)} generations"
                )

        compute_loss_context_manager = torch.cuda.amp.autocast if self._peft_has_been_casted_to_bf16 else nullcontext

        with compute_loss_context_manager():
            loss, metrics = self.get_batch_loss_metrics(model, inputs, train_eval="train")

        # Make sure to move the loss to the device the original accumulating loss is at back in the `Trainer` class:
        loss = loss.to(self.args.device)

        # force log the metrics
        self.store_metrics(metrics, train_eval="train")
        print(f"{loss=}")

        if return_outputs:
            return (loss, metrics)
        return loss


def main():
    training_args = TrainingArguments(
        output_dir="scratch/dummy-model",
        per_device_train_batch_size=2,
        max_steps=50,
        remove_unused_columns=False,
        gradient_accumulation_steps=2,  # Runs fine with gradient_accumulation_steps=1
        learning_rate=5e-5,
        evaluation_strategy="steps",
        bf16=True,
    )

    # fmt: off
    dummy_dataset_dict = {
        "prompt": [
            "<|user|> hello, nice to meet you.<|endoftext|> <|assistant|> ",
            "<|user|> how are you<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> What is your name?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> Which is the best programming language?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
            "<|user|> How is the stock price?<|endoftext|> <|assistant|> ",
        ],
        "chosen": [
            "hi nice to meet you<|endoftext|>",
            "I am fine<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "My name is Mary<|endoftext|>",
            "Python<|endoftext|>",
            "Python<|endoftext|>",
            "$46 as of 10am EST<|endoftext|>",
            "46 as of 10am EST<|endoftext|>",
        ],
        "rejected": [
            "leave me alone<|endoftext|>",
            "I am not fine<|endoftext|>",
            "Whats it to you?<|endoftext|>",
            "I dont have a name<|endoftext|>",
            "Javascript<|endoftext|>",
            "C++<|endoftext|>",
            "what stock price?<|endoftext|>",
            "I don't understand what you mean by \"stock price\"<|endoftext|>",
        ],
    }

    dummy_dataset = Dataset.from_dict(dummy_dataset_dict)

    model_id = "HuggingFaceH4/pythia-70m-sft"
    model_revision = "v0.0"

    model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    ref_model = AutoModelForCausalLM.from_pretrained(model_id, revision=model_revision)
    tokenizer = AutoTokenizer.from_pretrained(model_id, revision=model_revision)
    tokenizer.pad_token_id = 1

    trainer = MyDPOTrainer(
        model=model,
        ref_model=ref_model,
        beta=0.1,
        loss_type="sigmoid",
        args=training_args,
        tokenizer=tokenizer,
        train_dataset=dummy_dataset,
        eval_dataset=dummy_dataset,
        precompute_ref_log_probs=False,
    )
    trainer.train()


if __name__ == "__main__":
    main()
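

# trl/models/utils.py (the helper module referenced in the traceback above)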
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
from accelerate.utils import is_deepspeed_available
from transformers import PreTrainedModel, PreTrainedTokenizer
from .modeling_value_head import AutoModelForCausalLMWithValueHead, AutoModelForSeq2SeqLMWithValueHead
import itertools
SUPPORTED_ARCHITECTURES = (
    AutoModelForCausalLMWithValueHead,
    AutoModelForSeq2SeqLMWithValueHead,
)
if is_deepspeed_available():
    import deepspeed

if TYPE_CHECKING:
    from accelerate import Accelerator
    from deepspeed.runtime.engine import DeepSpeedEngine
    from torch.nn.parallel.distributed import DistributedDataParallel

    from .modeling_base import PreTrainedModelWrapper
# TODO: Add Abstract Base Class if more formats are added
@dataclass
class ChatMlSpecialTokens:
    """Dataclass for special tokens used in ChatML, including system, user, assistant, bos, eos, and pad tokens."""

    bos_token: str = "<|im_start|>"
    eos_token: str = "<|im_end|>"
    pad_token: str = "<|im_end|>"

    @property
    def system(self):
        return f"{self.bos_token}system"

    @property
    def user(self):
        return f"{self.bos_token}user"

    @property
    def assistant(self):
        return f"{self.bos_token}assistant"

    @property
    def chat_template(self):
        return (
            "{% for message in messages %}"
            f"{{{{'{self.bos_token}' + message['role'] + '\n' + message['content'] + '{self.eos_token}' + '\n'}}}}"
            "{% endfor %}"
            "{% if add_generation_prompt %}"
            f"{{{{ '{self.assistant}\n' }}}}"
            "{% endif %}"
        )
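
# Illustration (not from the original file): rendering the chat_template above on
# [{"role": "user", "content": "hi"}] with add_generation_prompt=True produces roughly:
#   <|im_start|>user
#   hi<|im_end|>
#   <|im_start|>assistant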
FORMAT_MAPPING = {"chatml": ChatMlSpecialTokens}


def setup_chat_format(
    model: PreTrainedModel,
    tokenizer: PreTrainedTokenizer,
    format: Optional[Literal["chatml"]] = "chatml",
    resize_to_multiple_of: Optional[int] = None,
) -> Tuple[PreTrainedModel, PreTrainedTokenizer]:
    """
    Setup chat format by adding special tokens to the tokenizer, setting the correct format, and extending the embedding layer of the model based on the new special tokens.

    Args:
        model (`~transformers.PreTrainedModel`): The model to be modified.
        tokenizer (`~transformers.PreTrainedTokenizer`): The tokenizer to be modified.
        format (`Optional[Literal["chatml"]]`): The format to be set. Defaults to "chatml".
        resize_to_multiple_of (`Optional[int]`): Number to resize the embedding layer to. Defaults to None.

    Returns:
        model (`~transformers.PreTrainedModel`): The modified model.
        tokenizer (`~transformers.PreTrainedTokenizer`): The modified tokenizer.
    """
    # check if format available and retrieve
    if format not in FORMAT_MAPPING:
        raise ValueError(f"Format {format} not available. Please use one of {FORMAT_MAPPING.keys()}")

    chat_format = FORMAT_MAPPING[format]()

    # set special tokens in the tokenizer
    tokenizer.eos_token = chat_format.eos_token
    tokenizer.pad_token = chat_format.pad_token
    tokenizer.bos_token = chat_format.bos_token
    tokenizer.add_special_tokens({"additional_special_tokens": [chat_format.bos_token, chat_format.eos_token]})
    # set chat format for tokenizer
    tokenizer.chat_template = chat_format.chat_template

    # resize embedding layer to a multiple of 64, https://x.com/karpathy/status/1621578354024677377
    model.resize_token_embeddings(
        len(tokenizer), pad_to_multiple_of=resize_to_multiple_of if resize_to_multiple_of is not None else None
    )
    # Make sure to update the generation config to use the new eos & bos token
    if getattr(model, "generation_config", None) is not None:
        model.generation_config.bos_token_id = tokenizer.bos_token_id
        model.generation_config.eos_token_id = tokenizer.eos_token_id
        model.generation_config.pad_token_id = tokenizer.pad_token_id
    return model, tokenizer
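
# Hypothetical usage sketch (the model name is a placeholder, not part of the original module):
#
#   from transformers import AutoModelForCausalLM, AutoTokenizer
#
#   model = AutoModelForCausalLM.from_pretrained("my-base-model")
#   tokenizer = AutoTokenizer.from_pretrained("my-base-model")
#   model, tokenizer = setup_chat_format(model, tokenizer, format="chatml", resize_to_multiple_of=64)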


def remove_hooks(model: "DeepSpeedEngine") -> None:
    """Removes the optimizer hooks from a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer

    for param in iter_params(optimizer_offload.module, recurse=True):
        param.ds_active_sub_modules.clear()

    for hook in optimizer_offload.forward_hooks:
        hook.remove()
    for hook in optimizer_offload.backward_hooks:
        hook.remove()

    optimizer_offload.forward_hooks = []
    optimizer_offload.backward_hooks = []


def get_all_parameters(sub_module, recurse=False):
    return itertools.chain(sub_module.named_parameters(recurse=recurse), sub_module.ds_external_parameters())


def iter_params(module, recurse=False):
    return map(lambda pair: pair[1], get_all_parameters(module, recurse))


def add_hooks(model: "DeepSpeedEngine") -> None:
    """Adds the optimizer hooks from a DeepSpeed ZeRO-3 model."""
    if model.optimizer is not None and hasattr(model.optimizer, "parameter_offload"):
        optimizer_offload = model.optimizer.parameter_offload
    elif model.optimizer is not None:
        optimizer_offload = model.optimizer
    optimizer_offload._register_hooks_recursively(optimizer_offload.module)


@contextmanager
def unwrap_model_for_generation(
    model: Union["DistributedDataParallel", "DeepSpeedEngine"], accelerator: "Accelerator", is_peft_model: bool = False
) -> Union["PreTrainedModelWrapper", "DeepSpeedEngine"]:
    """Context manager to unwrap a model for generation.

    For ZeRO-3 models, we gather the weights once to speed up generation.
    """
    unwrapped_model = accelerator.unwrap_model(model)
    if is_peft_model:
        unwrapped_model.pretrained_model.disable_adapter()
    if accelerator.state.deepspeed_plugin is not None and accelerator.state.deepspeed_plugin.zero_stage == 3:
        with deepspeed.zero.GatheredParameters(model.parameters()):
            remove_hooks(model)
            yield model
            add_hooks(model)
    else:
        yield unwrapped_model
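
# Usage sketch (mirrors the compute_loss override in the debug script above; `accelerator` and
# `input_ids` are assumed to be defined by the caller):
#
#   with unwrap_model_for_generation(model, accelerator) as unwrapped_model:
#       generations = unwrapped_model.generate(input_ids, max_new_tokens=30)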