Last active
February 18, 2026 18:07
-
-
Save duarteocarmo/b7d82c1862eadfe5f874d52e68fddf39 to your computer and use it in GitHub Desktop.
Score fineweb2-bagaco with educational quality (0-5) using e5-small + MLP classifier
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "sentence-transformers", | |
| # "torch", | |
| # "scikit-learn", | |
| # "numpy<2", | |
| # "polars[pyarrow]", | |
| # "pyarrow", | |
| # "huggingface_hub", | |
| # ] | |
| # requires-python = ">=3.10" | |
| # /// | |
| """ | |
| Score all documents in duarteocarmo/fineweb2-bagaco with educational quality (0-5) | |
| using intfloat/multilingual-e5-small embeddings + MLP classifier. | |
| Usage (from repo root): | |
| uv run scripts/classify_edu_bagaco.py | |
| Requires HF_TOKEN environment variable for upload. | |
| Resumes automatically if interrupted (skips already-scored shards). | |
| """ | |
| import json | |
| import os | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from types import SimpleNamespace | |
| import numpy | |
| import polars | |
| import pyarrow.parquet as pq | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from sklearn.metrics import classification_report | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.neural_network import MLPClassifier | |
# Central run configuration; SimpleNamespace gives attribute-style access
# without defining a class.
config = SimpleNamespace(
    repo_id="duarteocarmo/fineweb2-bagaco",  # HF dataset repo (source shards + upload target)
    sample_url="https://huggingface.co/datasets/duarteocarmo/fineweb2-bagaco/resolve/main/reference/fineweb2_edu_classification_sample.parquet",  # labeled training sample
    model_name="intfloat/multilingual-e5-small",  # embedding model
    output_dir="./edu_classified_output",  # scored shards are written here
    download_dir="./shard_cache",  # hf_hub_download cache directory
    max_chars=800,  # each document is truncated to this length before embedding
    encode_batch_size=2048,  # sentence-transformers encode batch size
    download_workers=8,  # concurrent shard downloads
    row_group_size=1024,  # parquet row-group size for output shards
    random_state=42,  # seed for the train/test split and MLP init
)
def get_device() -> str:
    """Return the best available torch device name: cuda > mps > cpu."""
    import torch

    probes = (
        ("cuda", torch.cuda.is_available),
        ("mps", torch.backends.mps.is_available),
    )
    for name, is_available in probes:
        if is_available():
            return name
    return "cpu"
def load_model(device: str):
    """Load the sentence-transformers embedding model onto *device*.

    On accelerators (cuda/mps) the model weights are loaded as float16.
    """
    from sentence_transformers import SentenceTransformer

    use_fp16 = device in ("cuda", "mps")
    model_kwargs = {"torch_dtype": "float16"} if use_fp16 else {}
    model = SentenceTransformer(
        config.model_name, device=device, model_kwargs=model_kwargs
    )
    print(f"Loaded {config.model_name} on {device} (fp16={device in ('cuda', 'mps')})")
    return model
def encode(model, texts: list[str]) -> numpy.ndarray:
    """Embed *texts*, truncating each to config.max_chars chars and stripping whitespace."""
    limit = config.max_chars
    clipped = [text[:limit].strip() for text in texts]
    return model.encode(
        clipped, batch_size=config.encode_batch_size, show_progress_bar=True
    )
| # ─── Step 1: Download + parse training sample ─── | |
def download_sample() -> tuple[list[str], list[int]]:
    """Fetch the labeled reference sample; return (texts, scores clamped to 0-5)."""
    df = polars.read_parquet(config.sample_url)
    labeled = df.filter(polars.col("edu_classification").is_not_null())
    dropped = df.height - labeled.height
    if dropped > 0:
        print(f"Dropped {dropped} rows with null edu_classification")
    texts = labeled["text"].to_list()
    scores = []
    for raw in labeled["edu_classification"].to_list():
        # Labels are JSON blobs; clamp the score into the 0-5 rubric range.
        score = int(json.loads(raw)["educational_score"])
        scores.append(max(0, min(5, score)))
    print(f"Loaded {len(texts)} labeled samples")
    return texts, scores
