""" |
|
PARALLEL LLM PRE-TRAINING |
|
========================= |
|
Compare single-process vs multi-process training performance |
|
""" |
|
|
|
import random
import math
import time
from typing import List, Tuple, Dict
from multiprocessing import Pool, cpu_count

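# Note: on platforms where multiprocessing uses the "spawn" start method
# (Windows, and macOS on recent Python versions), worker processes re-import
# this module, so the benchmark is only launched from inside the
# `if __name__ == "__main__":` guard at the bottom of the file.
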
# ============================================================================
# PURE PYTHON LINEAR ALGEBRA (same as before)
# ============================================================================

def zeros(shape: Tuple[int, ...]) -> List:
    """Create a tensor of zeros."""
    if len(shape) == 1:
        return [0.0] * shape[0]
    return [[0.0 for _ in range(shape[1])] for _ in range(shape[0])]


def randn(shape: Tuple[int, ...], scale: float = 1.0) -> List:
    """Create a tensor with random normal values."""
    if len(shape) == 1:
        return [random.gauss(0, 1) * scale for _ in range(shape[0])]
    return [[random.gauss(0, 1) * scale for _ in range(shape[1])] for _ in range(shape[0])]


def clip_gradient(grad: float, max_norm: float = 5.0) -> float:
    """Clip a single gradient component to [-max_norm, max_norm] to prevent explosion."""
    return max(min(grad, max_norm), -max_norm)


def relu(x: List[float]) -> List[float]:
    """ReLU activation."""
    return [max(0.0, val) for val in x]


def add_vectors(a: List[float], b: List[float]) -> List[float]:
    """Element-wise vector addition."""
    return [x + y for x, y in zip(a, b)]


def softmax(x: List[float]) -> List[float]:
    """Numerically stable softmax."""
    max_x = max(x)
    exp_x = [math.exp(val - max_x) for val in x]
    sum_exp = sum(exp_x)
    return [val / sum_exp for val in exp_x]


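# These helpers operate on plain Python lists, which keeps the benchmark
# dependency-free (no NumPy) and means all model state is ordinary data that
# pickles cleanly when parameters are shipped to worker processes.
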
# ============================================================================
# CHARACTER TOKENIZER
# ============================================================================

class SimpleCharTokenizer:
    """Character-level tokenizer built from the given text."""

    def __init__(self, text: str):
        chars = sorted(list(set(text)))
        self.char_to_id = {ch: i for i, ch in enumerate(chars)}
        self.id_to_char = {i: ch for i, ch in enumerate(chars)}
        self.n_vocab = len(chars)

    def encode(self, text: str) -> List[int]:
        # Unknown characters fall back to id 0.
        return [self.char_to_id.get(ch, 0) for ch in text]

    def decode(self, tokens: List[int]) -> str:
        # Unknown ids are rendered as '?'.
        return ''.join([self.id_to_char.get(t, '?') for t in tokens])


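# Example (illustrative): a round trip through the tokenizer.
#
#     tok = SimpleCharTokenizer("hello world")
#     ids = tok.encode("hello")        # one integer id per character
#     assert tok.decode(ids) == "hello"
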
# ============================================================================
# NEURAL LANGUAGE MODEL
# ============================================================================

class PurePythonLM:
    """Simple feedforward language model: embedding -> ReLU hidden layer -> logits."""

    def __init__(self, vocab_size: int, embed_dim: int = 24, hidden_dim: int = 48):
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim

        # Xavier/He-style initialization (ReLU layers use sqrt(2 / fan_in))
        scale_embed = math.sqrt(1.0 / vocab_size)
        scale_w1 = math.sqrt(2.0 / embed_dim)
        scale_w2 = math.sqrt(2.0 / hidden_dim)

        self.embeddings = randn((vocab_size, embed_dim), scale_embed)
        self.W1 = randn((embed_dim, hidden_dim), scale_w1)
        self.b1 = zeros((hidden_dim,))
        self.W2 = randn((hidden_dim, vocab_size), scale_w2)
        self.b2 = zeros((vocab_size,))

        self.cache = {}

    def get_params(self) -> Dict:
        """Return a copy of the model parameters as a plain dictionary."""
        return {
            'embeddings': [row[:] for row in self.embeddings],
            'W1': [row[:] for row in self.W1],
            'b1': self.b1[:],
            'W2': [row[:] for row in self.W2],
            'b2': self.b2[:]
        }

    def set_params(self, params: Dict):
        """Set model parameters from a dictionary (values are copied)."""
        self.embeddings = [row[:] for row in params['embeddings']]
        self.W1 = [row[:] for row in params['W1']]
        self.b1 = params['b1'][:]
        self.W2 = [row[:] for row in params['W2']]
        self.b2 = params['b2'][:]

    def forward(self, token_id: int) -> List[float]:
        """Forward pass: embedding lookup -> ReLU hidden layer -> logits."""
        # Embedding lookup
        embedding = self.embeddings[token_id][:]
        self.cache['embedding'] = embedding
        self.cache['token_id'] = token_id

        # Hidden layer: h = relu(e @ W1 + b1)
        hidden_input = add_vectors(
            [sum(embedding[i] * self.W1[i][j] for i in range(self.embed_dim))
             for j in range(self.hidden_dim)],
            self.b1
        )
        self.cache['hidden_input'] = hidden_input
        hidden = relu(hidden_input)
        self.cache['hidden'] = hidden

        # Output layer: logits = h @ W2 + b2
        logits = add_vectors(
            [sum(hidden[i] * self.W2[i][j] for i in range(self.hidden_dim))
             for j in range(self.vocab_size)],
            self.b2
        )
        self.cache['logits'] = logits

        return logits

    def compute_loss(self, token_id: int, target: int) -> float:
        """Compute cross-entropy loss for a single (input, target) token pair."""
        logits = self.forward(token_id)
        probs = softmax(logits)
        return -math.log(max(probs[target], 1e-10))

    def compute_gradients(self, token_id: int, target: int) -> Dict:
        """Compute gradients without updating parameters."""
        # Forward pass
        logits = self.forward(token_id)

        embedding = self.cache['embedding']
        hidden_input = self.cache['hidden_input']
        hidden = self.cache['hidden']

        # Backward pass
        probs = softmax(logits)
        dlogits = probs[:]
        dlogits[target] -= 1

        # Gradients for W2 and b2
        grad_W2 = zeros((self.hidden_dim, self.vocab_size))
        for i in range(self.hidden_dim):
            for j in range(self.vocab_size):
                grad_W2[i][j] = hidden[i] * dlogits[j]

        grad_b2 = dlogits[:]

        # Gradient for hidden layer (ReLU passes gradient only where the pre-activation is positive)
        dhidden = [sum(dlogits[j] * self.W2[i][j] for j in range(self.vocab_size))
                   for i in range(self.hidden_dim)]
        dhidden_input = [dhidden[i] * (1.0 if hidden_input[i] > 0 else 0.0)
                         for i in range(self.hidden_dim)]

        # Gradients for W1 and b1
        grad_W1 = zeros((self.embed_dim, self.hidden_dim))
        for i in range(self.embed_dim):
            for j in range(self.hidden_dim):
                grad_W1[i][j] = embedding[i] * dhidden_input[j]

        grad_b1 = dhidden_input[:]

        # Gradient for embeddings (only the input token's row is non-zero)
        grad_embeddings = zeros((self.vocab_size, self.embed_dim))
        dembedding = [sum(dhidden_input[j] * self.W1[i][j] for j in range(self.hidden_dim))
                      for i in range(self.embed_dim)]

        for i in range(self.embed_dim):
            grad_embeddings[token_id][i] = dembedding[i]

        return {
            'embeddings': grad_embeddings,
            'W1': grad_W1,
            'b1': grad_b1,
            'W2': grad_W2,
            'b2': grad_b2
        }

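    # Derivation note for compute_gradients: with cross-entropy loss
    #     L = -log(p_target),   p = softmax(z),
    # the gradient with respect to the logits simplifies to
    #     dL/dz_j = p_j - 1[j == target],
    # which is why `dlogits` above is just the softmax output with 1 subtracted
    # at the target index; the remaining gradients follow from the chain rule
    # through the ReLU hidden layer and the embedding lookup.
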
    def apply_gradients(self, gradients: Dict, learning_rate: float):
        """Apply gradients to update parameters."""
        # Update embeddings
        for i in range(self.vocab_size):
            for j in range(self.embed_dim):
                grad = clip_gradient(gradients['embeddings'][i][j])
                self.embeddings[i][j] -= learning_rate * grad

        # Update W1
        for i in range(self.embed_dim):
            for j in range(self.hidden_dim):
                grad = clip_gradient(gradients['W1'][i][j])
                self.W1[i][j] -= learning_rate * grad

        # Update b1
        for j in range(self.hidden_dim):
            grad = clip_gradient(gradients['b1'][j])
            self.b1[j] -= learning_rate * grad

        # Update W2
        for i in range(self.hidden_dim):
            for j in range(self.vocab_size):
                grad = clip_gradient(gradients['W2'][i][j])
                self.W2[i][j] -= learning_rate * grad

        # Update b2
        for j in range(self.vocab_size):
            grad = clip_gradient(gradients['b2'][j])
            self.b2[j] -= learning_rate * grad


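# Illustrative only (not called by the benchmark): a quick finite-difference
# check of compute_gradients() on a tiny model. The helper name and the model
# sizes below are arbitrary choices for this sketch.
def _gradient_check_example(eps: float = 1e-5) -> Tuple[float, float]:
    """Return (analytic, numerical) gradient for one W1 entry of a tiny model."""
    random.seed(0)
    model = PurePythonLM(vocab_size=5, embed_dim=4, hidden_dim=6)
    token_id, target = 1, 3

    # Analytic gradient from backpropagation
    analytic = model.compute_gradients(token_id, target)['W1'][0][0]

    # Central finite difference on the same entry
    original = model.W1[0][0]
    model.W1[0][0] = original + eps
    loss_plus = model.compute_loss(token_id, target)
    model.W1[0][0] = original - eps
    loss_minus = model.compute_loss(token_id, target)
    model.W1[0][0] = original
    numerical = (loss_plus - loss_minus) / (2 * eps)

    return analytic, numerical

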
# ============================================================================
# TRAINING - SINGLE PROCESS
# ============================================================================

def train_single_process(model: PurePythonLM, tokens: List[int],
                         n_epochs: int, initial_lr: float) -> Tuple[List[float], float]:
    """Train the model in a single process (one SGD update per token pair)."""
    losses = []
    total_time = 0.0

    for epoch in range(n_epochs):
        epoch_start = time.time()
        total_loss = 0.0
        count = 0

        # Learning rate schedule: linear warmup for 5 epochs, then 5% decay every 5 epochs
        if epoch < 5:
            lr = initial_lr * (epoch + 1) / 5
        else:
            lr = initial_lr * (0.95 ** ((epoch - 5) // 5))

        # Train on consecutive token pairs
        for i in range(len(tokens) - 1):
            input_token = tokens[i]
            target_token = tokens[i + 1]

            loss = model.compute_loss(input_token, target_token)
            # Skip pairs that produce divergent or non-finite losses
            if loss > 100 or math.isnan(loss) or math.isinf(loss):
                continue

            total_loss += loss
            count += 1

            # Compute and apply gradients
            grads = model.compute_gradients(input_token, target_token)
            model.apply_gradients(grads, lr)

        epoch_time = time.time() - epoch_start
        total_time += epoch_time

        avg_loss = total_loss / count if count > 0 else float('inf')
        losses.append(avg_loss)

        if (epoch + 1) % 10 == 0:
            print(f"   Epoch {epoch + 1:3d}/{n_epochs} │ Loss: {avg_loss:.4f} │ Time: {epoch_time:.3f}s")

    return losses, total_time


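# Note on comparability: train_single_process applies one SGD update per
# (input, target) pair, while train_multi_process below accumulates gradients
# over the whole epoch and applies a single averaged update per epoch. The
# wall-clock comparison still holds, but the two loss curves are not expected
# to match update for update.
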
# ============================================================================
# TRAINING - MULTI PROCESS
# ============================================================================

def compute_gradients_for_batch(args):
    """Worker function: return summed gradients, total loss, and pair count for a batch."""
    params, token_pairs, vocab_size, embed_dim, hidden_dim = args

    # Rebuild a local model from the broadcast parameters
    model = PurePythonLM(vocab_size, embed_dim, hidden_dim)
    model.set_params(params)

    # Accumulate gradients
    accumulated_grads = {
        'embeddings': zeros((vocab_size, embed_dim)),
        'W1': zeros((embed_dim, hidden_dim)),
        'b1': zeros((hidden_dim,)),
        'W2': zeros((hidden_dim, vocab_size)),
        'b2': zeros((vocab_size,))
    }

    total_loss = 0.0
    count = 0

    for input_token, target_token in token_pairs:
        try:
            loss = model.compute_loss(input_token, target_token)
            if loss > 100 or math.isnan(loss) or math.isinf(loss):
                continue

            total_loss += loss
            count += 1

            grads = model.compute_gradients(input_token, target_token)

            # Accumulate gradients
            for key in accumulated_grads:
                if key in ['b1', 'b2']:  # 1D arrays
                    for i in range(len(accumulated_grads[key])):
                        accumulated_grads[key][i] += grads[key][i]
                else:  # 2D arrays
                    for i in range(len(accumulated_grads[key])):
                        for j in range(len(accumulated_grads[key][i])):
                            accumulated_grads[key][i][j] += grads[key][i][j]
        except Exception:
            # Skip pairs that raise numerical errors; the rest of the batch still counts
            continue

    return accumulated_grads, total_loss, count


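# The trainer below implements synchronous data parallelism: each epoch the
# current parameters are pickled and broadcast to every worker, each worker
# computes gradients for its shard of token pairs, and the parent process
# sums the shard gradients, averages them, and applies a single update.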
def train_multi_process(model: PurePythonLM, tokens: List[int],
                        n_epochs: int, initial_lr: float, n_workers: int) -> Tuple[List[float], float]:
    """Train the model using multiple worker processes."""
    losses = []
    total_time = 0.0

    # Create token pairs
    token_pairs = [(tokens[i], tokens[i + 1]) for i in range(len(tokens) - 1)]

    for epoch in range(n_epochs):
        epoch_start = time.time()

        # Learning rate schedule: linear warmup for 5 epochs, then 5% decay every 5 epochs
        if epoch < 5:
            lr = initial_lr * (epoch + 1) / 5
        else:
            lr = initial_lr * (0.95 ** ((epoch - 5) // 5))

        # Split work among processes
        chunk_size = max(1, len(token_pairs) // n_workers)
        chunks = [token_pairs[i:i + chunk_size] for i in range(0, len(token_pairs), chunk_size)]

        # Prepare arguments for each worker (a full parameter copy is pickled
        # for every worker, every epoch)
        params = model.get_params()
        worker_args = [
            (params, chunk, model.vocab_size, model.embed_dim, model.hidden_dim)
            for chunk in chunks
        ]

        # Compute gradients in parallel (a new pool per epoch, so process
        # start-up cost is included in the measured epoch time)
        with Pool(processes=n_workers) as pool:
            results = pool.map(compute_gradients_for_batch, worker_args)

        # Aggregate gradients from all workers
        total_grads = {
            'embeddings': zeros((model.vocab_size, model.embed_dim)),
            'W1': zeros((model.embed_dim, model.hidden_dim)),
            'b1': zeros((model.hidden_dim,)),
            'W2': zeros((model.hidden_dim, model.vocab_size)),
            'b2': zeros((model.vocab_size,))
        }

        total_loss = 0.0
        total_count = 0

        for grads, loss, count in results:
            total_loss += loss
            total_count += count

            for key in total_grads:
                if key in ['b1', 'b2']:  # 1D arrays
                    for i in range(len(total_grads[key])):
                        total_grads[key][i] += grads[key][i]
                else:  # 2D arrays
                    for i in range(len(total_grads[key])):
                        for j in range(len(total_grads[key][i])):
                            total_grads[key][i][j] += grads[key][i][j]

        # Average gradients over all contributing pairs
        if total_count > 0:
            for key in total_grads:
                if key in ['b1', 'b2']:
                    for i in range(len(total_grads[key])):
                        total_grads[key][i] /= total_count
                else:
                    for i in range(len(total_grads[key])):
                        for j in range(len(total_grads[key][i])):
                            total_grads[key][i][j] /= total_count

        # Apply one averaged update for the whole epoch
        model.apply_gradients(total_grads, lr)

        epoch_time = time.time() - epoch_start
        total_time += epoch_time

        avg_loss = total_loss / total_count if total_count > 0 else float('inf')
        losses.append(avg_loss)

        if (epoch + 1) % 10 == 0:
            print(f"   Epoch {epoch + 1:3d}/{n_epochs} │ Loss: {avg_loss:.4f} │ Time: {epoch_time:.3f}s")

    return losses, total_time


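# A lower-overhead variant (sketch, assuming the same worker function) would
# create the Pool once and reuse it across epochs, so process start-up and
# module import costs are paid only once:
#
#     with Pool(processes=n_workers) as pool:
#         for epoch in range(n_epochs):
#             ...build worker_args for this epoch...
#             results = pool.map(compute_gradients_for_batch, worker_args)
#             ...aggregate, average, and apply as above...
#
# Parameter pickling per epoch would remain, since each map call still ships
# the current parameters to the workers.
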
# ============================================================================
# MAIN BENCHMARK
# ============================================================================

def main():
    print()
    print("╔" + "═" * 73 + "╗")
    print("║" + " " * 73 + "║")
    print("║" + " PARALLEL LLM PRE-TRAINING BENCHMARK".center(73) + "║")
    print("║" + " Single-Process vs Multi-Process Performance".center(73) + "║")
    print("║" + " " * 73 + "║")
    print("╚" + "═" * 73 + "╝")
    print()

    random.seed(42)

    # Training data
    training_text = """The quick brown fox jumps over the lazy dog.
The dog was not amused by the fox's antics.
A wise old owl lived in an oak tree.
The owl saw and heard all that happened in the forest.
The more the owl saw, the less it spoke.
The less the owl spoke, the more it heard.
Why can't we all be like that wise old bird?
Once upon a time there was a curious cat.
The cat loved to explore and discover new things.
Every day the cat would go on adventures.
The cat learned something new each day.
Knowledge is power and learning never stops.
Books are treasures filled with wisdom and stories.
Reading opens doors to new worlds and ideas.
Education is the key to a better future.
The sun rises in the east and sets in the west.
Nature follows patterns that we can observe and learn.
Science helps us understand the world around us.
Questions lead to answers and new questions."""

    # Initialize tokenizer and data
    print("🔧 Setup")
    print("─" * 75)
    tokenizer = SimpleCharTokenizer(training_text)
    tokens = tokenizer.encode(training_text)

    n_cpus = cpu_count()
    vocab_size = tokenizer.n_vocab
    n_epochs = 50
    initial_lr = 0.005

    print(f"   CPU cores available: {n_cpus}")
    print(f"   Vocabulary size: {vocab_size}")
    print(f"   Training tokens: {len(tokens):,}")
    print(f"   Epochs: {n_epochs}")
    print()

    # ========================================================================
    # BENCHMARK 1: Single Process
    # ========================================================================
    print("🚀 BENCHMARK 1: Single-Process Training")
    print("─" * 75)

    model_single = PurePythonLM(vocab_size=vocab_size, embed_dim=24, hidden_dim=48)
    # Parameter count: embeddings + W1 + b1 + W2 + b2
    n_params = (vocab_size * 24 + 24 * 48 + 48 + 48 * vocab_size + vocab_size)
    print(f"   Model parameters: {n_params:,}")
    print()

    start_time = time.time()
    losses_single, train_time_single = train_single_process(
        model_single, tokens, n_epochs, initial_lr
    )
    total_time_single = time.time() - start_time

    print()
    print("   ✓ Training complete!")
    print(f"   ✓ Final loss: {losses_single[-1]:.4f}")
    print(f"   ✓ Training time: {train_time_single:.2f}s")
    print(f"   ✓ Total time (including overhead): {total_time_single:.2f}s")
    print(f"   ✓ Throughput: {n_epochs / train_time_single:.2f} epochs/sec")
    print()

    # ========================================================================
    # BENCHMARK 2: Multi Process
    # ========================================================================
    print("🚀 BENCHMARK 2: Multi-Process Training")
    print("─" * 75)

    # Test with different worker counts (deduplicated in case cpu_count() is 2 or 4)
    for n_workers in sorted(set([2, 4, n_cpus])):
        if n_workers > n_cpus:
            continue

        print(f"\n   Testing with {n_workers} workers:")
        print("   " + "─" * 71)

        model_multi = PurePythonLM(vocab_size=vocab_size, embed_dim=24, hidden_dim=48)

        start_time = time.time()
        losses_multi, train_time_multi = train_multi_process(
            model_multi, tokens, n_epochs, initial_lr, n_workers
        )
        total_time_multi = time.time() - start_time

        print()
        print("   ✓ Training complete!")
        print(f"   ✓ Final loss: {losses_multi[-1]:.4f}")
        print(f"   ✓ Training time: {train_time_multi:.2f}s")
        print(f"   ✓ Total time (including overhead): {total_time_multi:.2f}s")
        print(f"   ✓ Throughput: {n_epochs / train_time_multi:.2f} epochs/sec")

        # Speedup relative to the single-process run
        speedup = train_time_single / train_time_multi
        efficiency = (speedup / n_workers) * 100

        print()
        print("   📊 Performance vs Single Process:")
        print(f"      • Speedup: {speedup:.2f}x")
        print(f"      • Parallel efficiency: {efficiency:.1f}%")

        if speedup < 1.0:
            print("      ⚠ Slower than single process (overhead dominates)")
        elif speedup > 1.0:
            print("      ✓ Faster than single process!")

    # ========================================================================
    # SUMMARY
    # ========================================================================
    print()
    print()
    print("╔" + "═" * 73 + "╗")
    print("║" + " BENCHMARK SUMMARY".center(73) + "║")
    print("╚" + "═" * 73 + "╝")
    print()
    print("📊 Key Findings:")
    print()
    print("   1. Single-process training is straightforward and has low overhead")
    print()
    print("   2. Multi-process training has significant overhead from:")
    print("      • Process creation and management")
    print("      • Serializing/deserializing model parameters")
    print("      • Aggregating gradients across processes")
    print()
    print("   3. For small models and datasets (like this one):")
    print("      → Overhead often exceeds parallelization benefits")
    print("      → Single process may actually be faster!")
    print()
    print("   4. Multi-process training shines when:")
    print("      → Models are very large")
    print("      → Datasets are massive")
    print("      → Batch sizes are huge")
    print("      → Computation >> communication overhead")
    print()
    print("💡 Takeaway: Choose parallelization strategy based on workload size!")
    print()


if __name__ == "__main__":
    main()