# Gist by @SinclairCoder, created September 22, 2024.
import argparse
import os
import random
import socket

from datasets import Dataset
from datatrove.pipeline.readers import JsonlReader, ParquetReader
from tqdm import tqdm
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams

from data_gen.configs import GentaskConfig
from utils.data_utils import get_adapter_func, split_into_batches
from utils.df_utils import execute_meta_operations
from utils.utils import print_colored_text, truncate_text

random.seed(42)
def find_free_port():
    """Bind to port 0 so the OS assigns an unused port, then return it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
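# Note: the port is only guaranteed free at bind time; there is a small race
# window between closing this probe socket and Ray actually binding the port.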
"""
TOTAL_SPLIT=8 CUDA_VISIBLE_DEVICES=0 python -m rw_npython_apply_vllm > ./logging/npython_codegen_rw_0.log
TOTAL_SPLIT=1 CUDA_VISIBLE_DEVICES=0 python -m rw_npython_apply_vllm > ./logging/npython_codegen_rw_0.log
"""
# Environment constants for multi-GPU & multi-node sharding.
NODE_GPUS = int(os.environ.get("NODE_GPUS", 8))
NODE_RANK = int(os.environ.get("NODE_RANK", 0))
CUDA_DEVICE = int(os.environ["CUDA_VISIBLE_DEVICES"])  # assumes a single device id, e.g. "3"
TOTAL_SPLIT = int(os.environ["TOTAL_SPLIT"])
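# Illustration (not from the original gist): each process owns shard
# rank = CUDA_DEVICE + NODE_RANK * NODE_GPUS out of TOTAL_SPLIT shards.
# E.g. NODE_GPUS=8, NODE_RANK=1, CUDA_VISIBLE_DEVICES=3, TOTAL_SPLIT=16
# puts this process on shard 3 + 1 * 8 = 11 of 16.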
def normalize_text(text: str, tokenizer, max_token=2000) -> str:
    """Prefix each line with a zero-padded line index, stopping once the
    cumulative token count would exceed max_token."""
    text_lines = []
    max_digits = 3
    cur_tokens = 0
    for idx_line, line in enumerate(text.split("\n")):
        numbered_line = f"[{idx_line:0{max_digits}d}]{line}"
        line_tokens = len(tokenizer.encode(numbered_line))
        if line_tokens + cur_tokens > max_token:
            break
        text_lines.append(numbered_line)
        cur_tokens += line_tokens
    return "\n".join(text_lines)
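# Illustrative output of normalize_text("first\nsecond", tokenizer):
#   [000]first
#   [001]second
# Numbering lines presumably lets the generated program address lines by index.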
def main(args):
    # Load config and tokenizer.
    config = GentaskConfig().from_yaml(args.config_path)
    tokenizer = AutoTokenizer.from_pretrained(config.model_path)
    tokenizer.model_max_length = 2048

    # Pick a free port for Ray so concurrent jobs on the same node don't collide.
    free_port = find_free_port()
    os.environ["RAY_ADDRESS"] = f"localhost:{free_port}"

    adapter_func = get_adapter_func("openwebmath", line_num_limit=None)
    # Prepare the data reader.
    if args.data_type == "parquet":
        data_reader = ParquetReader(
            data_folder=config.data_path,
            progress=True,
            adapter=adapter_func,
            batch_size=args.batch_size,
            limit=args.limit,
        )
    elif args.data_type == "jsonl":
        data_reader = JsonlReader(
            data_folder=config.data_path,
            adapter=adapter_func,
            progress=True,
            limit=args.limit,
        )
    else:
        raise ValueError(f"Unsupported data_type: {args.data_type}")
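    # In datatrove, run(rank, world_size) makes the reader yield only this
    # rank's share of the input files, so each document is processed by
    # exactly one process; no modulo filtering over a global index is needed.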
    arguments = []
    rank = CUDA_DEVICE + NODE_RANK * NODE_GPUS
    for doc in data_reader.run(rank=rank, world_size=TOTAL_SPLIT):
        arguments.append({"text": doc.text})
    print_colored_text(arguments[0], "green")  # sanity-check the first document
    # Output directory is suffixed with this process's shard rank.
    dir_path = os.path.join(config.save_path, f"{config.save_name}_{rank}")
    os.makedirs(dir_path, exist_ok=True)
    base_name = config.save_name
    batches = split_into_batches(arguments, config.save_interval)
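    # Assuming split_into_batches chunks the list into groups of
    # config.save_interval documents: e.g. 2500 documents with
    # save_interval=1000 give batches of 1000, 1000, and 500, each
    # written out as its own parquet part below.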
    tp_size = 1  # tensor parallelism degree
    # Greedy decoding: with temperature=0.0, top_p has no effect.
    sampling_params = SamplingParams(temperature=0.0, top_p=0.9, max_tokens=config.max_tokens)
    engine = LLM(
        model=config.model_path,
        gpu_memory_utilization=0.9,
        tensor_parallel_size=tp_size,
        max_model_len=2048,
    )
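    # max_model_len=2048 bounds prompt + generation; the 1900/1700-token input
    # budgets below leave headroom for the chat template and the model's output.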
print(f"Total batches: {len(batches)}")
for i, batch in enumerate(tqdm(batches, total = len(batches), unit="batches")):
# suppose we only need single backend
rets = []
# for idx, sample in enumerate(tqdm(batch, total=len(batch), unit="tokenizing")):
for idx, sample in enumerate(batch):
if args.input_mode == "truncate":
user_msg = truncate_text(sample["text"], tokenizer, max_token=1900)
else:
user_msg = normalize_text(sample["text"], tokenizer, max_token=1700)
total_msg = tokenizer.apply_chat_template(
[
{
"role": "system",
"content": "You are a helpful, respectful and honest assistant.",
},
{"role": "user", "content": user_msg},
],
add_generation_prompt=True,
tokenize=False,
# truncation=True
)
rets.append(total_msg)
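        # For reference, with a ChatML-style template the rendered prompt looks
        # roughly like this (exact markers depend on the model's chat template):
        #   <|im_start|>system
        #   You are a helpful, respectful and honest assistant.<|im_end|>
        #   <|im_start|>user
        #   {user_msg}<|im_end|>
        #   <|im_start|>assistant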
        outputs = engine.generate(rets, sampling_params)
        outputs = [item.outputs[0].text.strip(" ") for item in outputs]
        rets = [
            {
                "raw_content": sample["text"],
                "text": execute_meta_operations(sample["text"], output),
                "metadata": {"df_program": output},
            }
            for sample, output in zip(batch, outputs)
        ]
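        # Presumably execute_meta_operations replays the generated program
        # against the raw document to produce the transformed text, while
        # metadata["df_program"] keeps the raw model output for provenance.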
        intermediate_ds = Dataset.from_list(rets)
        # File name encodes "part {i + 1} of {total parts}" when saving in chunks.
        out_path = (
            os.path.join(
                dir_path,
                f"{base_name}_{i + 1}_{(len(arguments) - 1) // config.save_interval + 1}.parquet",
            )
            if config.save_interval is not None
            else os.path.join(dir_path, f"{base_name}.parquet")
        )
        intermediate_ds.to_parquet(out_path)
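        # Illustrative layout for rank 0 with three parts:
        #   {save_path}/{save_name}_0/{save_name}_1_3.parquet
        #   {save_path}/{save_name}_0/{save_name}_2_3.parquet
        #   {save_path}/{save_name}_0/{save_name}_3_3.parquet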
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config_path",
        type=str,
        default="data_gen/configs/drop_gen/tinylm_apply.yaml",
    )
    parser.add_argument(
        "--data_type",
        type=str,
        default="parquet",
        choices=["parquet", "jsonl"],
    )
    parser.add_argument("--batch_size", type=int, default=1000000)
    parser.add_argument(
        "--limit",
        type=int,
        default=-1,
        help="Limit the number of samples to process, for debugging.",
    )
    parser.add_argument("--input_mode", type=str, default="truncate", choices=["truncate", "normalize"])
    args = parser.parse_args()
    main(args)
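# Example invocation with explicit flags (values are placeholders):
#   TOTAL_SPLIT=8 CUDA_VISIBLE_DEVICES=0 python -m rw_npython_apply_vllm \
#       --config_path data_gen/configs/drop_gen/tinylm_apply.yaml \
#       --data_type jsonl --limit 100 --input_mode normalize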