| # ─── Step 2: Train classifier (with validation) ─── | |
def _new_mlp() -> MLPClassifier:
    """Build the MLP head used for both validation and the production model."""
    return MLPClassifier(
        hidden_layer_sizes=(1024, 256),
        max_iter=500,
        early_stopping=True,
        random_state=config.random_state,
    )


def train_classifier(model, texts: list[str], scores: list[int]) -> MLPClassifier:
    """Train an educational-score classifier on embeddings of *texts*.

    First fits on an 80/20 stratified split and prints a classification
    report as a sanity check, then retrains from scratch on the full
    sample and returns that production classifier.

    Args:
        model: sentence-transformers model used by ``encode``.
        texts: labeled documents.
        scores: integer labels in [0, 5], parallel to *texts*.
    """
    train_texts, test_texts, train_scores, test_scores = train_test_split(
        texts,
        scores,
        test_size=0.20,
        random_state=config.random_state,
        stratify=scores,  # keep the 0-5 label balance in both splits
    )
    print(f"\n--- Validation (train={len(train_texts)}, test={len(test_texts)}) ---")
    val_classifier = _new_mlp()
    val_classifier.fit(X=encode(model=model, texts=train_texts), y=train_scores)
    predictions = val_classifier.predict(X=encode(model=model, texts=test_texts))
    print(
        classification_report(y_true=test_scores, y_pred=predictions, zero_division=0)
    )

    # Retrain on full sample for production
    print(f"--- Retraining on full sample ({len(texts)} rows) ---")
    classifier = _new_mlp()
    classifier.fit(X=encode(model=model, texts=texts), y=scores)
    print(f"Classifier trained on {len(texts)} samples")
    return classifier
| # ─── Step 3: Pre-download all shards in parallel ─── | |
def list_shards() -> list[str]:
    """Return the sorted shard_*.parquet filenames present in the dataset repo."""
    repo_files = HfApi().list_repo_files(repo_id=config.repo_id, repo_type="dataset")
    shards = sorted(
        name
        for name in repo_files
        if name.startswith("shard_") and name.endswith(".parquet")
    )
    print(f"Found {len(shards)} shards in {config.repo_id}")
    return shards
def download_shard(shard_filename: str) -> tuple[str, str]:
    """Download one shard into the local cache; return (filename, local path)."""
    cached_path = hf_hub_download(
        repo_id=config.repo_id,
        repo_type="dataset",
        filename=shard_filename,
        cache_dir=config.download_dir,
    )
    return shard_filename, cached_path
def download_all_shards(shard_filenames: list[str]) -> dict[str, str]:
    """Fetch every shard concurrently; return a filename -> local-path mapping."""
    n_shards = len(shard_filenames)
    print(
        f"\nPre-downloading {n_shards} shards with {config.download_workers} workers..."
    )
    started = time.perf_counter()
    shard_paths: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=config.download_workers) as executor:
        pending = {
            executor.submit(download_shard, name): name for name in shard_filenames
        }
        for future in as_completed(pending):
            name, local_path = future.result()
            shard_paths[name] = local_path
            done = len(shard_paths)
            # Progress line every 10 completions and on the final one.
            if done % 10 == 0 or done == n_shards:
                elapsed = time.perf_counter() - started
                print(f" Downloaded {done}/{n_shards} shards ({elapsed:.1f}s)")
    print(f"All shards downloaded in {time.perf_counter() - started:.1f}s")
    return shard_paths
| # ─── Step 4: Score shard-by-shard ─── | |
def score_shard(
    model, classifier: MLPClassifier, local_path: str, output_path: Path
) -> int:
    """Score one shard and write it to *output_path*; return its row count.

    Embeds every text, predicts an Int8 educational_score column, and
    writes a zstd-compressed parquet with small row groups.
    """
    frame = polars.read_parquet(local_path)
    embeddings = encode(model=model, texts=frame["text"].to_list())
    predicted = classifier.predict(X=embeddings)
    scored = frame.with_columns(
        polars.Series(
            name="educational_score", values=predicted.tolist(), dtype=polars.Int8
        )
    )
    pq.write_table(
        scored.to_arrow(),
        str(output_path),
        row_group_size=config.row_group_size,
        use_dictionary=False,
        compression="zstd",
        compression_level=3,
        write_statistics=False,
    )
    return len(scored)
