Full fine-tuning of Qwen3.5-4B
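"""Full-parameter supervised fine-tuning (SFT) of Qwen/Qwen3.5-4B on the
celsowm/legal_br_sft dataset with TRL's SFTTrainer, pinned to a single GPU."""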
import os
import math
import json

# GPU selection and allocator settings must be set before torch is imported.
os.environ.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID")
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "7")
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

import torch
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, set_seed
from trl import SFTConfig, SFTTrainer
MODEL_NAME = "Qwen/Qwen3.5-4B"
DATASET_NAME = "celsowm/legal_br_sft"
OUTPUT_DIR = "/home/fontesc/qwen35-4b-legal-br-fullft"

SEED = 42
MAX_LENGTH = 1024
VAL_SIZE = 0.05
TEST_SIZE = 0.05

NUM_TRAIN_EPOCHS = 1
LEARNING_RATE = 1e-5
WEIGHT_DECAY = 0.1
WARMUP_RATIO = 0.03
PER_DEVICE_TRAIN_BATCH_SIZE = 4
PER_DEVICE_EVAL_BATCH_SIZE = 4
GRADIENT_ACCUMULATION_STEPS = 4

LOGGING_STEPS = 10
EVAL_STEPS = 100
SAVE_STEPS = 100
SAVE_TOTAL_LIMIT = 2

OPTIMIZER = "adamw_torch_fused"
ATTN_IMPLEMENTATION = "flash_attention_2"
REPORT_TO = "none"
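
# Effective batch size per optimizer step on the single visible GPU:
# PER_DEVICE_TRAIN_BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS = 4 * 4 = 16 sequences.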
def flatten_content(content):
    """Flatten a message 'content' field (string or list of parts) into plain text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict):
                if item.get("type") == "text":
                    parts.append(item.get("text", ""))
                elif "text" in item:
                    parts.append(item["text"])
                elif "content" in item:
                    parts.append(item["content"])
        return "".join(parts)
    return str(content)
def normalize_messages(messages):
    return [
        {
            "role": str(m["role"]),
            "content": flatten_content(m.get("content", "")),
        }
        for m in messages
    ]


def to_messages_only(example):
    return {"messages": normalize_messages(example["messages"])}
def count_parameters(model):
    total = sum(p.numel() for p in model.parameters())
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    return total, trainable


def safe_exp(x):
    # Perplexity is exp(loss); guard against overflow for very large losses.
    try:
        return math.exp(x)
    except OverflowError:
        return float("inf")


def estimate_steps(num_examples, per_device_batch_size, grad_accum_steps, epochs):
    # Rough estimate only: with packing=True the real optimizer-step count is
    # lower, since examples are concatenated into MAX_LENGTH-token blocks.
    steps_per_epoch = math.ceil(num_examples / (per_device_batch_size * grad_accum_steps))
    total_steps = steps_per_epoch * epochs
    return steps_per_epoch, total_steps
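
# Example with a hypothetical dataset size: 16,000 train examples give
# ceil(16000 / (4 * 4)) = 1000 optimizer steps per epoch, so 1000 total steps
# for NUM_TRAIN_EPOCHS = 1 and a warmup of ceil(1000 * 0.03) = 30 steps.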
def save_json(obj, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)
def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    set_seed(SEED)
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True

    print("=" * 80)
    print("CUDA_VISIBLE_DEVICES =", os.environ.get("CUDA_VISIBLE_DEVICES"))
    print("torch:", torch.__version__)
    print("cuda:", torch.cuda.is_available())
    print("gpus:", torch.cuda.device_count())
    if torch.cuda.is_available():
        print("gpu:", torch.cuda.get_device_name(0))
        print("bf16:", torch.cuda.is_bf16_supported())
    print("=" * 80)

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available.")

    use_bf16 = torch.cuda.is_bf16_supported()
    dtype = torch.bfloat16 if use_bf16 else torch.float16
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        trust_remote_code=True,
        use_fast=True,
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    print(f"Loading model with attn_implementation={ATTN_IMPLEMENTATION} ...")
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        trust_remote_code=True,
        dtype=dtype,
        low_cpu_mem_usage=True,
        attn_implementation=ATTN_IMPLEMENTATION,
    )
    # Full fine-tuning: every parameter stays trainable.
    for p in model.parameters():
        p.requires_grad = True
    # KV caching is incompatible with gradient checkpointing during training.
    model.config.use_cache = False

    total_params, trainable_params = count_parameters(model)
    print(f"Total params     : {total_params:,}")
    print(f"Trainable params : {trainable_params:,}")
    ds = load_dataset(DATASET_NAME, split="train")
    print("dataset total:", len(ds))

    # Two-stage split: first hold out VAL_SIZE + TEST_SIZE (10%), then divide
    # the holdout into validation and test (50/50 here, since both are 0.05).
    split_1 = ds.train_test_split(test_size=VAL_SIZE + TEST_SIZE, seed=SEED)
    train_ds = split_1["train"]
    holdout_ds = split_1["test"]
    relative_test_size = TEST_SIZE / (VAL_SIZE + TEST_SIZE)
    split_2 = holdout_ds.train_test_split(test_size=relative_test_size, seed=SEED)
    val_ds = split_2["train"]
    test_ds = split_2["test"]
    print("train:", len(train_ds))
    print("val  :", len(val_ds))
    print("test :", len(test_ds))
    train_ds = train_ds.map(
        to_messages_only,
        remove_columns=train_ds.column_names,
        desc="format train",
    )
    val_ds = val_ds.map(
        to_messages_only,
        remove_columns=val_ds.column_names,
        desc="format val",
    )
    test_ds = test_ds.map(
        to_messages_only,
        remove_columns=test_ds.column_names,
        desc="format test",
    )
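
    # After mapping, each example has a single "messages" column in standard
    # chat format, e.g. {"messages": [{"role": "user", "content": "..."},
    # {"role": "assistant", "content": "..."}]} (the roles shown are illustrative).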
    steps_per_epoch, total_steps = estimate_steps(
        num_examples=len(train_ds),
        per_device_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE,
        grad_accum_steps=GRADIENT_ACCUMULATION_STEPS,
        epochs=NUM_TRAIN_EPOCHS,
    )
    warmup_steps = max(1, math.ceil(total_steps * WARMUP_RATIO))
    print(f"Steps per epoch           : {steps_per_epoch}")
    print(f"Estimated optimizer steps : {total_steps}")
    print(f"Warmup steps              : {warmup_steps}")
    args = SFTConfig(
        output_dir=OUTPUT_DIR,
        do_train=True,
        do_eval=True,
        num_train_epochs=NUM_TRAIN_EPOCHS,
        learning_rate=LEARNING_RATE,
        weight_decay=WEIGHT_DECAY,
        warmup_steps=warmup_steps,
        lr_scheduler_type="cosine",
        per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE,
        per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH_SIZE,
        gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
        logging_steps=LOGGING_STEPS,
        eval_strategy="steps",
        eval_steps=EVAL_STEPS,
        save_strategy="steps",
        save_steps=SAVE_STEPS,
        save_total_limit=SAVE_TOTAL_LIMIT,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        bf16=use_bf16,
        fp16=not use_bf16,
        tf32=True,
        gradient_checkpointing=True,
        # Pack training examples into MAX_LENGTH-token blocks; evaluate unpacked.
        packing=True,
        eval_packing=False,
        max_length=MAX_LENGTH,
        optim=OPTIMIZER,
        report_to=REPORT_TO,
        seed=SEED,
        dataloader_num_workers=0,
        remove_unused_columns=False,
        eos_token=tokenizer.eos_token,
        pad_token=tokenizer.pad_token,
    )

    trainer = SFTTrainer(
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        processing_class=tokenizer,
    )
    train_result = trainer.train()
    train_metrics = dict(train_result.metrics)
    if "train_loss" in train_metrics:
        train_metrics["train_perplexity"] = safe_exp(train_metrics["train_loss"])
    save_json(train_metrics, os.path.join(OUTPUT_DIR, "train_metrics.json"))

    print("\nEvaluating on the validation set...")
    val_metrics = trainer.evaluate(eval_dataset=val_ds, metric_key_prefix="val")
    if "val_loss" in val_metrics:
        val_metrics["val_perplexity"] = safe_exp(val_metrics["val_loss"])

    print("\nEvaluating on the test set...")
    test_metrics = trainer.evaluate(eval_dataset=test_ds, metric_key_prefix="test")
    if "test_loss" in test_metrics:
        test_metrics["test_perplexity"] = safe_exp(test_metrics["test_loss"])

    print("\nVal metrics:")
    print(val_metrics)
    print("\nTest metrics:")
    print(test_metrics)
    save_json(val_metrics, os.path.join(OUTPUT_DIR, "val_metrics.json"))
    save_json(test_metrics, os.path.join(OUTPUT_DIR, "test_metrics.json"))

    trainer.save_model(OUTPUT_DIR)
    tokenizer.save_pretrained(OUTPUT_DIR)
    print("\n✅ DONE")
if __name__ == "__main__":
    main()
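
# -----------------------------------------------------------------------------
# A minimal inference sketch (not part of the training run): how one might load
# the checkpoint saved above and generate from it. The prompt and generation
# settings are illustrative assumptions, not part of the original script.
#
#   from transformers import AutoModelForCausalLM, AutoTokenizer
#
#   tok = AutoTokenizer.from_pretrained(OUTPUT_DIR, trust_remote_code=True)
#   lm = AutoModelForCausalLM.from_pretrained(
#       OUTPUT_DIR, trust_remote_code=True, device_map="auto"
#   )
#   messages = [{"role": "user", "content": "What is habeas corpus?"}]
#   inputs = tok.apply_chat_template(
#       messages, add_generation_prompt=True, return_tensors="pt"
#   ).to(lm.device)
#   out = lm.generate(inputs, max_new_tokens=256)
#   print(tok.decode(out[0][inputs.shape[-1]:], skip_special_tokens=True))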