Skip to content

Instantly share code, notes, and snippets.

@duarteocarmo
Last active February 18, 2026 18:07
Show Gist options
  • Select an option

  • Save duarteocarmo/b7d82c1862eadfe5f874d52e68fddf39 to your computer and use it in GitHub Desktop.

Select an option

Save duarteocarmo/b7d82c1862eadfe5f874d52e68fddf39 to your computer and use it in GitHub Desktop.
Score fineweb2-bagaco with educational quality (0-5) using e5-small + MLP classifier
# /// script
# dependencies = [
# "sentence-transformers",
# "torch",
# "scikit-learn",
# "numpy<2",
# "polars[pyarrow]",
# "pyarrow",
# "huggingface_hub",
# ]
# requires-python = ">=3.10"
# ///
"""
Score all documents in duarteocarmo/fineweb2-bagaco with educational quality (0-5)
using intfloat/multilingual-e5-small embeddings + MLP classifier.
Usage (from repo root):
uv run scripts/classify_edu_bagaco.py
Requires HF_TOKEN environment variable for upload.
Resumes automatically if interrupted (skips already-scored shards).
"""
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from types import SimpleNamespace
import numpy
import polars
import pyarrow.parquet as pq
from huggingface_hub import HfApi, hf_hub_download
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
# Runtime configuration shared by every step of the pipeline below.
config = SimpleNamespace(
    repo_id="duarteocarmo/fineweb2-bagaco",  # HF dataset repo: shards are read from and uploaded back to it
    sample_url="https://huggingface.co/datasets/duarteocarmo/fineweb2-bagaco/resolve/main/reference/fineweb2_edu_classification_sample.parquet",  # labeled training sample
    model_name="intfloat/multilingual-e5-small",  # sentence-transformers embedding model
    output_dir="./edu_classified_output",  # scored shards are written here before upload
    download_dir="./shard_cache",  # hf_hub_download cache directory
    max_chars=800,  # each document is truncated to this many chars before embedding
    encode_batch_size=2048,  # batch size passed to model.encode
    download_workers=8,  # parallel shard-download threads
    row_group_size=1024,  # parquet row-group size for output shards
    random_state=42,  # seed for the train/test split and the MLP
)
def get_device() -> str:
    """Pick the best available torch device: CUDA first, then Apple MPS, else CPU."""
    import torch

    # Probe accelerators in preference order; fall through to CPU.
    probes = (
        ("cuda", torch.cuda.is_available),
        ("mps", torch.backends.mps.is_available),
    )
    for name, is_available in probes:
        if is_available():
            return name
    return "cpu"
def load_model(device: str):
    """Load the configured sentence-transformers model onto *device*.

    On GPU-like devices (cuda/mps) the weights are loaded in float16 to
    halve memory use; on CPU the default dtype is kept.
    """
    from sentence_transformers import SentenceTransformer

    extra_kwargs = {"torch_dtype": "float16"} if device in ("cuda", "mps") else {}
    model = SentenceTransformer(
        config.model_name, device=device, model_kwargs=extra_kwargs
    )
    print(f"Loaded {config.model_name} on {device} (fp16={device in ('cuda', 'mps')})")
    return model
def encode(model, texts: list[str]) -> numpy.ndarray:
    """Embed *texts* with *model*, truncating each to config.max_chars first."""
    clipped = [text[: config.max_chars].strip() for text in texts]
    return model.encode(
        clipped,
        batch_size=config.encode_batch_size,
        show_progress_bar=True,
    )
# ─── Step 1: Download + parse training sample ───
def download_sample() -> tuple[list[str], list[int]]:
    """Fetch the labeled training sample; return (texts, edu scores clamped to 0-5).

    Rows whose `edu_classification` is null are dropped (and counted).
    """
    df = polars.read_parquet(config.sample_url)
    total = df.height
    df = df.filter(polars.col("edu_classification").is_not_null())
    dropped = total - df.height
    if dropped > 0:
        print(f"Dropped {dropped} rows with null edu_classification")
    texts = df["text"].to_list()
    labels: list[int] = []
    for raw in df["edu_classification"].to_list():
        # Labels are stored as JSON blobs; clamp the score into [0, 5].
        score = int(json.loads(raw)["educational_score"])
        labels.append(min(5, max(0, score)))
    print(f"Loaded {len(texts)} labeled samples")
    return texts, labels
# ─── Step 2: Train classifier (with validation) ───
def _make_mlp() -> MLPClassifier:
    """Return a fresh MLP with the shared hyperparameters used for edu scoring."""
    return MLPClassifier(
        hidden_layer_sizes=(1024, 256),
        max_iter=500,
        early_stopping=True,
        random_state=config.random_state,
    )


def train_classifier(model, texts: list[str], scores: list[int]) -> MLPClassifier:
    """Train the edu-score classifier on embeddings of *texts*.

    First holds out a stratified 20% split and prints a validation
    classification report, then retrains from scratch on the full sample
    for production use.

    Args:
        model: sentence-transformers encoder used by `encode`.
        texts: raw document texts.
        scores: integer educational scores (0-5), one per text.

    Returns:
        An MLPClassifier fit on all samples.
    """
    train_texts, test_texts, train_scores, test_scores = train_test_split(
        texts,
        scores,
        test_size=0.20,
        random_state=config.random_state,
        stratify=scores,
    )
    print(f"\n--- Validation (train={len(train_texts)}, test={len(test_texts)}) ---")
    train_emb = encode(model=model, texts=train_texts)
    val_classifier = _make_mlp()
    val_classifier.fit(X=train_emb, y=train_scores)
    test_emb = encode(model=model, texts=test_texts)
    predictions = val_classifier.predict(X=test_emb)
    print(
        classification_report(y_true=test_scores, y_pred=predictions, zero_division=0)
    )
    # Retrain on the full sample: the split above is only for the report.
    print(f"--- Retraining on full sample ({len(texts)} rows) ---")
    all_emb = encode(model=model, texts=texts)
    classifier = _make_mlp()
    classifier.fit(X=all_emb, y=scores)
    print(f"Classifier trained on {len(texts)} samples")
    return classifier
# ─── Step 3: Pre-download all shards in parallel ───
def list_shards() -> list[str]:
    """Return the sorted `shard_*.parquet` filenames found in the dataset repo."""
    repo_files = HfApi().list_repo_files(repo_id=config.repo_id, repo_type="dataset")
    shards = sorted(
        name
        for name in repo_files
        if name.startswith("shard_") and name.endswith(".parquet")
    )
    print(f"Found {len(shards)} shards in {config.repo_id}")
    return shards
def download_shard(shard_filename: str) -> tuple[str, str]:
    """Download one shard into the local cache; return (filename, local path)."""
    cached_path = hf_hub_download(
        repo_id=config.repo_id,
        filename=shard_filename,
        repo_type="dataset",
        cache_dir=config.download_dir,
    )
    return shard_filename, cached_path
def download_all_shards(shard_filenames: list[str]) -> dict[str, str]:
    """Download every shard in parallel; return {shard filename: local path}.

    Progress is printed every 10 completed downloads and at the end.
    """
    n_shards = len(shard_filenames)
    print(
        f"\nPre-downloading {n_shards} shards with {config.download_workers} workers..."
    )
    shard_paths: dict[str, str] = {}
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=config.download_workers) as pool:
        pending = {pool.submit(download_shard, name): name for name in shard_filenames}
        for future in as_completed(pending):
            name, local_path = future.result()
            shard_paths[name] = local_path
            finished = len(shard_paths)
            if finished % 10 == 0 or finished == n_shards:
                elapsed = time.perf_counter() - start
                print(
                    f" Downloaded {finished}/{n_shards} shards ({elapsed:.1f}s)"
                )
    print(f"All shards downloaded in {time.perf_counter() - start:.1f}s")
    return shard_paths
