import argparse
import os
import random
import socket

from datasets import Dataset
from datatrove.pipeline.readers import JsonlReader, ParquetReader
from tqdm import tqdm
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams

from data_gen.configs import GentaskConfig
from utils.data_utils import get_adapter_func, split_into_batches
from utils.df_utils import execute_meta_operations
from utils.utils import print_colored_text, truncate_text

random.seed(42)


def find_free_port():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
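# NOTE: binding to port 0 lets the OS pick any unused port; the socket is
# closed on exiting the `with` block, so there is a small race window before
# Ray re-binds the port. Fine in practice for launching local instances.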
""" | |
TOTAL_SPLIT=8 CUDA_VISIBLE_DEVICES=0 python -m rw_npython_apply_vllm > ./logging/npython_codegen_rw_0.log | |
TOTAL_SPLIT=1 CUDA_VISIBLE_DEVICES=0 python -m rw_npython_apply_vllm > ./logging/npython_codegen_rw_0.log | |
""" | |
# env constants for multi-GPU & multi-node sharding
NODE_GPUS = int(os.environ.get("NODE_GPUS", 8))  # GPUs per node
NODE_RANK = int(os.environ.get("NODE_RANK", 0))  # index of this node
CUDA_DEVICE = int(os.environ["CUDA_VISIBLE_DEVICES"])  # local GPU index (one device per process)
TOTAL_SPLIT = int(os.environ["TOTAL_SPLIT"])  # total number of worker processes
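# Each process owns one input shard: its global rank is
# CUDA_DEVICE + NODE_RANK * NODE_GPUS, out of TOTAL_SPLIT shards overall.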
def normalize_text(text: str, tokenizer, max_token=2000) -> str:
    """Prefix each line with a zero-padded line number, truncating once the
    running token count would exceed `max_token`."""
    text_lines = []
    max_digits = 3
    cur_tokens = 0
    for idx_line, line in enumerate(text.split("\n")):
        normalized_line = f"[{idx_line:0{max_digits}d}]{line}"
        line_tokens = len(tokenizer.encode(normalized_line))
        if line_tokens + cur_tokens > max_token:
            break
        text_lines.append(normalized_line)
        cur_tokens += line_tokens
    return "\n".join(text_lines)
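# Example: normalize_text("foo\nbar", tokenizer) -> "[000]foo\n[001]bar"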
def main(args):
    # load config
    config = GentaskConfig().from_yaml(args.config_path)
    tokenizer = AutoTokenizer.from_pretrained(config.model_path)
    tokenizer.model_max_length = 2048
    # point Ray at a free local port so multiple instances on one node don't collide
    free_port = find_free_port()
    os.environ["RAY_ADDRESS"] = f"localhost:{free_port}"
    adapter_func = get_adapter_func("openwebmath", line_num_limit=None)
    # prepare data
    if args.data_type == "parquet":
        data_reader = ParquetReader(
            data_folder=config.data_path,
            progress=True,
            adapter=adapter_func,
            batch_size=args.batch_size,
            limit=args.limit,
        )
    elif args.data_type == "jsonl":
        data_reader = JsonlReader(
            data_folder=config.data_path,
            adapter=adapter_func,
            progress=True,
            limit=args.limit,
        )
    else:
        raise ValueError(f"Unsupported data_type: {args.data_type}")
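    # rank/world_size let the datatrove reader hand this process only its own
    # shard of the input files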
    arguments = []
    global_rank = CUDA_DEVICE + NODE_RANK * NODE_GPUS
    for doc in data_reader.run(rank=global_rank, world_size=TOTAL_SPLIT):
        arguments.append({"text": doc.text})
    print_colored_text(arguments[0], "green")  # sanity-check the first document
    # start job
    dir_path = os.path.join(
        config.save_path,
        f"{config.save_name}_{global_rank}",
    )
    os.makedirs(dir_path, exist_ok=True)
    base_name = config.save_name
    batches = split_into_batches(arguments, config.save_interval)
    tp_size = 1  # tensor parallelism: one engine per GPU process
    # temperature=0.0 means greedy decoding (top_p then has no effect)
    sampling_params = SamplingParams(temperature=0.0, top_p=0.9, max_tokens=config.max_tokens)
    engine = LLM(
        model=config.model_path,
        gpu_memory_utilization=0.9,
        tensor_parallel_size=tp_size,
        max_model_len=2048,
    )
print(f"Total batches: {len(batches)}") | |
for i, batch in enumerate(tqdm(batches, total = len(batches), unit="batches")): | |
# suppose we only need single backend | |
rets = [] | |
# for idx, sample in enumerate(tqdm(batch, total=len(batch), unit="tokenizing")): | |
for idx, sample in enumerate(batch): | |
if args.input_mode == "truncate": | |
user_msg = truncate_text(sample["text"], tokenizer, max_token=1900) | |
else: | |
user_msg = normalize_text(sample["text"], tokenizer, max_token=1700) | |
total_msg = tokenizer.apply_chat_template( | |
[ | |
{ | |
"role": "system", | |
"content": "You are a helpful, respectful and honest assistant.", | |
}, | |
{"role": "user", "content": user_msg}, | |
], | |
add_generation_prompt=True, | |
tokenize=False, | |
# truncation=True | |
) | |
rets.append(total_msg) | |
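        # rets now holds fully formatted prompt strings (tokenize=False returns
        # text, not token ids); vLLM tokenizes them itself inside generate()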
        outputs = engine.generate(rets, sampling_params)
        outputs = [item.outputs[0].text.strip(" ") for item in outputs]
        # pair each source document with its generated DF program and the
        # result of executing that program over the raw text
        rets = [
            {
                "raw_content": sample["text"],
                "text": execute_meta_operations(sample["text"], output),
                "metadata": {"df_program": output},
            }
            for sample, output in zip(batch, outputs)
        ]
        intermediate_ds = Dataset.from_list(rets)
        out_path = (
            os.path.join(
                dir_path,
                f"{base_name}_{i + 1}_{(len(arguments) - 1) // config.save_interval + 1}.parquet",
            )
            if config.save_interval is not None
            else os.path.join(dir_path, f"{base_name}.parquet")
        )
        intermediate_ds.to_parquet(out_path)
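        # each batch is flushed to its own parquet shard, so a crash loses at
        # most the batch currently in flight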
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config_path",
        type=str,
        default="data_gen/configs/drop_gen/tinylm_apply.yaml",
    )
    parser.add_argument(
        "--data_type",
        type=str,
        default="parquet",
        choices=["parquet", "jsonl"],
    )
    parser.add_argument("--batch_size", type=int, default=1000000)
    parser.add_argument("--limit", type=int, default=-1, help="Limit the number of samples to process, for debugging.")
    parser.add_argument("--input_mode", type=str, default="truncate", choices=["truncate", "normalize"])
    args = parser.parse_args()
    main(args)
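
A minimal sketch (not part of the gist; the glob pattern is assumed from the save layout above) for loading the per-GPU output shards back into one dataset:

from datasets import load_dataset

# every worker writes to <save_path>/<save_name>_<global_rank>/*.parquet,
# so a single glob collects all shards at once
merged = load_dataset(
    "parquet",
    data_files="save_path/save_name_*/*.parquet",  # substitute the real paths
    split="train",
)
print(merged.column_names)  # ['raw_content', 'text', 'metadata']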