def score_all(model, classifier: MLPClassifier):
    """Score every shard in the repo, resuming past already-scored outputs.

    A shard is considered done when its output file exists, is non-empty,
    and already carries an ``educational_score`` column. The resume check
    reads only parquet footer metadata (schema + row count) instead of
    loading the whole shard into memory.
    """
    shards = list_shards()
    os.makedirs(config.output_dir, exist_ok=True)

    to_score = []
    skipped_rows = 0
    for shard_filename in shards:
        output_path = Path(config.output_dir) / shard_filename
        if output_path.exists() and output_path.stat().st_size > 0:
            # Metadata-only reads: O(1) in shard size, vs. the full
            # polars.read_parquet load this previously required.
            if "educational_score" in pq.read_schema(str(output_path)).names:
                n_existing = pq.read_metadata(str(output_path)).num_rows
                skipped_rows += n_existing
                print(f" SKIP {shard_filename} (already scored, {n_existing:,} rows)")
                continue
        to_score.append(shard_filename)

    if skipped_rows > 0:
        print(
            f"Resuming: {skipped_rows:,} rows already scored, {len(to_score)} shards remaining"
        )
    if not to_score:
        print("All shards already scored!")
        return

    shard_paths = download_all_shards(shard_filenames=to_score)

    total_rows = skipped_rows
    total_time = 0.0
    for i, shard_filename in enumerate(to_score):
        output_path = Path(config.output_dir) / shard_filename
        t0 = time.perf_counter()
        n_rows = score_shard(
            model=model,
            classifier=classifier,
            local_path=shard_paths[shard_filename],
            output_path=output_path,
        )
        dt = time.perf_counter() - t0
        total_rows += n_rows
        total_time += dt
        # Throughput excludes resumed rows: only rows scored in this run count.
        throughput = n_rows / dt if dt > 0 else 0
        avg_throughput = (
            (total_rows - skipped_rows) / total_time if total_time > 0 else 0
        )
        print(
            f"[{i + 1}/{len(to_score)}] {shard_filename}: {n_rows:,} rows in {dt:.1f}s "
            f"({throughput:,.0f} samp/s) | total: {total_rows:,} rows, avg {avg_throughput:,.0f} samp/s"
        )
    print(f"\nScoring complete: {total_rows:,} total rows in {total_time:.1f}s")
| # ─── Step 5: Upload ─── | |
def upload():
    """Upload the scored shards in config.output_dir to the HF dataset repo.

    Raises:
        RuntimeError: if the HF_TOKEN environment variable is not set.
            (Previously an ``assert``, which is silently stripped under
            ``python -O`` and would let the upload fail later with a
            confusing auth error.)
    """
    token = os.getenv("HF_TOKEN")
    if token is None:
        raise RuntimeError("HF_TOKEN environment variable not set.")
    api = HfApi(token=token)
    api.upload_large_folder(
        folder_path=config.output_dir,
        repo_id=config.repo_id,
        repo_type="dataset",
    )
    print(f"Uploaded to {config.repo_id}")
def main():
    """Run the full pipeline: train the classifier, score all shards, upload."""
    device = get_device()
    print(f"Device: {device}")
    sample_texts, sample_scores = download_sample()
    embedder = load_model(device=device)
    classifier = train_classifier(
        model=embedder, texts=sample_texts, scores=sample_scores
    )
    score_all(model=embedder, classifier=classifier)
    upload()
if __name__ == "__main__":
    main()
    # Hard-exit without interpreter teardown — presumably to avoid hangs from
    # lingering non-daemon background threads (tokenizers/executors) at
    # shutdown. NOTE(review): skips atexit handlers and buffered-IO flushing;
    # confirm it is still needed before removing.
    os._exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment