build llm from scratch #python #ki
import tiktoken | |
import torch | |
import torch.nn as nn | |
import urllib.request | |
import re | |
from torch.utils.data import Dataset, DataLoader | |
import numpy as np | |
import zipfile | |
import os | |
from pathlib import Path | |
import pandas as pd | |
import time | |
import json | |
import tensorflow as tf | |
from tqdm import tqdm | |
class GPT2: | |
""" | |
This single class encapsulates all functions and classes from the original code. | |
""" | |
# -------------------------------------------------------------------------------- | |
# Basic GPT configuration (example: GPT-2 124M) | |
# -------------------------------------------------------------------------------- | |
GPT_CONFIG_124M = { | |
"vocab_size": 50257, | |
"context_length": 256, | |
"emb_dim": 768, | |
"n_heads": 12, | |
"n_layers": 12, | |
"drop_rate": 0.1, | |
"qkv_bias": False | |
} | |
# -------------------------------------------------------------------------------- | |
# Static Methods | |
# -------------------------------------------------------------------------------- | |
@staticmethod | |
def download_and_load_gpt2(model_size, models_dir): | |
""" | |
Downloads the requested GPT-2 model from official or backup URLs, | |
extracts all relevant files, loads TF checkpoint parameters, | |
and returns the loaded settings and params as dictionaries. | |
:param model_size: The model size string (e.g. '124M', '355M', '774M', '1558M'). | |
:param models_dir: Directory to store the downloaded model files. | |
:return: (settings, params) tuple, where 'settings' is a dict of GPT-2 settings | |
and 'params' is a dict of model parameters from the TF checkpoint. | |
""" | |
# Validate model size | |
allowed_sizes = ("124M", "355M", "774M", "1558M") | |
if model_size not in allowed_sizes: | |
raise ValueError(f"Model size not in {allowed_sizes}") | |
# Define paths | |
model_dir = os.path.join(models_dir, model_size) | |
base_url = "https://openaipublic.blob.core.windows.net/gpt-2/models" | |
backup_base_url = "https://f001.backblazeb2.com/file/LLMs-from-scratch/gpt2" | |
filenames = [ | |
"checkpoint", "encoder.json", "hparams.json", | |
"model.ckpt.data-00000-of-00001", "model.ckpt.index", | |
"model.ckpt.meta", "vocab.bpe" | |
] | |
# Download files | |
os.makedirs(model_dir, exist_ok=True) | |
for filename in filenames: | |
            # Build URLs with "/" so this also works on Windows
            # (os.path.join would insert backslashes into the URL).
            file_url = f"{base_url}/{model_size}/{filename}"
            backup_url = f"{backup_base_url}/{model_size}/{filename}"
file_path = os.path.join(model_dir, filename) | |
GPT2.download_file(file_url, file_path, backup_url) | |
# Load settings and params | |
tf_ckpt_path = tf.train.latest_checkpoint(model_dir) | |
settings = json.load(open(os.path.join(model_dir, "hparams.json"))) | |
params = GPT2.load_gpt2_params_from_tf_ckpt(tf_ckpt_path, settings) | |
return settings, params | |
@staticmethod | |
def download_file(url, destination, backup_url=None): | |
""" | |
Downloads a file from a given URL (or a backup URL, if the primary fails). | |
Uses a progress bar and does not re-download if file is already present | |
and up-to-date. | |
:param url: Primary URL to download from. | |
:param destination: File path where the downloaded data will be saved. | |
:param backup_url: Optional backup URL if the primary fails. | |
""" | |
def _attempt_download(download_url): | |
with urllib.request.urlopen(download_url) as response: | |
# Get the total file size from headers, defaulting to 0 if not present | |
file_size = int(response.headers.get("Content-Length", 0)) | |
# Check if file exists and has the same size | |
if os.path.exists(destination): | |
file_size_local = os.path.getsize(destination) | |
if file_size == file_size_local: | |
print(f"File already exists and is up-to-date: {destination}") | |
return True # Indicate success without re-downloading | |
block_size = 1024 # 1 Kilobyte | |
# Initialize the progress bar with total file size | |
progress_bar_description = os.path.basename(download_url) | |
with tqdm(total=file_size, unit="iB", unit_scale=True, | |
desc=progress_bar_description) as progress_bar: | |
with open(destination, "wb") as file: | |
while True: | |
chunk = response.read(block_size) | |
if not chunk: | |
break | |
file.write(chunk) | |
progress_bar.update(len(chunk)) | |
return True | |
try: | |
if _attempt_download(url): | |
return | |
except (urllib.error.HTTPError, urllib.error.URLError): | |
if backup_url is not None: | |
print(f"Primary URL ({url}) failed. Attempting backup URL: {backup_url}") | |
try: | |
if _attempt_download(backup_url): | |
return | |
except urllib.error.HTTPError: | |
pass | |
# If we reach here, both attempts have failed | |
error_message = ( | |
f"Failed to download from both primary URL ({url})" | |
f"{' and backup URL (' + backup_url + ')' if backup_url else ''}." | |
"\nCheck your internet connection or the file availability.\n" | |
"For help, visit: https://github.com/rasbt/LLMs-from-scratch/discussions/273" | |
) | |
print(error_message) | |
except Exception as e: | |
print(f"An unexpected error occurred: {e}") | |
@staticmethod | |
def load_gpt2_params_from_tf_ckpt(ckpt_path, settings): | |
""" | |
Loads GPT-2 parameters from a TensorFlow checkpoint and places them | |
into a structured dictionary. | |
:param ckpt_path: Path to the TF checkpoint (usually within the model folder). | |
:param settings: Dictionary of GPT-2 model settings (including n_layer). | |
:return: params (dict) containing the model parameters structured by blocks. | |
""" | |
# Initialize parameters dictionary with empty blocks for each layer | |
params = {"blocks": [{} for _ in range(settings["n_layer"])]} | |
# Iterate over each variable in the checkpoint | |
for name, _ in tf.train.list_variables(ckpt_path): | |
# Load the variable and remove singleton dimensions | |
variable_array = np.squeeze(tf.train.load_variable(ckpt_path, name)) | |
# Process the variable name to extract relevant parts | |
variable_name_parts = name.split("/")[1:] # Skip the 'model/' prefix | |
# Identify the target dictionary for the variable | |
target_dict = params | |
if variable_name_parts[0].startswith("h"): | |
layer_number = int(variable_name_parts[0][1:]) | |
target_dict = params["blocks"][layer_number] | |
# Recursively access or create nested dictionaries | |
for key in variable_name_parts[1:-1]: | |
target_dict = target_dict.setdefault(key, {}) | |
# Assign the variable array to the last key | |
last_key = variable_name_parts[-1] | |
target_dict[last_key] = variable_array | |
return params | |
@staticmethod | |
def train_classifier_simple(model, train_loader, val_loader, | |
optimizer, device, num_epochs, eval_freq, eval_iter): | |
""" | |
Simple training loop for a classification model, evaluating every | |
'eval_freq' steps. Returns losses, accuracies, and examples seen. | |
:param model: PyTorch model to be trained. | |
:param train_loader: DataLoader for the training dataset. | |
:param val_loader: DataLoader for the validation dataset. | |
:param optimizer: Optimizer (e.g., AdamW). | |
:param device: Device to train on (CPU or GPU). | |
:param num_epochs: Number of epochs to train. | |
:param eval_freq: Frequency (in steps) of evaluating on training/validation sets. | |
:param eval_iter: Number of mini-batches to evaluate. | |
:return: (train_losses, val_losses, train_accs, val_accs, examples_seen) | |
""" | |
train_losses, val_losses, train_accs, val_accs = [], [], [], [] | |
examples_seen, global_step = 0, -1 | |
for epoch in range(num_epochs): | |
model.train() | |
for input_batch, target_batch in train_loader: | |
optimizer.zero_grad() | |
loss = GPT2.calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
loss.backward() | |
optimizer.step() | |
examples_seen += input_batch.shape[0] | |
global_step += 1 | |
if global_step % eval_freq == 0: | |
train_loss, val_loss = GPT2.evaluate_model( | |
model, train_loader, val_loader, device, eval_iter | |
) | |
train_losses.append(train_loss) | |
val_losses.append(val_loss) | |
print(f"Ep {epoch+1} (Step {global_step:06d}): " | |
f"Train loss {train_loss:.3f}, " | |
f"Val loss {val_loss:.3f}") | |
train_accuracy = GPT2.calc_accuracy_loader( | |
train_loader, model, device, num_batches=eval_iter | |
) | |
val_accuracy = GPT2.calc_accuracy_loader( | |
val_loader, model, device, num_batches=eval_iter | |
) | |
print(f"Training accuracy: {train_accuracy*100:.2f}% | ", end="") | |
print(f"Validation accuracy: {val_accuracy*100:.2f}%") | |
train_accs.append(train_accuracy) | |
val_accs.append(val_accuracy) | |
return train_losses, val_losses, train_accs, val_accs, examples_seen | |
@staticmethod | |
def calc_accuracy_loader(data_loader, model, device, num_batches=None): | |
""" | |
Calculates accuracy on a given DataLoader for classification. | |
:param data_loader: DataLoader with (input_batch, target_batch). | |
:param model: PyTorch model for evaluation. | |
:param device: Device to evaluate on. | |
:param num_batches: Optional limit on number of batches to evaluate. | |
:return: accuracy (float) | |
""" | |
model.eval() | |
correct_predictions, num_examples = 0, 0 | |
if num_batches is None: | |
num_batches = len(data_loader) | |
else: | |
num_batches = min(num_batches, len(data_loader)) | |
for i, (input_batch, target_batch) in enumerate(data_loader): | |
if i < num_batches: | |
input_batch = input_batch.to(device) | |
target_batch = target_batch.to(device) | |
with torch.no_grad(): | |
logits = model(input_batch)[:, -1, :] | |
predicted_labels = torch.argmax(logits, dim=-1) | |
num_examples += predicted_labels.shape[0] | |
correct_predictions += (predicted_labels == target_batch).sum().item() | |
else: | |
break | |
return correct_predictions / num_examples | |
@staticmethod | |
def random_split(df, train_frac, validation_frac): | |
""" | |
Splits a DataFrame into train/validation/test sets by given fractions. | |
:param df: The DataFrame to split. | |
:param train_frac: Fraction of data for training. | |
:param validation_frac: Fraction of data for validation. | |
:return: (train_df, validation_df, test_df) DataFrames | |
""" | |
df = df.sample(frac=1, random_state=123).reset_index(drop=True) | |
train_end = int(len(df) * train_frac) | |
validation_end = train_end + int(len(df) * validation_frac) | |
train_df = df[:train_end] | |
validation_df = df[train_end:validation_end] | |
test_df = df[validation_end:] | |
return train_df, validation_df, test_df | |
@staticmethod | |
def create_balanced_dataset(df): | |
""" | |
Given a spam/ham dataset, balances it by sampling equally from each class. | |
:param df: Original DataFrame containing a 'Label' column ('spam' or 'ham'). | |
:return: balanced_df (DataFrame) with equal spam/ham instances. | |
""" | |
num_spam = df[df["Label"] == "spam"].shape[0] | |
ham_subset = df[df["Label"] == "ham"].sample(num_spam, random_state=123) | |
balanced_df = pd.concat([ham_subset, df[df["Label"] == "spam"]]) | |
return balanced_df | |
@staticmethod | |
def download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path): | |
""" | |
Downloads a zip file from 'url' and extracts the file if not already present. | |
:param url: URL for the SMS Spam Collection dataset. | |
:param zip_path: Local zip path. | |
:param extracted_path: Directory to extract to. | |
:param data_file_path: Final path of the extracted file. | |
""" | |
if data_file_path.exists(): | |
print(f"{data_file_path} already exists. Skipping download and extraction.") | |
return | |
with urllib.request.urlopen(url) as response: | |
with open(zip_path, "wb") as out_file: | |
out_file.write(response.read()) | |
with zipfile.ZipFile(zip_path, "r") as zip_ref: | |
zip_ref.extractall(extracted_path) | |
original_file_path = Path(extracted_path) / "SMSSpamCollection" | |
os.rename(original_file_path, data_file_path) | |
print(f"File downloaded and saved as {data_file_path}") | |
@staticmethod | |
def evaluate_model(model, train_loader, val_loader, device, eval_iter): | |
""" | |
Evaluates the model on both training and validation sets (loss only). | |
:param model: PyTorch model. | |
:param train_loader: DataLoader for training set. | |
:param val_loader: DataLoader for validation set. | |
:param device: Device (CPU/GPU). | |
:param eval_iter: Number of mini-batches to evaluate. | |
:return: (train_loss, val_loss) | |
""" | |
model.eval() | |
with torch.no_grad(): | |
train_loss = GPT2.calc_loss_loader( | |
train_loader, model, device, num_batches=eval_iter | |
) | |
val_loss = GPT2.calc_loss_loader( | |
val_loader, model, device, num_batches=eval_iter | |
) | |
model.train() | |
return train_loss, val_loss | |
@staticmethod | |
def generate_and_print_sample(model, tokenizer, device, start_context): | |
""" | |
Generates a sample of text from the model given a start context, | |
then prints the decoded text. | |
:param model: PyTorch model. | |
:param tokenizer: Tokenizer (compatible with GPT-2). | |
:param device: Device (CPU/GPU). | |
:param start_context: Initial text prompt (string). | |
""" | |
model.eval() | |
context_size = model.pos_emb.weight.shape[0] | |
encoded = GPT2.text_to_token_ids(start_context, tokenizer).to(device) | |
with torch.no_grad(): | |
token_ids = GPT2.generate_text_simple( | |
model=model, | |
idx=encoded, | |
max_new_tokens=50, | |
context_size=context_size | |
) | |
decoded_text = GPT2.token_ids_to_text(token_ids, tokenizer) | |
print(decoded_text.replace("\n", " ")) | |
model.train() | |
@staticmethod | |
def train_model_simple(model, train_loader, val_loader, optimizer, | |
device, num_epochs, eval_freq, eval_iter, | |
start_context, tokenizer): | |
""" | |
Simple language modeling training loop. Evaluates on train/val sets, | |
generates sample text after each epoch. | |
:param model: PyTorch model. | |
:param train_loader: Training DataLoader. | |
:param val_loader: Validation DataLoader. | |
:param optimizer: Optimizer (e.g., AdamW). | |
:param device: Device (CPU/GPU). | |
:param num_epochs: Number of training epochs. | |
:param eval_freq: Steps between evaluations. | |
:param eval_iter: Number of mini-batches for evaluation. | |
:param start_context: String prompt to use for sample generation. | |
:param tokenizer: GPT-2-compatible tokenizer for sample generation. | |
:return: (train_losses, val_losses, track_tokens_seen) | |
""" | |
train_losses, val_losses, track_tokens_seen = [], [], [] | |
tokens_seen, global_step = 0, -1 | |
for epoch in range(num_epochs): | |
model.train() | |
for input_batch, target_batch in train_loader: | |
optimizer.zero_grad() | |
loss = GPT2.calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
loss.backward() | |
optimizer.step() | |
tokens_seen += input_batch.numel() | |
global_step += 1 | |
if global_step % eval_freq == 0: | |
train_loss, val_loss = GPT2.evaluate_model( | |
model, train_loader, val_loader, device, eval_iter | |
) | |
train_losses.append(train_loss) | |
val_losses.append(val_loss) | |
track_tokens_seen.append(tokens_seen) | |
print(f"Ep {epoch+1} (Step {global_step:06d}): " | |
f"Train loss {train_loss:.3f}, " | |
f"Val loss {val_loss:.3f}") | |
GPT2.generate_and_print_sample( | |
model, tokenizer, device, start_context | |
) | |
return train_losses, val_losses, track_tokens_seen | |
@staticmethod | |
def calc_loss_loader(data_loader, model, device, num_batches=None): | |
""" | |
Calculates average loss over the given data loader. | |
:param data_loader: DataLoader of (input, target) pairs. | |
:param model: PyTorch model. | |
:param device: Device (CPU/GPU). | |
:param num_batches: Number of batches to use for the calculation. | |
:return: Average loss (float). | |
""" | |
total_loss = 0. | |
if len(data_loader) == 0: | |
return float("nan") | |
elif num_batches is None: | |
num_batches = len(data_loader) | |
else: | |
num_batches = min(num_batches, len(data_loader)) | |
for i, (input_batch, target_batch) in enumerate(data_loader): | |
if i < num_batches: | |
loss = GPT2.calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
total_loss += loss.item() | |
else: | |
break | |
return total_loss / num_batches | |
@staticmethod | |
def calc_loss_batch(input_batch, target_batch, model, device): | |
""" | |
        Calculates cross-entropy loss for a single batch. Only the last token's
        logits are used, so this variant expects classification targets; a
        language-modeling loss would instead flatten the full-sequence logits
        and targets before calling cross_entropy.
        :param input_batch: Tensor of shape (batch_size, seq_len).
        :param target_batch: Tensor of class labels with shape (batch_size,).
:param model: PyTorch model. | |
:param device: Device (CPU/GPU). | |
:return: loss (scalar). | |
""" | |
input_batch = input_batch.to(device) | |
target_batch = target_batch.to(device) | |
logits = model(input_batch)[:, -1, :] # using only last token for classification | |
loss = torch.nn.functional.cross_entropy(logits, target_batch) | |
return loss | |
@staticmethod | |
def text_to_token_ids(text, tokenizer): | |
""" | |
Converts a text string into a tensor of token IDs using the given tokenizer. | |
:param text: Input text string. | |
:param tokenizer: GPT-2-compatible tokenizer. | |
:return: A torch.LongTensor (1, seq_len). | |
""" | |
encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'}) | |
encoded_tensor = torch.tensor(encoded).unsqueeze(0) | |
return encoded_tensor | |
@staticmethod | |
def token_ids_to_text(token_ids, tokenizer): | |
""" | |
Converts a tensor of token IDs back into a text string using the given tokenizer. | |
:param token_ids: Tensor of shape (1, seq_len). | |
:param tokenizer: GPT-2-compatible tokenizer. | |
:return: Decoded text string. | |
""" | |
flat = token_ids.squeeze(0) | |
return tokenizer.decode(flat.tolist()) | |
@staticmethod | |
def generate(model, idx, max_new_tokens, context_size, temperature=0.0, | |
top_k=None, eos_id=None): | |
""" | |
Generates text token by token, optionally using temperature sampling | |
and top-k filtering. | |
:param model: PyTorch model. | |
:param idx: Tensor of shape (1, context_so_far_length). | |
:param max_new_tokens: Number of tokens to generate. | |
:param context_size: Maximum context size of the model. | |
:param temperature: Temperature parameter for sampling (0.0 = greedy). | |
:param top_k: Keep only top K tokens for sampling (optional). | |
:param eos_id: Optional EOS token id to stop generation prematurely. | |
:return: Updated idx tensor with new tokens appended. | |
""" | |
for _ in range(max_new_tokens): | |
idx_cond = idx[:, -context_size:] | |
with torch.no_grad(): | |
logits = model(idx_cond) | |
logits = logits[:, -1, :] | |
if top_k is not None: | |
top_logits, _ = torch.topk(logits, top_k) | |
min_val = top_logits[:, -1] | |
logits = torch.where( | |
logits < min_val, | |
torch.tensor(float('-inf')).to(logits.device), | |
logits | |
) | |
if temperature > 0.0: | |
logits = logits / temperature | |
probs = torch.softmax(logits, dim=-1) | |
idx_next = torch.multinomial(probs, num_samples=1) | |
else: | |
idx_next = torch.argmax(logits, dim=-1, keepdim=True) | |
if eos_id is not None and idx_next == eos_id: | |
break | |
idx = torch.cat((idx, idx_next), dim=1) | |
return idx | |
@staticmethod | |
def generate_text_simple(model, idx, max_new_tokens, context_size): | |
""" | |
Generates text greedily (no temperature, no top-k) for demonstration. | |
:param model: PyTorch model. | |
:param idx: Tensor of shape (1, context_so_far_length). | |
:param max_new_tokens: Number of tokens to generate. | |
:param context_size: Maximum context size of the model. | |
:return: Updated idx tensor with new tokens appended. | |
""" | |
for _ in range(max_new_tokens): | |
idx_cond = idx[:, -context_size:] | |
with torch.no_grad(): | |
logits = model(idx_cond) | |
logits = logits[:, -1, :] | |
probas = torch.softmax(logits, dim=-1) | |
idx_next = torch.argmax(probas, dim=-1, keepdim=True) | |
idx = torch.cat((idx, idx_next), dim=1) | |
return idx | |
@staticmethod | |
def assign(left, right): | |
""" | |
Helper method to assign Numpy arrays to PyTorch parameters with shape checking. | |
:param left: Torch Parameter. | |
:param right: Numpy array with the same shape. | |
:return: Torch Parameter with the assigned data. | |
""" | |
if left.shape != right.shape: | |
raise ValueError(f"Shape mismatch. Left: {left.shape}, " | |
f"Right: {right.shape}") | |
return torch.nn.Parameter(torch.tensor(right)) | |
@staticmethod | |
def load_weights_into_gpt(gpt, params): | |
""" | |
Loads pre-trained GPT-2 weights from the 'params' dictionary into a PyTorch GPT model. | |
:param gpt: GPTModel (PyTorch) instance. | |
:param params: Dictionary structure of weights (from TF checkpoint). | |
""" | |
gpt.pos_emb.weight = GPT2.assign(gpt.pos_emb.weight, params['wpe']) | |
gpt.tok_emb.weight = GPT2.assign(gpt.tok_emb.weight, params['wte']) | |
for b in range(len(params["blocks"])): | |
q_w, k_w, v_w = np.split( | |
(params["blocks"][b]["attn"]["c_attn"])["w"], 3, axis=-1 | |
) | |
gpt.trf_blocks[b].att.W_query.weight = GPT2.assign( | |
gpt.trf_blocks[b].att.W_query.weight, q_w.T | |
) | |
gpt.trf_blocks[b].att.W_key.weight = GPT2.assign( | |
gpt.trf_blocks[b].att.W_key.weight, k_w.T | |
) | |
gpt.trf_blocks[b].att.W_value.weight = GPT2.assign( | |
gpt.trf_blocks[b].att.W_value.weight, v_w.T | |
) | |
q_b, k_b, v_b = np.split( | |
(params["blocks"][b]["attn"]["c_attn"])["b"], 3, axis=-1 | |
) | |
gpt.trf_blocks[b].att.W_query.bias = GPT2.assign( | |
gpt.trf_blocks[b].att.W_query.bias, q_b | |
) | |
gpt.trf_blocks[b].att.W_key.bias = GPT2.assign( | |
gpt.trf_blocks[b].att.W_key.bias, k_b | |
) | |
gpt.trf_blocks[b].att.W_value.bias = GPT2.assign( | |
gpt.trf_blocks[b].att.W_value.bias, v_b | |
) | |
gpt.trf_blocks[b].att.out_proj.weight = GPT2.assign( | |
gpt.trf_blocks[b].att.out_proj.weight, | |
params["blocks"][b]["attn"]["c_proj"]["w"].T | |
) | |
gpt.trf_blocks[b].att.out_proj.bias = GPT2.assign( | |
gpt.trf_blocks[b].att.out_proj.bias, | |
params["blocks"][b]["attn"]["c_proj"]["b"] | |
) | |
gpt.trf_blocks[b].ff.layers[0].weight = GPT2.assign( | |
gpt.trf_blocks[b].ff.layers[0].weight, | |
params["blocks"][b]["mlp"]["c_fc"]["w"].T | |
) | |
gpt.trf_blocks[b].ff.layers[0].bias = GPT2.assign( | |
gpt.trf_blocks[b].ff.layers[0].bias, | |
params["blocks"][b]["mlp"]["c_fc"]["b"] | |
) | |
gpt.trf_blocks[b].ff.layers[2].weight = GPT2.assign( | |
gpt.trf_blocks[b].ff.layers[2].weight, | |
params["blocks"][b]["mlp"]["c_proj"]["w"].T | |
) | |
gpt.trf_blocks[b].ff.layers[2].bias = GPT2.assign( | |
gpt.trf_blocks[b].ff.layers[2].bias, | |
params["blocks"][b]["mlp"]["c_proj"]["b"] | |
) | |
gpt.trf_blocks[b].norm1.scale = GPT2.assign( | |
gpt.trf_blocks[b].norm1.scale, | |
params["blocks"][b]["ln_1"]["g"] | |
) | |
gpt.trf_blocks[b].norm1.shift = GPT2.assign( | |
gpt.trf_blocks[b].norm1.shift, | |
params["blocks"][b]["ln_1"]["b"] | |
) | |
gpt.trf_blocks[b].norm2.scale = GPT2.assign( | |
gpt.trf_blocks[b].norm2.scale, | |
params["blocks"][b]["ln_2"]["g"] | |
) | |
gpt.trf_blocks[b].norm2.shift = GPT2.assign( | |
gpt.trf_blocks[b].norm2.shift, | |
params["blocks"][b]["ln_2"]["b"] | |
) | |
gpt.final_norm.scale = GPT2.assign(gpt.final_norm.scale, params["g"]) | |
gpt.final_norm.shift = GPT2.assign(gpt.final_norm.shift, params["b"]) | |
gpt.out_head.weight = GPT2.assign(gpt.out_head.weight, params["wte"]) | |
# -------------------------------------------------------------------------------- | |
# Internal Dataset Classes | |
# -------------------------------------------------------------------------------- | |
class SpamDataset(Dataset): | |
""" | |
Dataset for spam/ham classification using GPT tokenization for text encoding. | |
""" | |
def __init__(self, csv_file, tokenizer, max_length=None, pad_token_id=50256): | |
""" | |
:param csv_file: CSV file path with columns 'Text' and 'Label'. | |
:param tokenizer: GPT-2-compatible tokenizer for encoding text. | |
:param max_length: Maximum sequence length. If None, determined by the longest example. | |
:param pad_token_id: Token ID used for padding. | |
""" | |
self.data = pd.read_csv(csv_file) | |
self.encoded_texts = [tokenizer.encode(text) for text in self.data["Text"]] | |
# Determine or set max length | |
if max_length is None: | |
self.max_length = self._longest_encoded_length() | |
else: | |
self.max_length = max_length | |
self.encoded_texts = [ | |
encoded_text[:self.max_length] for encoded_text in self.encoded_texts | |
] | |
# Pad encoded texts to max_length | |
self.encoded_texts = [ | |
encoded_text + [pad_token_id] * (self.max_length - len(encoded_text)) | |
for encoded_text in self.encoded_texts | |
] | |
def __getitem__(self, index): | |
encoded = self.encoded_texts[index] | |
label = self.data.iloc[index]["Label"] | |
return ( | |
torch.tensor(encoded, dtype=torch.long), | |
torch.tensor(label, dtype=torch.long) | |
) | |
def __len__(self): | |
return len(self.data) | |
def _longest_encoded_length(self): | |
""" | |
Finds the maximum length among all encoded texts. | |
""" | |
max_length = 0 | |
for encoded_text in self.encoded_texts: | |
encoded_length = len(encoded_text) | |
if encoded_length > max_length: | |
max_length = encoded_length | |
return max_length | |
# -------------------------------------------------------------------------------- | |
# Simple Dataset for GPT-like Training | |
# -------------------------------------------------------------------------------- | |
class GPTDatasetV1(Dataset): | |
""" | |
A dataset class that slices text into overlapping chunks for GPT training. | |
""" | |
def __init__(self, txt, tokenizer, max_length, stride): | |
""" | |
:param txt: A long text string. | |
:param tokenizer: GPT-2-compatible tokenizer. | |
:param max_length: Sequence length for each chunk. | |
:param stride: Step size between consecutive chunks. | |
""" | |
self.input_ids = [] | |
self.target_ids = [] | |
token_ids = tokenizer.encode(txt) | |
# Overlapping slices | |
for i in range(0, len(token_ids) - max_length, stride): | |
input_chunk = token_ids[i:i + max_length] | |
target_chunk = token_ids[i + 1:i + max_length + 1] | |
self.input_ids.append(torch.tensor(input_chunk)) | |
self.target_ids.append(torch.tensor(target_chunk)) | |
def __len__(self): | |
return len(self.input_ids) | |
def __getitem__(self, idx): | |
return self.input_ids[idx], self.target_ids[idx] | |
# -------------------------------------------------------------------------------- | |
# Tokenizer Classes (for demonstration) | |
# -------------------------------------------------------------------------------- | |
class SimpleTokenizerV1: | |
""" | |
A very basic tokenizer using string split, for demonstration only. | |
""" | |
def __init__(self, vocab): | |
self.str_to_int = vocab | |
self.int_to_str = {i: s for s, i in vocab.items()} | |
def encode(self, text): | |
preprocessed = re.split(r'([,.?_!"()\']|--|\s)', text) | |
preprocessed = [item.strip() for item in preprocessed if item.strip()] | |
ids = [self.str_to_int[s] for s in preprocessed] | |
return ids | |
def decode(self, ids): | |
text = " ".join([self.int_to_str[i] for i in ids]) | |
text = re.sub(r'\s+([,.?!"()\'])', r'\1', text) | |
return text | |
class SimpleTokenizerV2: | |
""" | |
A slightly improved tokenizer with an <|unk|> token. | |
""" | |
def __init__(self, vocab): | |
self.str_to_int = vocab | |
self.int_to_str = {i: s for s, i in vocab.items()} | |
def encode(self, text): | |
preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text) | |
preprocessed = [item.strip() for item in preprocessed if item.strip()] | |
preprocessed = [ | |
item if item in self.str_to_int else "<|unk|>" for item in preprocessed | |
] | |
ids = [self.str_to_int[s] for s in preprocessed] | |
return ids | |
def decode(self, ids): | |
text = " ".join([self.int_to_str[i] for i in ids]) | |
text = re.sub(r'\s+([,.:;?!"()\'])', r'\1', text) | |
return text | |
# -------------------------------------------------------------------------------- | |
# GPT Components: Attention, FeedForward, Transformer Blocks, etc. | |
# -------------------------------------------------------------------------------- | |
class MultiHeadAttention(nn.Module): | |
""" | |
Multi-head Causal Self-Attention module used in GPT models. | |
""" | |
def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False): | |
super().__init__() | |
assert (d_out % num_heads == 0), \ | |
"d_out must be divisible by num_heads" | |
self.d_out = d_out | |
self.num_heads = num_heads | |
self.head_dim = d_out // num_heads | |
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.out_proj = nn.Linear(d_out, d_out) | |
self.dropout = nn.Dropout(dropout) | |
# Upper triangular mask to prevent attention to future tokens | |
self.register_buffer( | |
"mask", | |
torch.triu(torch.ones(context_length, context_length), diagonal=1) | |
) | |
def forward(self, x): | |
b, num_tokens, d_in = x.shape | |
keys = self.W_key(x) | |
queries = self.W_query(x) | |
values = self.W_value(x) | |
# Reshape to (batch_size, num_heads, seq_len, head_dim) | |
keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) | |
values = values.view(b, num_tokens, self.num_heads, self.head_dim) | |
queries = queries.view(b, num_tokens, self.num_heads, self.head_dim) | |
keys = keys.transpose(1, 2) | |
queries = queries.transpose(1, 2) | |
values = values.transpose(1, 2) | |
# Compute attention scores | |
attn_scores = queries @ keys.transpose(2, 3) | |
# Apply causal mask | |
mask_bool = self.mask.bool()[:num_tokens, :num_tokens] | |
attn_scores.masked_fill_(mask_bool, -torch.inf) | |
attn_weights = torch.softmax( | |
attn_scores / keys.shape[-1]**0.5, dim=-1 | |
) | |
attn_weights = self.dropout(attn_weights) | |
# Combine values (context vector) | |
context_vec = (attn_weights @ values).transpose(1, 2) | |
context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out) | |
# Final linear projection | |
context_vec = self.out_proj(context_vec) | |
return context_vec | |
class FeedForward(nn.Module): | |
""" | |
The feed-forward sub-layer of a Transformer block. | |
""" | |
def __init__(self, cfg): | |
super().__init__() | |
self.layers = nn.Sequential( | |
nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]), | |
GPT2.GELU(), | |
nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]), | |
) | |
def forward(self, x): | |
return self.layers(x) | |
class GELU(nn.Module): | |
""" | |
GELU activation function used in GPT-2. | |
""" | |
def __init__(self): | |
super().__init__() | |
def forward(self, x): | |
return 0.5 * x * (1 + torch.tanh( | |
torch.sqrt(torch.tensor(2.0 / torch.pi)) * | |
(x + 0.044715 * torch.pow(x, 3)) | |
)) | |
class LayerNorm(nn.Module): | |
""" | |
        Layer normalization with learnable scale and shift parameters.
""" | |
def __init__(self, emb_dim): | |
super().__init__() | |
self.eps = 1e-5 | |
self.scale = nn.Parameter(torch.ones(emb_dim)) | |
self.shift = nn.Parameter(torch.zeros(emb_dim)) | |
def forward(self, x): | |
mean = x.mean(dim=-1, keepdim=True) | |
var = x.var(dim=-1, keepdim=True, unbiased=False) | |
norm_x = (x - mean) / torch.sqrt(var + self.eps) | |
return self.scale * norm_x + self.shift | |
class TransformerBlock(nn.Module): | |
""" | |
A single Transformer block consisting of (LayerNorm -> Self-Attn -> FF) with residuals. | |
""" | |
def __init__(self, cfg): | |
super().__init__() | |
self.att = GPT2.MultiHeadAttention( | |
d_in=cfg["emb_dim"], | |
d_out=cfg["emb_dim"], | |
context_length=cfg["context_length"], | |
num_heads=cfg["n_heads"], | |
dropout=cfg["drop_rate"], | |
qkv_bias=cfg["qkv_bias"] | |
) | |
self.ff = GPT2.FeedForward(cfg) | |
self.norm1 = GPT2.LayerNorm(cfg["emb_dim"]) | |
self.norm2 = GPT2.LayerNorm(cfg["emb_dim"]) | |
self.drop_shortcut = nn.Dropout(cfg["drop_rate"]) | |
def forward(self, x): | |
# Self-attention sub-layer | |
shortcut = x | |
x = self.norm1(x) | |
x = self.att(x) | |
x = self.drop_shortcut(x) | |
x = x + shortcut | |
# Feed-forward sub-layer | |
shortcut = x | |
x = self.norm2(x) | |
x = self.ff(x) | |
x = self.drop_shortcut(x) | |
x = x + shortcut | |
return x | |
class GPTModel(nn.Module): | |
""" | |
A GPT-style model: token embedding -> positional embedding -> transformer blocks -> final norm -> output head. | |
""" | |
def __init__(self, cfg): | |
super().__init__() | |
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) | |
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) | |
self.drop_emb = nn.Dropout(cfg["drop_rate"]) | |
# Create n_layers TransformerBlocks in a Sequential | |
self.trf_blocks = nn.Sequential( | |
*[GPT2.TransformerBlock(cfg) for _ in range(cfg["n_layers"])] | |
) | |
self.final_norm = GPT2.LayerNorm(cfg["emb_dim"]) | |
self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False) | |
def forward(self, in_idx): | |
# in_idx: (batch_size, seq_len) | |
batch_size, seq_len = in_idx.shape | |
tok_embeds = self.tok_emb(in_idx) | |
pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device)) | |
x = tok_embeds + pos_embeds | |
x = self.drop_emb(x) | |
x = self.trf_blocks(x) | |
x = self.final_norm(x) | |
# Project to vocabulary | |
logits = self.out_head(x).to(in_idx.device) | |
return logits | |
# -------------------------------------------------------------------------------- | |
# Dummy Models (for demonstration or debugging) | |
# -------------------------------------------------------------------------------- | |
class DummyGPTModel(nn.Module): | |
""" | |
A dummy GPTModel that does nothing in its Transformer blocks, used for debugging. | |
""" | |
def __init__(self, cfg): | |
super().__init__() | |
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) | |
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) | |
self.drop_emb = nn.Dropout(cfg["drop_rate"]) | |
self.trf_blocks = nn.Sequential( | |
*[GPT2.DummyTransformerBlock(cfg) for _ in range(cfg["n_layers"])] | |
) | |
self.final_norm = GPT2.DummyLayerNorm(cfg["emb_dim"]) | |
self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False) | |
def forward(self, in_idx): | |
batch_size, seq_len = in_idx.shape | |
tok_embeds = self.tok_emb(in_idx) | |
pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device)) | |
x = tok_embeds + pos_embeds | |
x = self.drop_emb(x) | |
x = self.trf_blocks(x) | |
x = self.final_norm(x) | |
logits = self.out_head(x) | |
return logits | |
class DummyTransformerBlock(nn.Module): | |
""" | |
A dummy TransformerBlock that simply returns the input. Used for debugging. | |
""" | |
def __init__(self, cfg): | |
super().__init__() | |
def forward(self, x): | |
return x | |
class DummyLayerNorm(nn.Module): | |
""" | |
A dummy layer norm that returns the input unmodified. | |
""" | |
def __init__(self, normalized_shape, eps=1e-5): | |
super().__init__() | |
def forward(self, x): | |
return x | |
# -------------------------------------------------------------------------------- | |
# Utility function to create a DataLoader for GPT training (example usage). | |
# -------------------------------------------------------------------------------- | |
@staticmethod | |
def create_dataloader_v1(txt, batch_size=4, max_length=256, stride=128, | |
shuffle=True, drop_last=True, num_workers=0): | |
""" | |
Creates a DataLoader for GPT-like training using GPTDatasetV1. | |
:param txt: A long text string. | |
:param batch_size: Batch size for DataLoader. | |
:param max_length: Sequence length for each chunk. | |
:param stride: Step size for chunk creation. | |
:param shuffle: Whether to shuffle the dataset. | |
:param drop_last: Whether to drop the last incomplete batch. | |
:param num_workers: Number of workers for data loading. | |
:return: A PyTorch DataLoader. | |
""" | |
tokenizer = tiktoken.get_encoding("gpt2") | |
dataset = GPT2.GPTDatasetV1(txt, tokenizer, max_length, stride) | |
dataloader = DataLoader( | |
dataset, | |
batch_size=batch_size, | |
shuffle=shuffle, | |
drop_last=drop_last, | |
num_workers=num_workers | |
) | |
return dataloader |
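A minimal usage sketch for the class above, assuming the GPT-2 124M checkpoint can be downloaded into a local "gpt2" directory (the directory name, prompt, and generation settings are illustrative and not part of the original file):

if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Download the 124M checkpoint and load its TF weights into dictionaries.
    settings, params = GPT2.download_and_load_gpt2("124M", "gpt2")

    # The released checkpoint uses a 1024-token context and q/k/v biases,
    # so adjust the example config before loading the weights (GPT2.assign
    # checks shapes and would otherwise raise a ValueError).
    cfg = dict(GPT2.GPT_CONFIG_124M)
    cfg["context_length"] = 1024
    cfg["qkv_bias"] = True

    model = GPT2.GPTModel(cfg)
    GPT2.load_weights_into_gpt(model, params)
    model.to(device)
    model.eval()

    # Greedily generate a short continuation of a prompt.
    tokenizer = tiktoken.get_encoding("gpt2")
    idx = GPT2.text_to_token_ids("Every effort moves you", tokenizer).to(device)
    out = GPT2.generate(model, idx, max_new_tokens=25,
                        context_size=cfg["context_length"], temperature=0.0)
    print(GPT2.token_ids_to_text(out, tokenizer))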
Second file: a standalone, function-based version of the same components, followed by a spam-classification fine-tuning script.
import tiktoken | |
import torch | |
import torch.nn as nn | |
import urllib.request | |
import re | |
from torch.utils.data import Dataset, DataLoader | |
import numpy as np | |
import zipfile | |
import os | |
from pathlib import Path | |
import pandas as pd | |
import time | |
# helper functions | |
def train_classifier_simple(model, train_loader, val_loader, optimizer, device, num_epochs, eval_freq, eval_iter): | |
train_losses, val_losses, train_accs, val_accs = [], [], [], [] | |
examples_seen, global_step = 0, -1 | |
for epoch in range(num_epochs): | |
model.train() | |
for input_batch, target_batch in train_loader: | |
optimizer.zero_grad() | |
loss = calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
loss.backward() | |
optimizer.step() | |
examples_seen += input_batch.shape[0] | |
global_step += 1 | |
if global_step % eval_freq == 0: | |
train_loss, val_loss = evaluate_model(model, train_loader, val_loader, device, eval_iter) | |
train_losses.append(train_loss) | |
val_losses.append(val_loss) | |
print(f"Ep {epoch+1} (Step {global_step:06d}): " | |
f"Train loss {train_loss:.3f}, " | |
f"Val loss {val_loss:.3f}" | |
) | |
train_accuracy = calc_accuracy_loader( | |
train_loader, model, device, num_batches=eval_iter | |
) | |
val_accuracy = calc_accuracy_loader( | |
val_loader, model, device, num_batches=eval_iter | |
) | |
print(f"Training accuracy: {train_accuracy*100:.2f}% | ", end="") | |
print(f"Validation accuracy: {val_accuracy*100:.2f}%") | |
train_accs.append(train_accuracy) | |
val_accs.append(val_accuracy) | |
return train_losses, val_losses, train_accs, val_accs, examples_seen | |
def calc_accuracy_loader(data_loader, model, device, num_batches=None): | |
model.eval() | |
correct_predictions, num_examples = 0, 0 | |
if num_batches is None: | |
num_batches = len(data_loader) | |
else: | |
num_batches = min(num_batches, len(data_loader)) | |
for i, (input_batch, target_batch) in enumerate(data_loader): | |
if i < num_batches: | |
input_batch = input_batch.to(device) | |
target_batch = target_batch.to(device) | |
#print("2::", device) | |
#print("2::", next(model.parameters()).device) | |
with torch.no_grad(): | |
logits = model(input_batch)[:, -1, :] | |
predicted_labels = torch.argmax(logits, dim=-1) | |
num_examples += predicted_labels.shape[0] | |
correct_predictions += ( | |
(predicted_labels == target_batch).sum().item() | |
) | |
else: | |
break | |
return correct_predictions / num_examples | |
class SpamDataset(Dataset): | |
def __init__(self, csv_file, tokenizer, max_length=None, pad_token_id=50256): | |
self.data = pd.read_csv(csv_file) | |
self.encoded_texts = [ tokenizer.encode(text) for text in self.data["Text"] ] | |
if max_length is None: | |
self.max_length = self._longest_encoded_length() | |
else: | |
self.max_length = max_length | |
self.encoded_texts = [ | |
encoded_text[:self.max_length] | |
for encoded_text in self.encoded_texts | |
] | |
self.encoded_texts = [ | |
encoded_text + [pad_token_id] * | |
(self.max_length - len(encoded_text)) | |
for encoded_text in self.encoded_texts | |
] | |
def __getitem__(self, index): | |
encoded = self.encoded_texts[index] | |
label = self.data.iloc[index]["Label"] | |
return ( | |
torch.tensor(encoded, dtype=torch.long), | |
torch.tensor(label, dtype=torch.long) | |
) | |
def __len__(self): | |
return len(self.data) | |
def _longest_encoded_length(self): | |
max_length = 0 | |
for encoded_text in self.encoded_texts: | |
encoded_length = len(encoded_text) | |
if encoded_length > max_length: | |
max_length = encoded_length | |
return max_length | |
def random_split(df, train_frac, validation_frac): | |
df = df.sample(frac=1, random_state=123).reset_index(drop=True) | |
train_end = int(len(df) * train_frac) | |
validation_end = train_end + int(len(df) * validation_frac) | |
train_df = df[:train_end] | |
validation_df = df[train_end:validation_end] | |
test_df = df[validation_end:] | |
return train_df, validation_df, test_df | |
def create_balanced_dataset(df): | |
num_spam = df[df["Label"] == "spam"].shape[0] | |
ham_subset = df[df["Label"] == "ham"].sample( | |
num_spam, random_state=123 | |
) | |
balanced_df = pd.concat([ | |
ham_subset, df[df["Label"] == "spam"] | |
]) | |
return balanced_df | |
def download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path): | |
if data_file_path.exists(): | |
print(f"{data_file_path} already exists. Skipping download and extraction.") | |
return | |
with urllib.request.urlopen(url) as response: | |
with open(zip_path, "wb") as out_file: | |
out_file.write(response.read()) | |
with zipfile.ZipFile(zip_path, "r") as zip_ref: | |
zip_ref.extractall(extracted_path) | |
original_file_path = Path(extracted_path) / "SMSSpamCollection" | |
os.rename(original_file_path, data_file_path) | |
print(f"File downloaded and saved as {data_file_path}") | |
def evaluate_model(model, train_loader, val_loader, device, eval_iter): | |
model.eval() | |
with torch.no_grad(): | |
train_loss = calc_loss_loader( | |
train_loader, model, device, num_batches=eval_iter | |
) | |
val_loss = calc_loss_loader( | |
val_loader, model, device, num_batches=eval_iter | |
) | |
model.train() | |
return train_loss, val_loss | |
def generate_and_print_sample(model, tokenizer, device, start_context): | |
model.eval() | |
context_size = model.pos_emb.weight.shape[0] | |
encoded = text_to_token_ids(start_context, tokenizer).to(device) | |
with torch.no_grad(): | |
token_ids = generate_text_simple( | |
model=model, idx=encoded, | |
max_new_tokens=50, context_size=context_size | |
) | |
decoded_text = token_ids_to_text(token_ids, tokenizer) | |
print(decoded_text.replace("\n", " ")) | |
model.train() | |
def train_model_simple(model, train_loader, val_loader, optimizer, device, num_epochs, eval_freq, eval_iter, start_context, tokenizer): | |
train_losses, val_losses, track_tokens_seen = [], [], [] | |
tokens_seen, global_step = 0, -1 | |
for epoch in range(num_epochs): | |
model.train() | |
for input_batch, target_batch in train_loader: | |
optimizer.zero_grad() | |
loss = calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
loss.backward() | |
optimizer.step() | |
tokens_seen += input_batch.numel() | |
global_step += 1 | |
if global_step % eval_freq == 0: | |
train_loss, val_loss = evaluate_model( | |
model, train_loader, val_loader, device, eval_iter) | |
train_losses.append(train_loss) | |
val_losses.append(val_loss) | |
track_tokens_seen.append(tokens_seen) | |
print(f"Ep {epoch+1} (Step {global_step:06d}): " | |
f"Train loss {train_loss:.3f}, " | |
f"Val loss {val_loss:.3f}" | |
) | |
generate_and_print_sample( | |
model, tokenizer, device, start_context | |
) | |
return train_losses, val_losses, track_tokens_seen | |
def calc_loss_loader(data_loader, model, device, num_batches=None): | |
total_loss = 0. | |
if len(data_loader) == 0: | |
return float("nan") | |
elif num_batches is None: | |
num_batches = len(data_loader) | |
else: | |
num_batches = min(num_batches, len(data_loader)) | |
for i, (input_batch, target_batch) in enumerate(data_loader): | |
if i < num_batches: | |
loss = calc_loss_batch( | |
input_batch, target_batch, model, device | |
) | |
total_loss += loss.item() | |
else: | |
break | |
return total_loss / num_batches | |
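# The triple-quoted calc_loss_batch below is the language-modeling (full-sequence)
# loss kept for reference; the active definition underneath uses only the last
# token's logits, as needed for classification fine-tuning.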
""" | |
def calc_loss_batch(input_batch, target_batch, model, device): | |
input_batch = input_batch.to(device) | |
target_batch = target_batch.to(device) | |
logits = model(input_batch) | |
loss = torch.nn.functional.cross_entropy( | |
logits.flatten(0, 1), target_batch.flatten() | |
) | |
return loss | |
""" | |
def calc_loss_batch(input_batch, target_batch, model, device): | |
input_batch = input_batch.to(device) | |
target_batch = target_batch.to(device) | |
logits = model(input_batch)[:, -1, :] | |
loss = torch.nn.functional.cross_entropy(logits, target_batch) | |
return loss | |
def text_to_token_ids(text, tokenizer): | |
encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'}) | |
encoded_tensor = torch.tensor(encoded).unsqueeze(0) | |
return encoded_tensor | |
def token_ids_to_text(token_ids, tokenizer): | |
flat = token_ids.squeeze(0) | |
return tokenizer.decode(flat.tolist()) | |
def generate(model, idx, max_new_tokens, context_size, temperature=0.0, top_k=None, eos_id=None): | |
for _ in range(max_new_tokens): | |
idx_cond = idx[:, -context_size:] | |
with torch.no_grad(): | |
logits = model(idx_cond) | |
logits = logits[:, -1, :] | |
if top_k is not None: | |
top_logits, _ = torch.topk(logits, top_k) | |
min_val = top_logits[:, -1] | |
logits = torch.where( | |
logits < min_val, | |
torch.tensor(float('-inf')).to(logits.device), | |
logits | |
) | |
if temperature > 0.0: | |
logits = logits / temperature | |
probs = torch.softmax(logits, dim=-1) | |
idx_next = torch.multinomial(probs, num_samples=1) | |
else: | |
idx_next = torch.argmax(logits, dim=-1, keepdim=True) | |
        if eos_id is not None and idx_next == eos_id:
break | |
idx = torch.cat((idx, idx_next), dim=1) | |
return idx | |
def generate_text_simple(model, idx, max_new_tokens, context_size): | |
for _ in range(max_new_tokens): | |
idx_cond = idx[:, -context_size:] | |
with torch.no_grad(): | |
logits = model(idx_cond) | |
logits = logits[:, -1, :] | |
probas = torch.softmax(logits, dim=-1) | |
idx_next = torch.argmax(probas, dim=-1, keepdim=True) | |
idx = torch.cat((idx, idx_next), dim=1) | |
return idx | |
class MultiHeadAttention(nn.Module): | |
def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False): | |
super().__init__() | |
assert (d_out % num_heads == 0), \ | |
"d_out must be divisible by num_heads" | |
self.d_out = d_out | |
self.num_heads = num_heads | |
self.head_dim = d_out // num_heads | |
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.out_proj = nn.Linear(d_out, d_out) | |
self.dropout = nn.Dropout(dropout) | |
self.register_buffer( | |
"mask", | |
torch.triu(torch.ones(context_length, context_length), diagonal=1) | |
) | |
def forward(self, x): | |
b, num_tokens, d_in = x.shape | |
keys = self.W_key(x) | |
queries = self.W_query(x) | |
values = self.W_value(x) | |
keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) | |
values = values.view(b, num_tokens, self.num_heads, self.head_dim) | |
queries = queries.view( | |
b, num_tokens, self.num_heads, self.head_dim | |
) | |
keys = keys.transpose(1, 2) | |
queries = queries.transpose(1, 2) | |
values = values.transpose(1, 2) | |
attn_scores = queries @ keys.transpose(2, 3) | |
mask_bool = self.mask.bool()[:num_tokens, :num_tokens] | |
attn_scores.masked_fill_(mask_bool, -torch.inf) | |
attn_weights = torch.softmax( | |
attn_scores / keys.shape[-1]**0.5, dim=-1 | |
) | |
attn_weights = self.dropout(attn_weights) | |
context_vec = (attn_weights @ values).transpose(1, 2) | |
context_vec = context_vec.contiguous().view( | |
b, num_tokens, self.d_out | |
) | |
context_vec = self.out_proj(context_vec) | |
return context_vec | |
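# The attention classes below (MultiHeadAttentionWrapper, CausalAttention,
# SelfAttention_v2, SelfAttention_v1) are earlier, simpler variants kept for
# reference; GPTModel further down only uses MultiHeadAttention.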
class MultiHeadAttentionWrapper(nn.Module): | |
def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False): | |
super().__init__() | |
self.heads = nn.ModuleList( | |
[ | |
CausalAttention(d_in, d_out, context_length, dropout, qkv_bias) | |
for _ in range(num_heads) | |
] | |
) | |
def forward(self, x): | |
return torch.cat([head(x) for head in self.heads], dim=-1) | |
class CausalAttention(nn.Module): | |
def __init__(self, d_in, d_out, context_length, dropout, qkv_bias=False): | |
super().__init__() | |
self.d_out = d_out | |
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.dropout = nn.Dropout(dropout) | |
self.register_buffer( | |
'mask', | |
torch.triu(torch.ones(context_length, context_length),diagonal=1) | |
) | |
def forward(self, x): | |
b, num_tokens, d_in = x.shape | |
keys = self.W_key(x) | |
queries = self.W_query(x) | |
values = self.W_value(x) | |
attn_scores = queries @ keys.transpose(1, 2) | |
attn_scores.masked_fill_( | |
self.mask.bool()[:num_tokens, :num_tokens], -torch.inf) | |
attn_weights = torch.softmax( | |
attn_scores / keys.shape[-1]**0.5, dim=-1 | |
) | |
attn_weights = self.dropout(attn_weights) | |
context_vec = attn_weights @ values | |
return context_vec | |
class SelfAttention_v2(nn.Module): | |
def __init__(self, d_in, d_out, qkv_bias=False): | |
super().__init__() | |
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) | |
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) | |
def forward(self, x): | |
keys = self.W_key(x) | |
queries = self.W_query(x) | |
values = self.W_value(x) | |
attn_scores = queries @ keys.T | |
attn_weights = torch.softmax( | |
attn_scores / keys.shape[-1]**0.5, dim=-1 | |
) | |
context_vec = attn_weights @ values | |
return context_vec | |
class SelfAttention_v1(nn.Module): | |
def __init__(self, d_in, d_out): | |
super().__init__() | |
self.W_query = nn.Parameter(torch.rand(d_in, d_out)) | |
self.W_key = nn.Parameter(torch.rand(d_in, d_out)) | |
self.W_value = nn.Parameter(torch.rand(d_in, d_out)) | |
def forward(self, x): | |
keys = x @ self.W_key | |
queries = x @ self.W_query | |
values = x @ self.W_value | |
attn_scores = queries @ keys.T # omega | |
attn_weights = torch.softmax( | |
attn_scores / keys.shape[-1]**0.5, dim=-1 | |
) | |
context_vec = attn_weights @ values | |
return context_vec | |
class SimpleTokenizerV1: | |
def __init__(self, vocab): | |
self.str_to_int = vocab | |
self.int_to_str = {i:s for s,i in vocab.items()} | |
def encode(self, text): | |
preprocessed = re.split(r'([,.?_!"()\']|--|\s)', text) | |
preprocessed = [ | |
item.strip() for item in preprocessed if item.strip() | |
] | |
ids = [self.str_to_int[s] for s in preprocessed] | |
return ids | |
def decode(self, ids): | |
text = " ".join([self.int_to_str[i] for i in ids]) | |
text = re.sub(r'\s+([,.?!"()\'])', r'\1', text) | |
return text | |
class SimpleTokenizerV2: | |
def __init__(self, vocab): | |
self.str_to_int = vocab | |
self.int_to_str = { i:s for s,i in vocab.items()} | |
def encode(self, text): | |
preprocessed = re.split(r'([,.:;?_!"()\']|--|\s)', text) | |
preprocessed = [ | |
item.strip() for item in preprocessed if item.strip() | |
] | |
preprocessed = [item if item in self.str_to_int | |
else "<|unk|>" for item in preprocessed] | |
ids = [self.str_to_int[s] for s in preprocessed] | |
return ids | |
def decode(self, ids): | |
text = " ".join([self.int_to_str[i] for i in ids]) | |
text = re.sub(r'\s+([,.:;?!"()\'])', r'\1', text) | |
return text | |
class GPTDatasetV1(Dataset): | |
def __init__(self, txt, tokenizer, max_length, stride): | |
self.input_ids = [] | |
self.target_ids = [] | |
token_ids = tokenizer.encode(txt) | |
for i in range(0, len(token_ids) - max_length, stride): | |
input_chunk = token_ids[i:i + max_length] | |
target_chunk = token_ids[i + 1: i + max_length + 1] | |
self.input_ids.append(torch.tensor(input_chunk)) | |
self.target_ids.append(torch.tensor(target_chunk)) | |
def __len__(self): | |
return len(self.input_ids) | |
def __getitem__(self, idx): | |
return self.input_ids[idx], self.target_ids[idx] | |
def create_dataloader_v1(txt, batch_size=4, max_length=256, stride=128, shuffle=True, drop_last=True, num_workers=0): | |
tokenizer = tiktoken.get_encoding("gpt2") | |
dataset = GPTDatasetV1(txt, tokenizer, max_length, stride) | |
dataloader = DataLoader( | |
dataset, | |
batch_size=batch_size, | |
shuffle=shuffle, | |
drop_last=drop_last, | |
num_workers=num_workers | |
) | |
return dataloader | |
class FeedForward(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.layers = nn.Sequential( | |
nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]), | |
GELU(), | |
nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]), | |
) | |
def forward(self, x): | |
return self.layers(x) | |
class GELU(nn.Module): | |
def __init__(self): | |
super().__init__() | |
def forward(self, x): | |
return 0.5 * x * (1 + torch.tanh( | |
torch.sqrt(torch.tensor(2.0 / torch.pi)) * | |
(x + 0.044715 * torch.pow(x, 3)) | |
)) | |
class LayerNorm(nn.Module): | |
def __init__(self, emb_dim): | |
super().__init__() | |
self.eps = 1e-5 | |
self.scale = nn.Parameter(torch.ones(emb_dim)) | |
self.shift = nn.Parameter(torch.zeros(emb_dim)) | |
def forward(self, x): | |
mean = x.mean(dim=-1, keepdim=True) | |
var = x.var(dim=-1, keepdim=True, unbiased=False) | |
norm_x = (x - mean) / torch.sqrt(var + self.eps) | |
return self.scale * norm_x + self.shift | |
class TransformerBlock(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.att = MultiHeadAttention( | |
d_in=cfg["emb_dim"], | |
d_out=cfg["emb_dim"], | |
context_length=cfg["context_length"], | |
num_heads=cfg["n_heads"], | |
dropout=cfg["drop_rate"], | |
qkv_bias=cfg["qkv_bias"]) | |
self.ff = FeedForward(cfg) | |
self.norm1 = LayerNorm(cfg["emb_dim"]) | |
self.norm2 = LayerNorm(cfg["emb_dim"]) | |
self.drop_shortcut = nn.Dropout(cfg["drop_rate"]) | |
def forward(self, x): | |
shortcut = x | |
x = self.norm1(x) | |
x = self.att(x) | |
x = self.drop_shortcut(x) | |
x = x + shortcut | |
shortcut = x | |
x = self.norm2(x) | |
x = self.ff(x) | |
x = self.drop_shortcut(x) | |
x = x + shortcut | |
return x | |
class GPTModel(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) | |
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) | |
self.drop_emb = nn.Dropout(cfg["drop_rate"]) | |
self.trf_blocks = nn.Sequential( | |
*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]) | |
self.final_norm = LayerNorm(cfg["emb_dim"]) | |
self.out_head = nn.Linear( | |
cfg["emb_dim"], cfg["vocab_size"], bias=False | |
) | |
def forward(self, in_idx): | |
batch_size, seq_len = in_idx.shape | |
tok_embeds = self.tok_emb(in_idx) | |
pos_embeds = self.pos_emb( | |
torch.arange(seq_len, device=in_idx.device) | |
) | |
x = tok_embeds + pos_embeds | |
x = self.drop_emb(x) | |
x = self.trf_blocks(x) | |
x = self.final_norm(x) | |
logits = self.out_head(x).to(in_idx.device) | |
return logits | |
class DummyGPTModel(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) | |
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) | |
self.drop_emb = nn.Dropout(cfg["drop_rate"]) | |
self.trf_blocks = nn.Sequential( | |
*[DummyTransformerBlock(cfg) | |
for _ in range(cfg["n_layers"])] | |
) | |
self.final_norm = DummyLayerNorm(cfg["emb_dim"]) | |
self.out_head = nn.Linear( | |
cfg["emb_dim"], cfg["vocab_size"], bias=False | |
) | |
def forward(self, in_idx): | |
batch_size, seq_len = in_idx.shape | |
tok_embeds = self.tok_emb(in_idx) | |
pos_embeds = self.pos_emb( | |
torch.arange(seq_len, device=in_idx.device) | |
) | |
x = tok_embeds + pos_embeds | |
x = self.drop_emb(x) | |
x = self.trf_blocks(x) | |
x = self.final_norm(x) | |
logits = self.out_head(x) | |
return logits | |
class DummyTransformerBlock(nn.Module): | |
def __init__(self, cfg): | |
super().__init__() | |
def forward(self, x): | |
return x | |
class DummyLayerNorm(nn.Module): | |
def __init__(self, normalized_shape, eps=1e-5): | |
super().__init__() | |
def forward(self, x): | |
return x | |
def assign(left, right): | |
if left.shape != right.shape: | |
raise ValueError(f"Shape mismatch. Left: {left.shape}, " | |
"Right: {right.shape}" | |
) | |
return torch.nn.Parameter(torch.tensor(right)) | |
def load_weights_into_gpt(gpt, params): | |
gpt.pos_emb.weight = assign(gpt.pos_emb.weight, params['wpe']) | |
gpt.tok_emb.weight = assign(gpt.tok_emb.weight, params['wte']) | |
for b in range(len(params["blocks"])): | |
q_w, k_w, v_w = np.split( | |
(params["blocks"][b]["attn"]["c_attn"])["w"], 3, axis=-1) | |
gpt.trf_blocks[b].att.W_query.weight = assign( | |
gpt.trf_blocks[b].att.W_query.weight, q_w.T) | |
gpt.trf_blocks[b].att.W_key.weight = assign( | |
gpt.trf_blocks[b].att.W_key.weight, k_w.T) | |
gpt.trf_blocks[b].att.W_value.weight = assign( | |
gpt.trf_blocks[b].att.W_value.weight, v_w.T) | |
q_b, k_b, v_b = np.split( | |
(params["blocks"][b]["attn"]["c_attn"])["b"], 3, axis=-1) | |
gpt.trf_blocks[b].att.W_query.bias = assign( | |
gpt.trf_blocks[b].att.W_query.bias, q_b) | |
gpt.trf_blocks[b].att.W_key.bias = assign( | |
gpt.trf_blocks[b].att.W_key.bias, k_b) | |
gpt.trf_blocks[b].att.W_value.bias = assign( | |
gpt.trf_blocks[b].att.W_value.bias, v_b) | |
gpt.trf_blocks[b].att.out_proj.weight = assign( | |
gpt.trf_blocks[b].att.out_proj.weight, | |
params["blocks"][b]["attn"]["c_proj"]["w"].T) | |
gpt.trf_blocks[b].att.out_proj.bias = assign( | |
gpt.trf_blocks[b].att.out_proj.bias, | |
params["blocks"][b]["attn"]["c_proj"]["b"]) | |
gpt.trf_blocks[b].ff.layers[0].weight = assign( | |
gpt.trf_blocks[b].ff.layers[0].weight, | |
params["blocks"][b]["mlp"]["c_fc"]["w"].T) | |
gpt.trf_blocks[b].ff.layers[0].bias = assign( | |
gpt.trf_blocks[b].ff.layers[0].bias, | |
params["blocks"][b]["mlp"]["c_fc"]["b"]) | |
gpt.trf_blocks[b].ff.layers[2].weight = assign( | |
gpt.trf_blocks[b].ff.layers[2].weight, | |
params["blocks"][b]["mlp"]["c_proj"]["w"].T) | |
gpt.trf_blocks[b].ff.layers[2].bias = assign( | |
gpt.trf_blocks[b].ff.layers[2].bias, | |
params["blocks"][b]["mlp"]["c_proj"]["b"]) | |
gpt.trf_blocks[b].norm1.scale = assign( | |
gpt.trf_blocks[b].norm1.scale, | |
params["blocks"][b]["ln_1"]["g"]) | |
gpt.trf_blocks[b].norm1.shift = assign( | |
gpt.trf_blocks[b].norm1.shift, | |
params["blocks"][b]["ln_1"]["b"]) | |
gpt.trf_blocks[b].norm2.scale = assign( | |
gpt.trf_blocks[b].norm2.scale, | |
params["blocks"][b]["ln_2"]["g"]) | |
gpt.trf_blocks[b].norm2.shift = assign( | |
gpt.trf_blocks[b].norm2.shift, | |
params["blocks"][b]["ln_2"]["b"]) | |
gpt.final_norm.scale = assign(gpt.final_norm.scale, params["g"]) | |
gpt.final_norm.shift = assign(gpt.final_norm.shift, params["b"]) | |
gpt.out_head.weight = assign(gpt.out_head.weight, params["wte"]) | |
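# A minimal, guarded sketch (hypothetical array): the checkpoint's combined | |
# `c_attn` weight stores the query, key, and value projections side by side, | |
# so it is split into three equal parts along the last axis; the transposes | |
# above are needed because the TF checkpoint stores weights as [in, out] | |
# while nn.Linear.weight is [out, in]. | |
if 1 == 0: | |
    _demo_c_attn = np.arange(4 * 12).reshape(4, 12)  # emb_dim=4, 3*emb_dim=12 | |
    _q, _k, _v = np.split(_demo_c_attn, 3, axis=-1) | |
    print(_q.shape, _k.shape, _v.shape)  # (4, 4) (4, 4) (4, 4) | |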
# config | |
GPT_CONFIG_124M = { | |
"vocab_size": 50257, | |
"context_length": 256, | |
"emb_dim": 768, | |
"n_heads": 12, | |
"n_layers": 12, | |
"drop_rate": 0.1, | |
"qkv_bias": False | |
} | |
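# Note: "context_length" is reduced from GPT-2's native 1024 to 256 here to | |
# keep the from-scratch training runs cheap; it is set back to 1024 further | |
# below before the published OpenAI weights are loaded. | |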
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# fine tuning (spam classification) | |
if 1 == 1: | |
url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip" | |
zip_path = "sms_spam_collection.zip" | |
extracted_path = "sms_spam_collection" | |
data_file_path = Path(extracted_path) / "SMSSpamCollection.tsv" | |
download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path) | |
df = pd.read_csv(data_file_path, sep="\t", header=None, names=["Label", "Text"]) | |
print(df["Label"].value_counts()) | |
balanced_df = create_balanced_dataset(df) | |
print(balanced_df["Label"].value_counts()) | |
balanced_df["Label"] = balanced_df["Label"].map({"ham": 0, "spam": 1}) | |
train_df, validation_df, test_df = random_split(balanced_df, 0.7, 0.1) | |
train_df.to_csv("train.csv", index=None) | |
validation_df.to_csv("validation.csv", index=None) | |
test_df.to_csv("test.csv", index=None) | |
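    # random_split (defined earlier in this file) uses the two fractions for | |
    # train (70%) and validation (10%); the remaining rows (here 20%) become | |
    # the test split. | |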
tokenizer = tiktoken.get_encoding("gpt2") | |
print(tokenizer.encode("<|endoftext|>", allowed_special={"<|endoftext|>"})) | |
train_dataset = SpamDataset( | |
csv_file="train.csv", | |
max_length=None, | |
tokenizer=tokenizer | |
) | |
print(train_dataset.max_length) | |
val_dataset = SpamDataset( | |
csv_file="validation.csv", | |
max_length=train_dataset.max_length, | |
tokenizer=tokenizer | |
) | |
test_dataset = SpamDataset( | |
csv_file="test.csv", | |
max_length=train_dataset.max_length, | |
tokenizer=tokenizer | |
) | |
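    # SpamDataset (defined earlier in this file) tokenizes each SMS and pads | |
    # shorter messages up to a shared max_length (with the <|endoftext|> | |
    # token, ID 50256) so that batches can be stacked; the validation and | |
    # test sets reuse the max_length determined from the training set. | |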
num_workers = 0 | |
batch_size = 8 | |
torch.manual_seed(123) | |
train_loader = DataLoader( | |
dataset=train_dataset, | |
batch_size=batch_size, | |
shuffle=True, | |
num_workers=num_workers, | |
drop_last=True, | |
) | |
val_loader = DataLoader( | |
dataset=val_dataset, | |
batch_size=batch_size, | |
num_workers=num_workers, | |
drop_last=False, | |
) | |
test_loader = DataLoader( | |
dataset=test_dataset, | |
batch_size=batch_size, | |
num_workers=num_workers, | |
drop_last=False, | |
) | |
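    # The loop below only exhausts the train_loader so that input_batch and | |
    # target_batch end up holding the last batch; it is a quick shape sanity | |
    # check, not a training step. | |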
for input_batch, target_batch in train_loader: | |
pass | |
    print("Input batch dimensions:", input_batch.shape) | |
    print("Label batch dimensions:", target_batch.shape) | |
print(f"{len(train_loader)} training batches") | |
print(f"{len(val_loader)} validation batches") | |
print(f"{len(test_loader)} test batches") | |
#exit() | |
# load external weights | |
if 1 == 1: | |
# load weights from chatgpt | |
import urllib.request | |
url = ( | |
"https://raw.githubusercontent.com/rasbt/" | |
"LLMs-from-scratch/main/ch05/" | |
"01_main-chapter-code/gpt_download.py" | |
) | |
filename = url.split('/')[-1] | |
urllib.request.urlretrieve(url, filename) | |
from gpt_download import download_and_load_gpt2 | |
    # (the smaller checkpoints can be fetched the same way, e.g.:) | |
    # settings, params = download_and_load_gpt2(model_size="124M", models_dir="gpt2") | |
settings, params = download_and_load_gpt2( | |
model_size="1558M", models_dir="gpt2" | |
) | |
print("Settings:", settings) | |
print("Parameter dictionary keys:", params.keys()) | |
model_configs = { | |
"gpt2-small (124M)": {"emb_dim": 768, "n_layers": 12, "n_heads": 12}, | |
"gpt2-medium (355M)": {"emb_dim": 1024, "n_layers": 24, "n_heads": 16}, | |
"gpt2-large (774M)": {"emb_dim": 1280, "n_layers": 36, "n_heads": 20}, | |
"gpt2-xl (1558M)": {"emb_dim": 1600, "n_layers": 48, "n_heads": 25}, | |
} | |
model_name = "gpt2-xl (1558M)" | |
NEW_CONFIG = GPT_CONFIG_124M.copy() | |
NEW_CONFIG.update(model_configs[model_name]) | |
NEW_CONFIG.update({"context_length": 1024}) | |
NEW_CONFIG.update({"qkv_bias": True}) | |
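    # The published GPT-2 checkpoints use bias terms in the query/key/value | |
    # projections and a 1024-token context window, so both settings are | |
    # switched on before the pretrained weights are copied in. | |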
model = GPTModel(NEW_CONFIG) | |
model.eval() | |
load_weights_into_gpt(model, params) | |
model.to(device) | |
# output example text (#1) | |
tokenizer = tiktoken.get_encoding("gpt2") | |
torch.manual_seed(123) | |
token_ids = generate( | |
model=model, | |
idx=text_to_token_ids("Every effort moves you", tokenizer).to(device), | |
max_new_tokens=25, | |
context_size=NEW_CONFIG["context_length"], | |
top_k=50, | |
temperature=1.5 | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
# output example text (#2) | |
text_1 = "Every effort moves you" | |
token_ids = generate_text_simple( | |
model=model, | |
idx=text_to_token_ids(text_1, tokenizer).to(device), | |
max_new_tokens=15, | |
context_size=NEW_CONFIG["context_length"] | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
# output example text (#3) | |
text_2 = ( | |
"Is the following text 'spam'? Answer with 'yes' or 'no':" | |
" 'You are a winner you have been specially" | |
" selected to receive $1000 cash or a $2000 award.'" | |
) | |
token_ids = generate_text_simple( | |
model=model, | |
idx=text_to_token_ids(text_2, tokenizer).to(device), | |
max_new_tokens=23, | |
context_size=NEW_CONFIG["context_length"] | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
# fine tuning | |
if 1 == 1: | |
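    # Classification fine-tuning strategy: freeze all pretrained weights, | |
    # replace the 50257-way language-modeling head with a 2-class output | |
    # head, then unfreeze only the last transformer block and the final | |
    # LayerNorm so that just a small subset of parameters is trained on the | |
    # spam data. | |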
for param in model.parameters(): | |
param.requires_grad = False | |
torch.manual_seed(123) | |
num_classes = 2 | |
model.out_head = torch.nn.Linear( | |
in_features=NEW_CONFIG["emb_dim"], | |
out_features=num_classes | |
) | |
model.out_head.to(device) | |
for param in model.trf_blocks[-1].parameters(): | |
param.requires_grad = True | |
for param in model.final_norm.parameters(): | |
param.requires_grad = True | |
torch.manual_seed(123) | |
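    # With a freshly initialized classification head and no fine-tuning yet, | |
    # the accuracies printed below should land close to chance (about 50%); | |
    # they serve as a baseline for the training run that follows. | |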
train_accuracy = calc_accuracy_loader( | |
train_loader, model, device, num_batches=10 | |
) | |
val_accuracy = calc_accuracy_loader( | |
val_loader, model, device, num_batches=10 | |
) | |
test_accuracy = calc_accuracy_loader( | |
test_loader, model, device, num_batches=10 | |
) | |
print(f"Training accuracy: {train_accuracy*100:.2f}%") | |
print(f"Validation accuracy: {val_accuracy*100:.2f}%") | |
print(f"Test accuracy: {test_accuracy*100:.2f}%") | |
# training | |
start_time = time.time() | |
torch.manual_seed(123) | |
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.1) | |
num_epochs = 5 | |
train_losses, val_losses, train_accs, val_accs, examples_seen = train_classifier_simple( | |
model, train_loader, val_loader, optimizer, device, | |
num_epochs=num_epochs, eval_freq=50, | |
eval_iter=5 | |
) | |
end_time = time.time() | |
execution_time_minutes = (end_time - start_time) / 60 | |
print(f"Training completed in {execution_time_minutes:.2f} minutes.") | |
train_accuracy = calc_accuracy_loader(train_loader, model, device) | |
val_accuracy = calc_accuracy_loader(val_loader, model, device) | |
test_accuracy = calc_accuracy_loader(test_loader, model, device) | |
print(f"Training accuracy: {train_accuracy*100:.2f}%") | |
print(f"Validation accuracy: {val_accuracy*100:.2f}%") | |
print(f"Test accuracy: {test_accuracy*100:.2f}%") | |
exit() | |
# train model | |
if 1 == 0: | |
tokenizer = tiktoken.get_encoding("gpt2") | |
batch = [] | |
txt1 = "Every effort moves you" | |
txt2 = "Every day holds a" | |
batch.append(torch.tensor(tokenizer.encode(txt1), device=device)) | |
batch.append(torch.tensor(tokenizer.encode(txt2), device=device)) | |
batch = torch.stack(batch, dim=0) | |
print(batch) | |
batch = batch.to(device) | |
torch.manual_seed(123) | |
model = GPTModel(GPT_CONFIG_124M) | |
model.to(device) | |
out = model(batch) | |
print("Input batch:\n", batch) | |
print("\nOutput shape:", out.shape) | |
print(out) | |
total_params = sum(p.numel() for p in model.parameters()) | |
print(f"Total number of parameters: {total_params:,}") | |
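    # This prints noticeably more than 124M parameters because the token | |
    # embedding and the output head are kept as two separate weight matrices | |
    # here; the original GPT-2 ties them, which is where the "124M" figure | |
    # comes from. | |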
torch.manual_seed(123) | |
model = GPTModel(GPT_CONFIG_124M) | |
model.to(device) | |
model.eval() | |
start_context = "Every effort moves you" | |
tokenizer = tiktoken.get_encoding("gpt2") | |
token_ids = generate_text_simple( | |
model=model, | |
idx=text_to_token_ids(start_context, tokenizer).to(device), | |
max_new_tokens=10, | |
context_size=GPT_CONFIG_124M["context_length"] | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
file_path = "the-verdict.txt" | |
with open(file_path, "r", encoding="utf-8") as file: | |
text_data = file.read() | |
train_ratio = 0.90 | |
split_idx = int(train_ratio * len(text_data)) | |
train_data = text_data[:split_idx] | |
val_data = text_data[split_idx:] | |
train_loader = create_dataloader_v1( | |
train_data, | |
batch_size=2, | |
max_length=GPT_CONFIG_124M["context_length"], | |
stride=GPT_CONFIG_124M["context_length"], | |
drop_last=True, | |
shuffle=True, | |
num_workers=0 | |
) | |
val_loader = create_dataloader_v1( | |
val_data, | |
batch_size=2, | |
max_length=GPT_CONFIG_124M["context_length"], | |
stride=GPT_CONFIG_124M["context_length"], | |
drop_last=False, | |
shuffle=False, | |
num_workers=0 | |
) | |
with torch.no_grad(): | |
train_loss = calc_loss_loader(train_loader, model, device) | |
val_loss = calc_loss_loader(val_loader, model, device) | |
print("Training loss:", train_loss) | |
print("Validation loss:", val_loss) | |
# train model | |
torch.manual_seed(123) | |
model = GPTModel(GPT_CONFIG_124M) | |
model.to(device) | |
optimizer = torch.optim.AdamW( | |
model.parameters(), lr=0.0004, weight_decay=0.1 | |
) | |
num_epochs = 10 | |
train_losses, val_losses, tokens_seen = train_model_simple( | |
model, train_loader, val_loader, optimizer, device, | |
num_epochs=num_epochs, eval_freq=5, eval_iter=5, | |
start_context="Every effort moves you", tokenizer=tokenizer | |
) | |
torch.manual_seed(123) | |
token_ids = generate( | |
model=model, | |
idx=text_to_token_ids("Every effort moves you", tokenizer).to(device), | |
max_new_tokens=15, | |
context_size=GPT_CONFIG_124M["context_length"], | |
top_k=25, | |
temperature=1.4 | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
# save model (easy method) | |
torch.save(model.state_dict(), "model.pth") | |
# save model (advanced method) | |
torch.save({ | |
"model_state_dict": model.state_dict(), | |
"optimizer_state_dict": optimizer.state_dict(), | |
}, | |
"model_and_optimizer.pth" | |
) | |
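    # Saving the optimizer state alongside the model lets training resume | |
    # later with AdamW's moment estimates intact instead of restarting them | |
    # from zero. | |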
exit() | |
# load model | |
if 1 == 0: | |
# load model (method #3) | |
CHOOSE_MODEL = "gpt2-small (124M)" | |
INPUT_PROMPT = "Every effort moves" | |
BASE_CONFIG = { | |
"vocab_size": 50257, | |
"context_length": 1024, | |
"drop_rate": 0.0, | |
"qkv_bias": True | |
} | |
model_configs = { | |
"gpt2-small (124M)": {"emb_dim": 768, "n_layers": 12, "n_heads": 12}, | |
"gpt2-medium (355M)": {"emb_dim": 1024, "n_layers": 24, "n_heads": 16}, | |
"gpt2-large (774M)": {"emb_dim": 1280, "n_layers": 36, "n_heads": 20}, | |
"gpt2-xl (1558M)": {"emb_dim": 1600, "n_layers": 48, "n_heads": 25}, | |
} | |
BASE_CONFIG.update(model_configs[CHOOSE_MODEL]) | |
model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")") | |
settings, params = download_and_load_gpt2( | |
model_size=model_size, models_dir="gpt2" | |
) | |
model = GPTModel(BASE_CONFIG) | |
load_weights_into_gpt(model, params) | |
model.eval() | |
# load model (easy method) | |
model = GPTModel(GPT_CONFIG_124M) | |
model.load_state_dict(torch.load("model.pth", map_location=device)) | |
model.to(device) | |
model.eval() | |
tokenizer = tiktoken.get_encoding("gpt2") | |
torch.manual_seed(123) | |
token_ids = generate( | |
model=model, | |
idx=text_to_token_ids("Every effort moves you", tokenizer).to(device), | |
max_new_tokens=15, | |
context_size=GPT_CONFIG_124M["context_length"], | |
top_k=25, | |
temperature=1.4 | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
# load model (advanced method) | |
checkpoint = torch.load("model_and_optimizer.pth", map_location=device) | |
model = GPTModel(GPT_CONFIG_124M) | |
model.load_state_dict(checkpoint["model_state_dict"]) | |
model.to(device) | |
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4, weight_decay=0.1) | |
optimizer.load_state_dict(checkpoint["optimizer_state_dict"]) | |
    model.train() | |
tokenizer = tiktoken.get_encoding("gpt2") | |
torch.manual_seed(123) | |
token_ids = generate( | |
model=model, | |
idx=text_to_token_ids("Every effort moves you", tokenizer).to(device), | |
max_new_tokens=15, | |
context_size=GPT_CONFIG_124M["context_length"], | |
top_k=25, | |
temperature=1.4 | |
) | |
print("Output text:\n", token_ids_to_text(token_ids, tokenizer)) | |
exit() |