LLM Compressor regression testing scripts: testing/quantize.py applies one quantization technique/scheme to a model, run_all_tests.sh sweeps the full models x techniques x schemes x branches matrix and evaluates every result, and testing/test_nvfp4.py is a standalone test for the NVFP4 schemes.
import argparse
import time
import torch
from compressed_tensors.offload import dispatch_model
from compressed_tensors.quantization import preset_name_to_scheme
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from llmcompressor import oneshot
from llmcompressor.modifiers.gptq import GPTQModifier
from llmcompressor.modifiers.quantization import QuantizationModifier
from llmcompressor.modifiers.transform.awq import AWQModifier
from llmcompressor.modifiers.transform.imatrix import IMatrixGatherer
from llmcompressor.modifiers.transform.smoothquant import SmoothQuantModifier
MODEL_CONFIGS = {
    "meta-llama/Meta-Llama-3-8B-Instruct": {
        "ignore": ["lm_head"],
        "is_moe": False,
    },
    "Qwen/Qwen3-30B-A3B": {
        "ignore": ["lm_head", "re:.*mlp.gate$", "re:.*mlp.shared_expert_gate$"],
        "is_moe": True,
    },
}
DATASET_ID = "HuggingFaceH4/ultrachat_200k"
DATASET_SPLIT = "train_sft"
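
# build_recipe() maps each --technique flag to a list of llm-compressor modifiers:
#   awq_rtn - AWQModifier scaling pass followed by round-to-nearest quantization
#   rtn     - plain round-to-nearest via QuantizationModifier
#   rtn_mse - RTN with the weight observer swapped to "memoryless_mse"
#   gptq    - GPTQModifier, preceded by SmoothQuant for W8A8 schemes
#   imatrix - IMatrixGatherer plus RTN with the "imatrix_mse" observer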
def build_recipe(technique, scheme, ignore, is_moe):
    if technique == "awq_rtn":
        # duo scaling is disabled for MoE models
        duo = "both" if not is_moe else False
        return [
            AWQModifier(duo_scaling=duo),
            QuantizationModifier(
                ignore=ignore, scheme=scheme, targets=["Linear"]
            ),
        ]
    elif technique == "rtn":
        return [
            QuantizationModifier(
                ignore=ignore, scheme=scheme, targets=["Linear"]
            ),
        ]
    elif technique == "rtn_mse":
        scheme_obj = preset_name_to_scheme(scheme, ["Linear"])
        scheme_obj.weights.observer = "memoryless_mse"
        return [
            QuantizationModifier(
                config_groups={"group_0": scheme_obj},
                ignore=ignore,
            ),
        ]
    elif technique == "gptq":
        recipe = []
        # SmoothQuant only helps when activations are quantized, so add it for W8A8
        if "W8A8" in scheme:
            recipe.append(SmoothQuantModifier(smoothing_strength=0.8))
        recipe.append(
            GPTQModifier(ignore=ignore, scheme=scheme, targets=["Linear"])
        )
        return recipe
    elif technique == "imatrix":
        scheme_obj = preset_name_to_scheme(scheme, ["Linear"])
        scheme_obj.weights.observer = "imatrix_mse"
        return [
            IMatrixGatherer(ignore=ignore),
            QuantizationModifier(
                config_groups={"group_0": scheme_obj},
                ignore=ignore,
            ),
        ]
    else:
        raise ValueError(f"Unknown technique: {technique}")
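
# For example, build_recipe("gptq", "W8A8", ["lm_head"], False) returns
# [SmoothQuantModifier(smoothing_strength=0.8),
#  GPTQModifier(ignore=["lm_head"], scheme="W8A8", targets=["Linear"])].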
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True)
    parser.add_argument(
        "--technique", required=True,
        choices=["awq_rtn", "rtn", "rtn_mse", "gptq", "imatrix"],
    )
    parser.add_argument("--scheme", required=True)
    parser.add_argument("--save-dir", required=True)
    parser.add_argument("--num-samples", type=int, default=256)
    parser.add_argument("--max-seq-length", type=int, default=512)
    args = parser.parse_args()

    config = MODEL_CONFIGS.get(args.model)
    if config is None:
        raise ValueError(
            f"Unknown model: {args.model}. "
            f"Known models: {list(MODEL_CONFIGS.keys())}"
        )

    model = AutoModelForCausalLM.from_pretrained(args.model, dtype="auto")
    tokenizer = AutoTokenizer.from_pretrained(args.model, trust_remote_code=True)

    # Load and preprocess the calibration dataset
    ds = load_dataset(DATASET_ID, split=f"{DATASET_SPLIT}[:{args.num_samples}]")
    ds = ds.shuffle(seed=42)

    def preprocess(example):
        return {
            "text": tokenizer.apply_chat_template(
                example["messages"],
                tokenize=False,
            )
        }

    ds = ds.map(preprocess)

    def tokenize(sample):
        return tokenizer(
            sample["text"],
            padding=False,
            max_length=args.max_seq_length,
            truncation=True,
            add_special_tokens=False,
        )

    ds = ds.map(tokenize, remove_columns=ds.column_names)

    recipe = build_recipe(
        args.technique, args.scheme, config["ignore"], config["is_moe"]
    )

    # Time the quantization pass and track peak GPU memory
    torch.cuda.reset_peak_memory_stats()
    start_time = time.time()
    oneshot_kwargs = dict(
        model=model,
        dataset=ds,
        recipe=recipe,
        max_seq_length=args.max_seq_length,
        num_calibration_samples=args.num_samples,
    )
    oneshot(**oneshot_kwargs)
    elapsed_time = time.time() - start_time
    peak_memory_gb = torch.cuda.max_memory_allocated() / (1024**3)

    print("Quantization Complete")
    print(f"Technique: {args.technique}, Scheme: {args.scheme}")
    print(f"Time: {elapsed_time / 60:.2f} minutes ({elapsed_time:.2f} seconds)")
    print(f"Peak GPU Memory: {peak_memory_gb:.2f} GB")

    print("\n\n")
    print("========== SAMPLE GENERATION ==============")
    dispatch_model(model)
    input_ids = tokenizer("Hello my name is", return_tensors="pt").input_ids.to(
        model.device
    )
    output = model.generate(input_ids, max_new_tokens=100)
    print(tokenizer.decode(output[0]))
    print("==========================================\n\n")

    model.save_pretrained(args.save_dir, save_compressed=True)
    tokenizer.save_pretrained(args.save_dir)
    print(f"Model saved to {args.save_dir}")


if __name__ == "__main__":
    main()
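
For a single manual run outside the harness below, quantize.py can be invoked the same way run_all_tests.sh does. A sketch of the single-GPU path (the multi-GPU path swaps python for torchrun --nproc_per_node=N):

python testing/quantize.py \
    --model meta-llama/Meta-Llama-3-8B-Instruct \
    --technique gptq \
    --scheme W4A16 \
    --save-dir ./regression_models/Meta-Llama-3-8B-Instruct-W4A16-gptq-main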
#!/bin/bash
# Observer Refactoring Regression Test Suite
# Tests all combinations of models x techniques x schemes x branches.
# For each combo, checks out main and the PR branch, reinstalls, quantizes, and evals.
#
# Usage:
# ./run_all_tests.sh 2>&1 | tee regression_results.log
# python extract_log_summary.py regression_results.log
set -o pipefail
# Avoid permission errors on shared HF cache files
export HF_DATASETS_CACHE="/tmp/hf_datasets_cache"
mkdir -p "$HF_DATASETS_CACHE"
# ── Configuration ────────────────────────────────────────────────────────────
REPO_DIR="$(cd "$(dirname "$0")/.." && pwd)"
MODELS=("meta-llama/Meta-Llama-3-8B-Instruct" "Qwen/Qwen3-30B-A3B")
MODEL_SHORT_NAMES=("Meta-Llama-3-8B-Instruct" "Qwen3-30B-A3B")
# max_model_len,tensor_parallel_size,num_gpus_quant
MODEL_VLLM_ARGS=("2048,1,1" "2048,2,1")
TECHNIQUES=("awq_rtn" "rtn" "rtn_mse" "gptq" "imatrix")
SCHEMES=("W4A16" "W8A8")
BRANCHES=("main" "90_refactor_obs")
EVAL_NAMES=("gsm8k_platinum" "wikitext" "mmlu")
EVAL_LM_TASKS=("gsm8k_platinum" "wikitext" "mmlu")
EVAL_FEWSHOT=("5" "0" "5")
EVAL_BACKENDS=("vllm" "vllm" "vllm")
EVAL_BASE_DIR="./eval_results"
MODEL_BASE_DIR="./regression_models"
RESULTS_CSV="regression_results.csv"
ORIGINAL_BRANCH=$(git -C "$REPO_DIR" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "unknown")
mkdir -p "$EVAL_BASE_DIR" "$MODEL_BASE_DIR"
# ── Helper: activate environments ────────────────────────────────────────────
activate_quant_env() {
    source /home/HDCharles/rhdev/bin/activate
}

activate_eval_env() {
    source /home/HDCharles/vllm/bin/activate
}

# ── Helper: checkout branch and reinstall ────────────────────────────────────
switch_branch() {
    local branch=$1
    echo " Switching to branch: $branch"
    git -C "$REPO_DIR" checkout "$branch" 2>&1
    if [ $? -ne 0 ]; then
        echo " ERROR: git checkout $branch failed"
        return 1
    fi
    activate_quant_env
    pip install -e "$REPO_DIR" 2>&1 | tail -1
    echo " Installed llm-compressor from branch $branch"
}
# ── Helper: run vLLM evaluation with fallback chain ──────────────────────────
EVAL_BACKEND=""
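# Fallback chain, in order: vLLM with TP=$tp_size → vLLM expert-parallel
# (tried only when TP>1 fails) → vLLM TP=1 → vLLM enforce_eager → HF.
# Whichever backend succeeds is recorded in EVAL_BACKEND for the CSV.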
run_vllm_eval() {
    local save_dir=$1
    local task=$2
    local num_fewshot=$3
    local max_model_len=$4
    local tp_size=$5
    local eval_output_dir=$6
    mkdir -p "$eval_output_dir"
    EVAL_BACKEND="FAILED"
    activate_eval_env
    echo " EVAL: $task (fewshot=$num_fewshot, tp=$tp_size, max_len=$max_model_len)"
    local chat_args="--apply_chat_template"
    if [ "$num_fewshot" -gt 0 ]; then
        chat_args="$chat_args --fewshot_as_multiturn"
    fi
    if [ "$tp_size" -gt 1 ]; then
        lm_eval \
            --model vllm \
            --model_args "pretrained=$save_dir,dtype=auto,max_model_len=$max_model_len,add_bos_token=True,tensor_parallel_size=$tp_size,gpu_memory_utilization=0.85" \
            --tasks "$task" \
            --num_fewshot "$num_fewshot" \
            --batch_size auto \
            $chat_args \
            --output_path "$eval_output_dir" 2>&1
        if [ $? -eq 0 ]; then EVAL_BACKEND="vllm_tp${tp_size}"; return 0; fi
        echo " TP=$tp_size failed, trying expert_parallel..."
        lm_eval \
            --model vllm \
            --model_args "pretrained=$save_dir,dtype=auto,max_model_len=$max_model_len,add_bos_token=True,enable_expert_parallel=True,gpu_memory_utilization=0.85" \
            --tasks "$task" \
            --num_fewshot "$num_fewshot" \
            --batch_size auto \
            $chat_args \
            --output_path "$eval_output_dir" 2>&1
        if [ $? -eq 0 ]; then EVAL_BACKEND="vllm_expert_parallel"; return 0; fi
    fi
    echo " Trying TP=1..."
    lm_eval \
        --model vllm \
        --model_args "pretrained=$save_dir,dtype=auto,max_model_len=$max_model_len,add_bos_token=True,gpu_memory_utilization=0.85" \
        --tasks "$task" \
        --num_fewshot "$num_fewshot" \
        --batch_size auto \
        $chat_args \
        --output_path "$eval_output_dir" 2>&1
    if [ $? -eq 0 ]; then EVAL_BACKEND="vllm_tp1"; return 0; fi
    echo " Trying enforce_eager..."
    lm_eval \
        --model vllm \
        --model_args "pretrained=$save_dir,dtype=auto,max_model_len=$max_model_len,add_bos_token=True,enforce_eager=True,gpu_memory_utilization=0.85" \
        --tasks "$task" \
        --num_fewshot "$num_fewshot" \
        --batch_size auto \
        $chat_args \
        --output_path "$eval_output_dir" 2>&1
    if [ $? -eq 0 ]; then EVAL_BACKEND="vllm_eager"; return 0; fi
    echo " Trying hf backend as last resort..."
    lm_eval \
        --model hf \
        --model_args "pretrained=$save_dir,dtype=auto,add_bos_token=True" \
        --tasks "$task" \
        --num_fewshot "$num_fewshot" \
        --batch_size auto \
        $chat_args \
        --output_path "$eval_output_dir" 2>&1
    if [ $? -eq 0 ]; then EVAL_BACKEND="hf"; return 0; fi
    EVAL_BACKEND="FAILED"
    return 1
}
# ── Helper: run HF-only evaluation ─────────────────────────────────────────
run_hf_eval() {
    local save_dir=$1
    local task=$2
    local num_fewshot=$3
    local eval_output_dir=$4
    mkdir -p "$eval_output_dir"
    EVAL_BACKEND="FAILED"
    activate_eval_env
    echo " EVAL: $task (fewshot=$num_fewshot, backend=hf)"
    local chat_args="--apply_chat_template"
    if [ "$num_fewshot" -gt 0 ]; then
        chat_args="$chat_args --fewshot_as_multiturn"
    fi
    lm_eval \
        --model hf \
        --model_args "pretrained=$save_dir,dtype=auto,add_bos_token=True" \
        --tasks "$task" \
        --num_fewshot "$num_fewshot" \
        --batch_size auto \
        $chat_args \
        --output_path "$eval_output_dir" 2>&1
    if [ $? -eq 0 ]; then EVAL_BACKEND="hf"; return 0; fi
    EVAL_BACKEND="FAILED"
    return 1
}
# ── Helper: extract metric from lm_eval JSON results ────────────────────────
extract_metric() {
    local eval_output_dir=$1
    local task=$2
    local results_json
    results_json=$(find "$eval_output_dir" -name "results_*.json" -type f 2>/dev/null | sort | tail -1)
    if [ -z "$results_json" ]; then
        echo "N/A"
        return
    fi
    python3 -c "
import json, sys

with open('$results_json') as f:
    data = json.load(f)
results = data.get('results', {})
task = '$task'
task_results = None
for key in results:
    if task in key:
        task_results = results[key]
        break
if task_results is None:
    print('N/A')
    sys.exit()
if 'gsm8k' in task:
    val = task_results.get('exact_match,strict-match')
    if val is not None:
        print(f'{val*100:.2f}%')
    else:
        print('N/A')
elif 'wikitext' in task:
    val = task_results.get('word_perplexity,none')
    if val is not None:
        print(f'{val:.2f}')
    else:
        print('N/A')
elif 'mmlu' in task:
    val = task_results.get('acc,none')
    if val is not None:
        print(f'{val*100:.2f}%')
    else:
        print('N/A')
else:
    for k, v in task_results.items():
        if 'stderr' not in k and k != 'alias' and isinstance(v, (int, float)):
            print(f'{v:.4f}')
            sys.exit()
    print('N/A')
" 2>/dev/null || echo "N/A"
}
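# Reported metrics: gsm8k_platinum → exact_match (strict-match) as a percent,
# wikitext → word perplexity (lower is better), mmlu → accuracy as a percent;
# any other task falls back to its first non-stderr numeric metric.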
# ── Helper: print current results summary ────────────────────────────────────
print_summary() {
    echo ""
    echo "╔══════════════════════════════════════════════════════════════════════════════════════════════════════╗"
    echo "║ RESULTS SUMMARY (so far) ║"
    echo "╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝"
    echo ""
    if [ -f "$RESULTS_CSV" ]; then
        column -t -s',' < "$RESULTS_CSV"
    else
        echo "(no results yet)"
    fi
    echo ""
    echo "════════════════════════════════════════════════════════════════════════════════════════════════════════"
    echo ""
}
# ── Helper: print branch comparison table ─────────────────────────────────
print_comparison() {
    if [ ! -f "$RESULTS_CSV" ]; then
        return
    fi
    python3 - "$RESULTS_CSV" <<'PYEOF'
import csv, sys

csv_path = sys.argv[1]
rows = []
with open(csv_path) as f:
    reader = csv.DictReader(f)
    for r in reader:
        rows.append(r)
if not rows:
    sys.exit()

# Build lookup: (model, scheme, technique, task) -> {branch: metric}
lookup = {}
for r in rows:
    key = (r["model"], r["scheme"], r["technique"], r["task"])
    lookup.setdefault(key, {})
    lookup[key][r["branch"]] = r["metric"]

entries = [(k, v) for k, v in lookup.items()
           if "main" in v and any(b != "main" for b in v)]
if not entries:
    sys.exit()

pr_branch = [b for b in next(iter(lookup.values())) if b != "main"]
pr_branch = pr_branch[0] if pr_branch else "pr"

def parse_metric(s):
    s = s.strip()
    if s.endswith("%"):
        return float(s[:-1]), True
    try:
        return float(s), False
    except ValueError:
        return None, False

def calc_change(main_str, pr_str, task):
    m_val, _ = parse_metric(main_str)
    p_val, _ = parse_metric(pr_str)
    if m_val is None or p_val is None or m_val == 0:
        return "N/A"
    # wikitext reports perplexity (lower is better), so the sign is flipped
    # so that a positive change always means the PR branch improved
    if "wikitext" in task:
        pct = (m_val - p_val) / m_val * 100
    else:
        pct = (p_val - m_val) / m_val * 100
    sign = "+" if pct >= 0 else ""
    return f"{sign}{pct:.2f}%"

print("")
print("╔════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗")
print(f"║ BRANCH COMPARISON (main vs {pr_branch})")
print("╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝")
print("")
header = (f"{'model':<30} {'scheme':<10} {'technique':<12} {'task':<18} "
          f"{'main':>14} {'PR':>14} {'change':>12}")
print(header)
print("-" * len(header))
for (model, scheme, technique, task), metrics in sorted(entries):
    m = metrics.get("main", "")
    p = metrics.get(pr_branch, "")
    change = calc_change(m, p, task) if m and p else ""
    print(f"{model:<30} {scheme:<10} {technique:<12} {task:<18} "
          f"{m:>14} {p:>14} {change:>12}")
print("")
PYEOF
}
# ── Initialize results CSV ──────────────────────────────────────────────────
if [ -f "$RESULTS_CSV" ]; then
cp "$RESULTS_CSV" "${RESULTS_CSV}.bak"
fi
echo "model,scheme,technique,branch,task,metric,status,eval_backend,save_dir" > "$RESULTS_CSV"
# ── Main loop ────────────────────────────────────────────────────────────────
TOTAL=0
PASSED=0
FAILED=0
for model_idx in "${!MODELS[@]}"; do
    model="${MODELS[$model_idx]}"
    model_short="${MODEL_SHORT_NAMES[$model_idx]}"
    IFS=',' read -r max_model_len tp_size num_gpus_quant <<< "${MODEL_VLLM_ARGS[$model_idx]}"

    for technique in "${TECHNIQUES[@]}"; do
        for scheme in "${SCHEMES[@]}"; do
            for branch in "${BRANCHES[@]}"; do
                save_dir="$MODEL_BASE_DIR/${model_short}-${scheme}-${technique}-${branch}"
                echo ""
                echo "╔══════════════════════════════════════════════════════════════════════════════════════════╗"
                echo "║ MODEL: $model_short"
                echo "║ SCHEME: $scheme"
                echo "║ TECHNIQUE: $technique"
                echo "║ BRANCH: $branch"
                echo "╚══════════════════════════════════════════════════════════════════════════════════════════╝"
                echo ""

                # ── Skip entirely if all evals already have results ────
                all_evals_cached=true
                for eval_idx in "${!EVAL_NAMES[@]}"; do
                    eval_dir="$EVAL_BASE_DIR/${model_short}-${scheme}-${technique}-${branch}/${EVAL_NAMES[$eval_idx]}"
                    if ! find "$eval_dir" -name "results_*.json" -type f 2>/dev/null | grep -q .; then
                        all_evals_cached=false
                        break
                    fi
                done
                if [ "$all_evals_cached" = true ]; then
                    echo "All evals already cached, skipping quantization and eval."
                    for eval_idx in "${!EVAL_NAMES[@]}"; do
                        eval_name="${EVAL_NAMES[$eval_idx]}"
                        lm_task="${EVAL_LM_TASKS[$eval_idx]}"
                        eval_dir="$EVAL_BASE_DIR/${model_short}-${scheme}-${technique}-${branch}/${eval_name}"
                        metric_val=$(extract_metric "$eval_dir" "$lm_task")
                        echo " $eval_name: $metric_val"
                        echo "$model_short,$scheme,$technique,$branch,$eval_name,$metric_val,PASSED,cached,$save_dir" >> "$RESULTS_CSV"
                        PASSED=$((PASSED + 1))
                        TOTAL=$((TOTAL + 1))
                    done
                    print_summary
                    print_comparison
                    continue
                fi

                # ── Switch branch and reinstall ───────────────────────
                switch_branch "$branch"
                if [ $? -ne 0 ]; then
                    echo "BRANCH SWITCH FAILED for $branch"
                    for eval_name in "${EVAL_NAMES[@]}"; do
                        echo "$model_short,$scheme,$technique,$branch,$eval_name,N/A,BRANCH_FAILED,N/A,$save_dir" >> "$RESULTS_CSV"
                    done
                    FAILED=$((FAILED + ${#EVAL_NAMES[@]}))
                    TOTAL=$((TOTAL + ${#EVAL_NAMES[@]}))
                    print_summary
                    continue
                fi

                # ── Quantize (skip if model already exists) ────────────
                if [ -d "$save_dir" ] && [ -f "$save_dir/config.json" ]; then
                    echo "Quantized model already exists at $save_dir, skipping quantization."
                else
                    activate_quant_env
                    echo "============================================"
                    echo "Running: quantize.py (model=$model_short, technique=$technique, scheme=$scheme, branch=$branch)"
                    echo "============================================"
                    if [ "$num_gpus_quant" -gt 1 ]; then
                        torchrun --nproc_per_node="$num_gpus_quant" "$REPO_DIR/testing/quantize.py" \
                            --model "$model" --technique "$technique" --scheme "$scheme" \
                            --save-dir "$save_dir" 2>&1
                    else
                        python "$REPO_DIR/testing/quantize.py" \
                            --model "$model" --technique "$technique" --scheme "$scheme" \
                            --save-dir "$save_dir" 2>&1
                    fi
                    quant_status=$?
                    if [ $quant_status -ne 0 ]; then
                        echo "QUANTIZATION FAILED for $model_short / $scheme / $technique / $branch"
                        for eval_name in "${EVAL_NAMES[@]}"; do
                            echo "$model_short,$scheme,$technique,$branch,$eval_name,N/A,QUANT_FAILED,N/A,$save_dir" >> "$RESULTS_CSV"
                        done
                        FAILED=$((FAILED + ${#EVAL_NAMES[@]}))
                        TOTAL=$((TOTAL + ${#EVAL_NAMES[@]}))
                        print_summary
                        print_comparison
                        continue
                    fi
                fi

                # ── Clear GPU memory before eval ─────────────────────────
                python3 -c "import torch; torch.cuda.empty_cache(); [torch.cuda.reset_peak_memory_stats(i) for i in range(torch.cuda.device_count())]" 2>/dev/null

                # ── Evaluate ─────────────────────────────────────────────
                for eval_idx in "${!EVAL_NAMES[@]}"; do
                    eval_name="${EVAL_NAMES[$eval_idx]}"
                    lm_task="${EVAL_LM_TASKS[$eval_idx]}"
                    fewshot="${EVAL_FEWSHOT[$eval_idx]}"
                    backend="${EVAL_BACKENDS[$eval_idx]}"
                    eval_dir="$EVAL_BASE_DIR/${model_short}-${scheme}-${technique}-${branch}/${eval_name}"
                    TOTAL=$((TOTAL + 1))

                    # Skip eval if results already exist
                    existing_result=$(find "$eval_dir" -name "results_*.json" -type f 2>/dev/null | sort | tail -1)
                    if [ -n "$existing_result" ]; then
                        metric_val=$(extract_metric "$eval_dir" "$lm_task")
                        echo " EVAL: $eval_name — skipping, previous result found: $metric_val"
                        echo "$model_short,$scheme,$technique,$branch,$eval_name,$metric_val,PASSED,cached,$save_dir" >> "$RESULTS_CSV"
                        PASSED=$((PASSED + 1))
                        continue
                    fi

                    if [ "$backend" == "hf" ]; then
                        run_hf_eval "$save_dir" "$lm_task" "$fewshot" "$eval_dir"
                    else
                        run_vllm_eval "$save_dir" "$lm_task" "$fewshot" "$max_model_len" "$tp_size" "$eval_dir"
                    fi
                    eval_status=$?
                    if [ $eval_status -eq 0 ]; then
                        metric_val=$(extract_metric "$eval_dir" "$lm_task")
                        echo "$model_short,$scheme,$technique,$branch,$eval_name,$metric_val,PASSED,$EVAL_BACKEND,$save_dir" >> "$RESULTS_CSV"
                        PASSED=$((PASSED + 1))
                    else
                        echo "$model_short,$scheme,$technique,$branch,$eval_name,N/A,FAILED,$EVAL_BACKEND,$save_dir" >> "$RESULTS_CSV"
                        FAILED=$((FAILED + 1))
                    fi
                done

                # ── Clean up model to free disk space ────────────────────
                if [ -d "$save_dir" ]; then
                    echo "Removing quantized model at $save_dir to free disk space."
                    rm -rf "$save_dir"
                fi

                print_summary
                print_comparison
            done # branch
        done # scheme
    done # technique
done # model
# ── Restore original branch ─────────────────────────────────────────────────
echo "Restoring original branch: $ORIGINAL_BRANCH"
git -C "$REPO_DIR" checkout "$ORIGINAL_BRANCH" 2>&1
activate_quant_env
pip install -e "$REPO_DIR" 2>&1 | tail -1
# ── Final Summary ────────────────────────────────────────────────────────────
echo ""
echo "╔══════════════════════════════════════════════════════════════════════════════════════════╗"
echo "║ FINAL SUMMARY: $PASSED passed, $FAILED failed out of $TOTAL total evaluations ║"
echo "╚══════════════════════════════════════════════════════════════════════════════════════════╝"
echo ""
print_summary
print_comparison
echo "Results CSV: $RESULTS_CSV"
echo "Eval outputs: $EVAL_BASE_DIR/"
"""
Standalone test for NVFP4 and NVFP4A16 quantization schemes.
NVFP4 = W4A4 (weights + activations quantized to FP4, requires calibration data)
NVFP4A16 = W4A16 (weights FP4, activations FP16, data-free)
Both require Blackwell GPUs (compute capability 10.0+).
Usage:
python testing/test_nvfp4.py --scheme NVFP4
python testing/test_nvfp4.py --scheme NVFP4A16
python testing/test_nvfp4.py --scheme all
"""
import argparse
import shutil
import time
import torch
from compressed_tensors.offload import dispatch_model
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from llmcompressor import oneshot
from llmcompressor.modifiers.quantization import QuantizationModifier
MODEL_ID = "meta-llama/Meta-Llama-3-8B-Instruct"
DATASET_ID = "HuggingFaceH4/ultrachat_200k"
DATASET_SPLIT = "train_sft"
NUM_CALIBRATION_SAMPLES = 256
MAX_SEQUENCE_LENGTH = 512
IGNORE = ["lm_head"]
def load_model():
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, dtype="auto")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    return model, tokenizer


def load_calibration_data(tokenizer):
    ds = load_dataset(DATASET_ID, split=f"{DATASET_SPLIT}[:{NUM_CALIBRATION_SAMPLES}]")
    ds = ds.shuffle(seed=42)

    def preprocess(example):
        return {
            "text": tokenizer.apply_chat_template(
                example["messages"], tokenize=False
            )
        }

    ds = ds.map(preprocess)

    def tokenize(sample):
        return tokenizer(
            sample["text"],
            padding=False,
            max_length=MAX_SEQUENCE_LENGTH,
            truncation=True,
            add_special_tokens=False,
        )

    ds = ds.map(tokenize, remove_columns=ds.column_names)
    return ds


def sample_generate(model, tokenizer):
    dispatch_model(model)
    input_ids = tokenizer("Hello my name is", return_tensors="pt").input_ids.to(
        model.device
    )
    output = model.generate(input_ids, max_new_tokens=100)
    print(tokenizer.decode(output[0]))
def run_nvfp4(args):
    print("=" * 60)
    print("NVFP4 (W4A4) — weights + activations quantized to FP4")
    print("=" * 60)
    model, tokenizer = load_model()
    ds = load_calibration_data(tokenizer)
    recipe = QuantizationModifier(
        targets="Linear", scheme="NVFP4", ignore=IGNORE
    )

    torch.cuda.reset_peak_memory_stats()
    start = time.time()
    oneshot(
        model=model,
        dataset=ds,
        recipe=recipe,
        max_seq_length=MAX_SEQUENCE_LENGTH,
        num_calibration_samples=NUM_CALIBRATION_SAMPLES,
    )
    elapsed = time.time() - start
    peak_gb = torch.cuda.max_memory_allocated() / (1024**3)
    print(f"Time: {elapsed / 60:.2f} min | Peak GPU: {peak_gb:.2f} GB")

    print("\n--- Sample Generation ---")
    sample_generate(model, tokenizer)

    save_dir = args.save_dir or "nvfp4_test_model"
    model.save_pretrained(save_dir, save_compressed=True)
    tokenizer.save_pretrained(save_dir)
    print(f"Saved to {save_dir}")
    if args.cleanup:
        shutil.rmtree(save_dir)
        print(f"Cleaned up {save_dir}")
    del model
    torch.cuda.empty_cache()


def run_nvfp4a16(args):
    print("=" * 60)
    print("NVFP4A16 (W4A16) — weights FP4, activations FP16 (data-free)")
    print("=" * 60)
    model, tokenizer = load_model()
    recipe = QuantizationModifier(
        targets="Linear", scheme="NVFP4A16", ignore=IGNORE
    )

    torch.cuda.reset_peak_memory_stats()
    start = time.time()
    oneshot(model=model, recipe=recipe)
    elapsed = time.time() - start
    peak_gb = torch.cuda.max_memory_allocated() / (1024**3)
    print(f"Time: {elapsed / 60:.2f} min | Peak GPU: {peak_gb:.2f} GB")

    print("\n--- Sample Generation ---")
    sample_generate(model, tokenizer)

    save_dir = args.save_dir or "nvfp4a16_test_model"
    model.save_pretrained(save_dir, save_compressed=True)
    tokenizer.save_pretrained(save_dir)
    print(f"Saved to {save_dir}")
    if args.cleanup:
        shutil.rmtree(save_dir)
        print(f"Cleaned up {save_dir}")
    del model
    torch.cuda.empty_cache()
def main():
    parser = argparse.ArgumentParser(description="Test NVFP4 / NVFP4A16 quantization")
    parser.add_argument(
        "--scheme",
        required=True,
        choices=["NVFP4", "NVFP4A16", "all"],
        help="Which scheme to test",
    )
    parser.add_argument("--save-dir", default=None, help="Override save directory")
    parser.add_argument(
        "--cleanup",
        action="store_true",
        help="Delete saved model after test (saves disk space)",
    )
    args = parser.parse_args()

    if args.scheme in ("NVFP4", "all"):
        run_nvfp4(args)
    if args.scheme in ("NVFP4A16", "all"):
        run_nvfp4a16(args)
    print("\nAll requested NVFP4 tests passed.")


if __name__ == "__main__":
    main()
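
To sanity-check a saved checkpoint beyond the built-in sample generation, the lm_eval pattern from run_all_tests.sh above can be reused. A sketch, assuming the default nvfp4_test_model save directory and the hf backend:

lm_eval \
    --model hf \
    --model_args "pretrained=nvfp4_test_model,dtype=auto,add_bos_token=True" \
    --tasks gsm8k_platinum \
    --num_fewshot 5 \
    --batch_size auto \
    --apply_chat_template --fewshot_as_multiturn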