# ─── Step 4: Score shard-by-shard ───
def score_shard(
    model, classifier: MLPClassifier, local_path: str, output_path: Path
) -> int:
    """Score one downloaded shard and write it, plus an `educational_score`
    Int8 column, to *output_path* as zstd-compressed parquet.

    Returns the number of rows written.
    """
    frame = polars.read_parquet(local_path)
    embeddings = encode(model=model, texts=frame["text"].to_list())
    predicted = classifier.predict(X=embeddings)
    scored = frame.with_columns(
        polars.Series(
            name="educational_score", values=predicted.tolist(), dtype=polars.Int8
        )
    )
    pq.write_table(
        scored.to_arrow(),
        str(output_path),
        row_group_size=config.row_group_size,
        use_dictionary=False,
        compression="zstd",
        compression_level=3,
        write_statistics=False,
    )
    return len(scored)
def score_all(model, classifier: MLPClassifier):
    """Score every shard in the repo, shard by shard, resuming where possible.

    A shard is skipped when its output file already exists and contains an
    `educational_score` column; remaining shards are pre-downloaded in
    parallel and then scored sequentially with per-shard throughput logging.
    """
    shards = list_shards()
    os.makedirs(config.output_dir, exist_ok=True)
    to_score = []
    skipped_rows = 0
    for shard_filename in shards:
        output_path = Path(config.output_dir) / shard_filename
        if output_path.exists() and output_path.stat().st_size > 0:
            # Resume check: read only the parquet footer (schema + metadata)
            # instead of the whole file -- the column list and row count are
            # all we need, so skip reading the data pages entirely.
            if "educational_score" in pq.read_schema(str(output_path)).names:
                n_existing = pq.read_metadata(str(output_path)).num_rows
                skipped_rows += n_existing
                print(
                    f" SKIP {shard_filename} (already scored, {n_existing:,} rows)"
                )
                continue
        to_score.append(shard_filename)
    if skipped_rows > 0:
        print(
            f"Resuming: {skipped_rows:,} rows already scored, {len(to_score)} shards remaining"
        )
    if not to_score:
        print("All shards already scored!")
        return
    shard_paths = download_all_shards(shard_filenames=to_score)
    total_rows = skipped_rows
    total_time = 0.0
    for i, shard_filename in enumerate(to_score):
        output_path = Path(config.output_dir) / shard_filename
        local_path = shard_paths[shard_filename]
        t0 = time.perf_counter()
        n_rows = score_shard(
            model=model,
            classifier=classifier,
            local_path=local_path,
            output_path=output_path,
        )
        dt = time.perf_counter() - t0
        total_rows += n_rows
        total_time += dt
        throughput = n_rows / dt if dt > 0 else 0
        # Average throughput counts only rows scored this run, over time
        # actually spent scoring (skipped rows took no time this run).
        avg_throughput = (
            (total_rows - skipped_rows) / total_time if total_time > 0 else 0
        )
        print(
            f"[{i + 1}/{len(to_score)}] {shard_filename}: {n_rows:,} rows in {dt:.1f}s "
            f"({throughput:,.0f} samp/s) | total: {total_rows:,} rows, avg {avg_throughput:,.0f} samp/s"
        )
    print(f"\nScoring complete: {total_rows:,} total rows in {total_time:.1f}s")
# ─── Step 5: Upload ───
def upload():
    """Upload all scored shards in config.output_dir to the HF dataset repo.

    Raises:
        RuntimeError: if the HF_TOKEN environment variable is not set.
    """
    token = os.getenv("HF_TOKEN")
    if token is None:
        # `assert` is stripped under `python -O`; validate with an explicit raise.
        raise RuntimeError("HF_TOKEN environment variable not set.")
    api = HfApi(token=token)
    api.upload_large_folder(
        folder_path=config.output_dir,
        repo_id=config.repo_id,
        repo_type="dataset",
    )
    print(f"Uploaded to {config.repo_id}")
def main():
    """End-to-end pipeline: train the classifier, score every shard, upload."""
    device = get_device()
    print(f"Device: {device}")
    sample_texts, sample_scores = download_sample()
    embedder = load_model(device=device)
    clf = train_classifier(model=embedder, texts=sample_texts, scores=sample_scores)
    score_all(model=embedder, classifier=clf)
    upload()
if __name__ == "__main__":
    main()
    # os._exit skips interpreter cleanup (atexit hooks, thread joins).
    # NOTE(review): presumably used to avoid hanging on lingering non-daemon
    # threads after the HF upload/download machinery -- TODO confirm.
    os._exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment