This file contains the notebooks (from `01_matmul.ipynb` to `14_augment.ipynb`) developed in Practical Deep Learning for Coders, part 2 of fast.ai's 2022-23 course.
This document explores matrix multiplication, starting from basic Python implementations and progressing to more advanced techniques using NumPy, PyTorch, and CUDA.
The following libraries are required:
- Python standard library
- matplotlib
- NumPy
- PyTorch
- Numba (for CUDA implementation)
import pickle, gzip, math, os, time, shutil
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import torch
from numba import njit, cuda
from pathlib import Path
from urllib.request import urlretrieve
We'll use the MNIST dataset for our examples.
# Remote location of the pickled MNIST archive, and where it is cached locally.
MNIST_URL = 'https://github.com/mnielsen/neural-networks-and-deep-learning/blob/master/data/mnist.pkl.gz?raw=true'
path_data = Path('data')
path_data.mkdir(exist_ok=True)
path_gz = path_data/'mnist.pkl.gz'

# Download only when there is no cached copy yet.
if not path_gz.exists():
    urlretrieve(MNIST_URL, path_gz)

# NOTE(review): pickle.load runs on a file fetched from the network; unpickling
# can execute arbitrary code, so only use this with a trusted source.
with gzip.open(path_gz, 'rb') as f:
    train_set, valid_set, _ = pickle.load(f, encoding='latin-1')
(x_train, y_train), (x_valid, y_valid) = train_set, valid_set

# Convert the four loaded arrays to tensors.
x_train, y_train, x_valid, y_valid = (
    torch.tensor(arr) for arr in (x_train, y_train, x_valid, y_valid)
)
Let's start with a simple implementation of matrix multiplication:
def matmul(a, b):
    """Multiply two 2-D tensors using three nested Python loops.

    Args:
        a: tensor of shape (ar, ac).
        b: tensor of shape (br, bc); `br` must equal `ac`.

    Returns:
        A new `torch.zeros(ar, bc)` tensor filled with the product.

    Raises:
        ValueError: if the inner dimensions do not match. Without this check a
            `b` with more rows than `a` has columns would silently give a
            wrong result instead of failing.
    """
    ar, ac = a.shape
    br, bc = b.shape
    if ac != br:
        raise ValueError(
            f"cannot multiply {ar}x{ac} by {br}x{bc}: inner dimensions differ")
    c = torch.zeros(ar, bc)
    for i in range(ar):
        for j in range(bc):
            for k in range(ac):
                c[i, j] += a[i, k] * b[k, j]
    return c
This implementation is correct but slow. We'll improve it step by step.
@njit
def dot(a, b):
    # Dot product of two indexable 1-D sequences, compiled by Numba via @njit.
    # Iterates over len(a), so `b` must be at least as long as `a`.
    res = 0.  # float accumulator
    for i in range(len(a)):
        res += a[i] * b[i]
    return res
def matmul_dot(a, b):
    """Matrix product where each output element is one call to `dot`.

    Takes row `r` of `a` and column `col` of `b` for every output position and
    stores the result in a fresh `torch.zeros` tensor.
    """
    n_rows, _ = a.shape
    _, n_cols = b.shape
    out = torch.zeros(n_rows, n_cols)
    for r in range(n_rows):
        for col in range(n_cols):
            out[r, col] = dot(a[r, :], b[:, col])
    return out
def matmul_broadcast(a, b):
    """Matrix product computed one output row at a time via broadcasting.

    For each row of `a`, the row is turned into a column, multiplied against
    all of `b`, and summed over the first axis to give one output row.
    """
    n_rows, _ = a.shape
    _, n_cols = b.shape
    out = torch.zeros(n_rows, n_cols)
    for r in range(n_rows):
        scaled = a[r, :, None] * b
        out[r] = scaled.sum(dim=0)
    return out
def matmul_einsum(a, b):
    """Matrix product expressed as an Einstein summation over the shared index `k`."""
    product = torch.einsum('ik,kj->ij', a, b)
    return product
For even faster matrix multiplication on NVIDIA GPUs, we can use CUDA:
@cuda.jit
def matmul_cuda(a, b, c):
    # Numba CUDA kernel: each thread computes at most one element c[i, j].
    # Writes the result into `c` in place; nothing is returned.
    i, j = cuda.grid(2)  # this thread's (i, j) position in the 2-D launch grid
    # Threads that fall outside the output (the grid is rounded up) do nothing.
    if i < c.shape[0] and j < c.shape[1]:
        tmp = 0.
        for k in range(a.shape[1]):
            tmp += a[i, k] * b[k, j]
        c[i, j] = tmp
# Usage:
# `a_gpu`, `b_gpu` and `c_gpu` are the two inputs and the output buffer passed
# to the kernel (presumably device arrays, e.g. from cuda.to_device — confirm).
TPB = 16  # threads per block along each of the two grid dimensions
# Size the grid from the output buffer that is actually passed to the kernel.
rr, rc = c_gpu.shape
# Round up so the grid covers every output element; the kernel bounds-checks.
blockspergrid = (math.ceil(rr / TPB), math.ceil(rc / TPB))
matmul_cuda[blockspergrid, (TPB, TPB)](a_gpu, b_gpu, c_gpu)
Here's a comparison of the different methods:
- Basic implementation: ~421 ms
- Dot product: ~236 μs
- Broadcasting: ~70.1 μs
- Einstein summation: ~15.1 ms
- PyTorch built-in: ~15.2 ms
- CUDA implementation: ~3.61 ms
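The CUDA implementation provides the best performance, with a speedup of about 5 million times compared to the basic implementation. (The timings listed above are not directly comparable line by line: judging by their magnitudes, the basic, dot-product and broadcasting figures appear to have been measured on a much smaller input than the einsum, built-in and CUDA figures, so the quoted speedup is per unit of work rather than a ratio of the listed times.)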
We've explored various methods of implementing matrix multiplication, from basic Python loops to highly optimized CUDA kernels. Each method offers a trade-off between simplicity and performance. For most practical applications, using built-in functions from libraries like NumPy or PyTorch will provide excellent performance. However, understanding these implementations helps in optimizing code for specific use cases and hardware.
Alt text: A visual representation of matrix multiplication, showing how each element in the resulting matrix is calculated from the rows of the first matrix and columns of the second matrix.
This document explores the implementation of mean shift clustering, starting with a basic Python version and progressing to a GPU-accelerated version.
We start by importing necessary libraries and generating sample data:
import math, matplotlib.pyplot as plt, operator, torch
from functools import partial
torch.manual_seed(42)
torch.set_printoptions(precision=3, linewidth=140, sci_mode=False)
# Generate sample data
n_clusters = 6
n_samples = 250
centroids = torch.rand(n_clusters, 2) * 70 - 35
from torch.distributions.multivariate_normal import MultivariateNormal
from torch import tensor
def sample(m):
    """Draw `n_samples` points from a normal distribution centred at `m`.

    The covariance is diagonal with 5. on both axes; `n_samples` is read from
    module scope.
    """
    covariance = torch.diag(tensor([5., 5.]))
    distribution = MultivariateNormal(m, covariance)
    return distribution.sample((n_samples,))
slices = [sample(c) for c in centroids]
data = torch.cat(slices)
The basic mean shift algorithm is implemented as follows:
def gaussian(d, bw):
    """Gaussian kernel of bandwidth `bw` evaluated at distances `d`."""
    normaliser = bw * math.sqrt(2 * math.pi)
    scaled = d / bw
    return torch.exp(-0.5 * scaled ** 2) / normaliser
def one_update(X):
    """Run one in-place mean-shift pass over every row of `X`.

    Each row is replaced by the Gaussian-weighted (bandwidth 2.5) average of
    all rows. Because `X` is modified in place, rows updated earlier in the
    pass are already shifted when later rows are processed.
    """
    for idx, point in enumerate(X):
        squared = (point - X) ** 2
        dist = torch.sqrt(squared.sum(1))
        w = gaussian(dist, 2.5)
        weighted_sum = (w[:, None] * X).sum(0)
        X[idx] = weighted_sum / w.sum()
def meanshift(data):
    """Return a copy of `data` after five in-place `one_update` passes.

    The input tensor itself is left untouched.
    """
    shifted = data.clone()
    for _ in range(5):
        one_update(shifted)
    return shifted
To accelerate the algorithm, we implement a batched version that can run on GPU:
def dist_b(a, b):
    """Pairwise Euclidean distances between the rows of `b` and the rows of `a`.

    For 2-D inputs the result has one row per row of `b` and one column per
    row of `a` (broadcasting `a[None]` against `b[:, None]`).
    """
    diff = a[None] - b[:, None]
    return (diff ** 2).sum(2).sqrt()
def meanshift(data, bs=500, n_iter=5, bw=2.5):
    """Batched mean shift: shift `bs` points at a time using matrix ops.

    Args:
        data: 2-D tensor of points; it is cloned, not modified.
        bs: number of points updated per batch.
        n_iter: number of full passes over the data (previously fixed at 5).
        bw: Gaussian kernel bandwidth (previously fixed at 2.5).

    Returns:
        The shifted copy of `data`, on the same device as the input.
    """
    n = len(data)
    X = data.clone()
    for _ in range(n_iter):
        for i in range(0, n, bs):
            s = slice(i, min(i + bs, n))
            # One row of weights per point in the batch, one column per point in X.
            weight = gaussian(dist_b(X, X[s]), bw)
            div = weight.sum(1, keepdim=True)
            # Weighted average of all points, written back in place for this batch.
            X[s] = weight @ X / div
    return X
# Move data to GPU
data = data.cuda()
# Run GPU-accelerated mean shift
X = meanshift(data).cpu()
The GPU-accelerated version shows significant speedup:
%timeit -n 5 _=meanshift(data, 1250).cpu()
# Output: 2 ms ± 226 µs per loop (mean ± std. dev. of 7 runs, 5 loops each)
This is much faster than the non-batched CPU version.
We can visualize the results using matplotlib:
def plot_data(centroids, data, n_samples, ax=None):
    """Scatter each cluster's points and mark its centroid with a two-tone cross.

    `data` is read in consecutive runs of `n_samples` rows, one run per
    centroid. A new figure is created when `ax` is not supplied.
    """
    if ax is None:
        ax = plt.subplots()[1]
    start = 0
    for centroid in centroids:
        stop = start + n_samples
        cluster = data[start:stop]
        ax.scatter(cluster[:, 0], cluster[:, 1], s=1)
        # Large black cross first, then a smaller magenta one drawn on top.
        for size, colour, width in ((10, 'k', 5), (5, 'm', 2)):
            ax.plot(*centroid, markersize=size, marker="x", color=colour, mew=width)
        start = stop
plot_data(centroids+2, X, n_samples)
plt.show()
Mean shift clustering is an effective algorithm for discovering clusters in data without specifying the number of clusters in advance. By implementing a batched version that runs on GPU, we can significantly accelerate the computation, making it feasible for larger datasets.
This notebook demonstrates the implementation of a simple neural network, including forward and backward passes, using PyTorch. We'll start with a basic implementation and gradually refine it.
First, let's import the necessary libraries and load the MNIST dataset:
import pickle, gzip, math, os, time, shutil, torch, matplotlib as mpl, numpy as np
from pathlib import Path
from torch import tensor
from fastcore.test import test_close
torch.manual_seed(42)
mpl.rcParams['image.cmap'] = 'gray'
torch.set_printoptions(precision=2, linewidth=125, sci_mode=False)
np.set_printoptions(precision=2, linewidth=125)
# Load MNIST data
path_data = Path('data')
path_gz = path_data/'mnist.pkl.gz'
with gzip.open(path_gz, 'rb') as f:
((x_train, y_train), (x_valid, y_valid), _) = pickle.load(f, encoding='latin-1')
x_train, y_train, x_valid, y_valid = map(tensor, [x_train, y_train, x_valid, y_valid])
Let's define a simple neural network architecture:
n, m = x_train.shape
c = y_train.max() + 1
nh = 50 # number of hidden units
w1 = torch.randn(m, nh)
b1 = torch.zeros(nh)
w2 = torch.randn(nh, 1)
b2 = torch.zeros(1)
def lin(x, w, b):
    """Affine layer: `x` times weights `w`, plus bias `b`."""
    return x.matmul(w) + b
def relu(x):
    """Rectified linear unit: negative entries become zero."""
    return x.clamp(min=0.)
def model(xb):
    """Two-layer network: linear -> ReLU -> linear, using the module-level w1/b1/w2/b2."""
    hidden = relu(lin(xb, w1, b1))
    return lin(hidden, w2, b2)
We'll use Mean Squared Error as our loss function initially:
def mse(output, targ):
    """Mean squared error between column 0 of `output` and `targ`."""
    err = output[:, 0] - targ
    return err.pow(2).mean()
Let's implement the backward pass manually:
def lin_grad(inp, out, w, b):
    """Backward pass of `out = inp @ w + b`, given the upstream gradient `out.g`.

    Stores the gradients as a `.g` attribute on `inp`, `w` and `b`.
    """
    inp.g = out.g @ w.t()
    # Same sum over the batch as the outer-product form
    # (inp.unsqueeze(-1) * out.g.unsqueeze(1)).sum(0), but without building the
    # (batch, n_in, n_out) intermediate tensor.
    w.g = inp.t() @ out.g
    b.g = out.g.sum(0)
def forward_and_backward(inp, targ):
    """Run the two-layer model forward, then back-propagate the MSE loss by hand.

    Gradients are stored as `.g` attributes on the module-level weights and
    biases (and on `inp`). The loss is computed but not returned.
    """
    # forward pass:
    pre_act = lin(inp, w1, b1)
    hidden = relu(pre_act)
    out = lin(hidden, w2, b2)
    diff = out[:, 0] - targ
    loss = diff.pow(2).mean()

    # backward pass, from the loss back towards the input:
    batch_size = inp.shape[0]
    out.g = 2. * diff[:, None] / batch_size
    lin_grad(hidden, out, w2, b2)
    # ReLU lets the gradient through only where its input was positive.
    pre_act.g = (pre_act > 0).float() * hidden.g
    lin_grad(inp, pre_act, w1, b1)
We can refactor our model to use classes for each layer:
class Relu():
    """ReLU layer that remembers its input and output for the backward pass."""

    def __call__(self, inp):
        self.inp, self.out = inp, inp.clamp_min(0.)
        return self.out

    def backward(self):
        # Gradient flows only through positions where the input was positive.
        passthrough = (self.inp > 0).float()
        self.inp.g = passthrough * self.out.g
class Lin():
    """Linear layer with a hand-written backward pass; gradients go in `.g` attributes."""

    def __init__(self, w, b):
        self.w = w
        self.b = b

    def __call__(self, inp):
        self.inp = inp
        self.out = lin(inp, self.w, self.b)
        return self.out

    def backward(self):
        upstream = self.out.g
        self.inp.g = upstream @ self.w.t()
        self.w.g = self.inp.t() @ upstream
        self.b.g = upstream.sum(0)
class Mse():
    """Mean-squared-error loss with a hand-written backward pass."""

    def __call__(self, inp, targ):
        self.inp = inp
        self.targ = targ
        self.out = mse(inp, targ)
        return self.out

    def backward(self):
        n = self.targ.shape[0]
        err = self.inp.squeeze() - self.targ
        # Restore the trailing axis so the gradient is one column per sample.
        self.inp.g = 2. * err.unsqueeze(-1) / n
class Model():
    """Linear -> ReLU -> Linear followed by an MSE loss, with manual backprop."""

    def __init__(self, w1, b1, w2, b2):
        self.layers = [Lin(w1, b1), Relu(), Lin(w2, b2)]
        self.loss = Mse()

    def __call__(self, x, targ):
        activ = x
        for layer in self.layers:
            activ = layer(activ)
        return self.loss(activ, targ)

    def backward(self):
        # Loss first, then the layers in reverse order.
        self.loss.backward()
        for layer in self.layers[::-1]:
            layer.backward()
Finally, we can leverage PyTorch's `nn.Module` for a more standard implementation:
from torch import nn
import torch.nn.functional as F
class Linear(nn.Module):
    """Linear layer whose weight and bias are registered module parameters.

    Wrapping the tensors in `nn.Parameter` makes them show up in
    `.parameters()`, so optimizers, `.to(device)` and `state_dict()` see them.
    Plain tensors with `requires_grad_()` are invisible to all of those.
    """

    def __init__(self, n_in, n_out):
        super().__init__()
        self.w = nn.Parameter(torch.randn(n_in, n_out))
        self.b = nn.Parameter(torch.zeros(n_out))

    def forward(self, inp):
        return inp @ self.w + self.b
class Model(nn.Module):
    """Linear -> ReLU -> Linear network that returns the MSE loss against `targ`.

    The layers are held in an `nn.ModuleList` so they are registered as
    sub-modules, and the computation lives in `forward` so the standard
    `nn.Module.__call__` machinery stays in place. Callers still invoke
    `model(x, targ)` exactly as before.
    """

    def __init__(self, n_in, nh, n_out):
        super().__init__()
        self.layers = nn.ModuleList([Linear(n_in, nh), nn.ReLU(), Linear(nh, n_out)])

    def forward(self, x, targ):
        for l in self.layers:
            x = l(x)
        # Add a trailing axis to the target so it lines up with the (n, 1) output.
        return F.mse_loss(x, targ[:, None])
model = Model(m, nh, 1)
loss = model(x_train, y_train)
loss.backward()
This implementation demonstrates the core concepts of building a neural network from scratch, including forward and backward passes, and shows how to transition to using PyTorch's built-in modules for more efficient development.
This notebook demonstrates the implementation of minibatch training for neural networks using PyTorch. We'll cover the following topics:
- Initial setup and data preparation
- Basic model architecture
- Loss functions (Cross-entropy)
- Training loop implementation
- Dataset and DataLoader
- Optimization techniques
- Validation
import pickle, gzip, math, os, time, shutil, torch, matplotlib as mpl, numpy as np, matplotlib.pyplot as plt
from pathlib import Path
from torch import tensor, nn
import torch.nn.functional as F
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'
# Load MNIST data
path_data = Path('data')
path_gz = path_data/'mnist.pkl.gz'
with gzip.open(path_gz, 'rb') as f:
((x_train, y_train), (x_valid, y_valid), _) = pickle.load(f, encoding='latin-1')
x_train, y_train, x_valid, y_valid = map(tensor, [x_train, y_train, x_valid, y_valid])
n, m = x_train.shape
c = y_train.max() + 1
nh = 50 # number of hidden units
We'll use a simple feed-forward neural network with one hidden layer:
class Model(nn.Module):
    """Feed-forward network with one hidden layer: Linear -> ReLU -> Linear.

    The layers are stored in an `nn.ModuleList` so their weights are
    registered and appear in `.parameters()`; a plain Python list would hide
    them from optimizers. The computation is in `forward`, leaving
    `nn.Module.__call__` intact.
    """

    def __init__(self, n_in, nh, n_out):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(n_in, nh), nn.ReLU(), nn.Linear(nh, n_out)])

    def forward(self, x):
        for l in self.layers:
            x = l(x)
        return x
model = Model(m, nh, 10)
We'll implement the log softmax and negative log-likelihood loss:
def log_softmax(x):
    """Log-softmax over the last axis, computed with `logsumexp`."""
    log_norm = x.logsumexp(-1, keepdim=True)
    return x - log_norm
def nll(input, target):
    """Negative mean of `input[i, target[i]]` over all rows `i`."""
    rows = range(target.shape[0])
    picked = input[rows, target]
    return -picked.mean()
loss_func = F.cross_entropy # PyTorch's built-in implementation
bs = 50 # batch size
lr = 0.5 # learning rate
epochs = 3 # number of epochs
def accuracy(out, yb):
    """Fraction of rows whose arg-max along dim 1 equals the label in `yb`."""
    predicted = out.argmax(dim=1)
    hits = (predicted == yb).float()
    return hits.mean()
def report(loss, preds, yb):
    """Print the loss and the accuracy of `preds` against `yb`, both to two decimals."""
    line = f'{loss:.2f}, {accuracy(preds, yb):.2f}'
    print(line)
def get_model():
    """Build a fresh Linear -> ReLU -> Linear classifier and an SGD optimizer for it.

    Reads the input width `m`, hidden size `nh` and learning rate `lr` from
    module scope.

    Returns:
        (model, optimizer) tuple.
    """
    # Local import: none of the import lines that precede this function bring
    # `optim` into scope, so the call below would otherwise raise NameError.
    from torch import optim
    model = nn.Sequential(nn.Linear(m, nh), nn.ReLU(), nn.Linear(nh, 10))
    return model, optim.SGD(model.parameters(), lr=lr)
model, opt = get_model()
for epoch in range(epochs):
for i in range(0, n, bs):
s = slice(i, min(n, i+bs))
xb, yb = x_train[s], y_train[s]
preds = model(xb)
loss = loss_func(preds, yb)
loss.backward()
opt.step()
opt.zero_grad()
report(loss, preds, yb)
We'll implement custom Dataset and DataLoader classes:
class Dataset:
    """Pairs inputs `x` with targets `y`; indexing returns the matching `(x[i], y[i])`."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, i):
        return self.x[i], self.y[i]
train_ds, valid_ds = Dataset(x_train, y_train), Dataset(x_valid, y_valid)
class DataLoader:
    """Yield consecutive slices of `bs` items from a sliceable dataset."""

    def __init__(self, ds, bs):
        self.ds = ds
        self.bs = bs

    def __iter__(self):
        starts = range(0, len(self.ds), self.bs)
        yield from (self.ds[start:start + self.bs] for start in starts)
train_dl = DataLoader(train_ds, bs)
valid_dl = DataLoader(valid_ds, bs)
To improve training, we'll implement random sampling:
import random
class Sampler:
    """Iterate over the indices 0..len(ds)-1, optionally in a freshly shuffled order."""

    def __init__(self, ds, shuffle=False):
        self.n = len(ds)
        self.shuffle = shuffle

    def __iter__(self):
        order = list(range(self.n))
        if self.shuffle:
            random.shuffle(order)
        return iter(order)
class BatchSampler:
    """Group the indices produced by `sampler` into lists of `bs`.

    A final short batch is yielded unless `drop_last` is set.
    """

    def __init__(self, sampler, bs, drop_last=False):
        self.sampler = sampler
        self.bs = bs
        self.drop_last = drop_last

    def __iter__(self):
        pending = []
        for idx in self.sampler:
            pending.append(idx)
            if len(pending) != self.bs:
                continue
            yield pending
            pending = []
        # Whatever is left over is a partial batch.
        if pending and not self.drop_last:
            yield pending
def collate(b):
    """Turn a list of `(x, y)` pairs into a pair of stacked tensors."""
    inputs, targets = zip(*b)
    stacked = [torch.stack(column) for column in (inputs, targets)]
    return tuple(stacked)
class DataLoader:
    """Build batches by looking up each index list from `batchs` in `ds` and collating."""

    def __init__(self, ds, batchs, collate_fn=collate):
        self.ds = ds
        self.batchs = batchs
        self.collate_fn = collate_fn

    def __iter__(self):
        for indices in self.batchs:
            items = [self.ds[i] for i in indices]
            yield self.collate_fn(items)
train_samp = BatchSampler(Sampler(train_ds, shuffle=True), bs)
valid_samp = BatchSampler(Sampler(valid_ds, shuffle=False), bs)
train_dl = DataLoader(train_ds, batchs=train_samp)
valid_dl = DataLoader(valid_ds, batchs=valid_samp)
We can also use PyTorch's built-in DataLoader:
from torch.utils.data import DataLoader, SequentialSampler, RandomSampler, BatchSampler
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True, drop_last=True, num_workers=2)
valid_dl = DataLoader(valid_ds, batch_size=bs, shuffle=False, num_workers=2)
We'll implement a validation step to monitor overfitting:
def fit(epochs, model, loss_func, opt, train_dl, valid_dl):
    """Train for `epochs` epochs, printing validation loss and accuracy after each.

    Validation metrics are weighted by batch size, so batches of different
    sizes are averaged correctly.

    Returns:
        (loss, accuracy) on the validation set from the last epoch. The return
        statement reads values set inside the loop, so `epochs` must be >= 1.
    """
    for epoch in range(epochs):
        model.train()
        for xb, yb in train_dl:
            batch_loss = loss_func(model(xb), yb)
            batch_loss.backward()
            opt.step()
            opt.zero_grad()

        model.eval()
        tot_loss, tot_acc, count = 0., 0., 0
        with torch.no_grad():
            for xb, yb in valid_dl:
                pred = model(xb)
                n = len(xb)
                count += n
                tot_loss += loss_func(pred, yb).item() * n
                tot_acc += accuracy(pred, yb).item() * n
        print(epoch, tot_loss/count, tot_acc/count)
    return tot_loss/count, tot_acc/count
def get_dls(train_ds, valid_ds, bs, **kwargs):
    """Create a shuffled training loader and a validation loader with twice the batch size.

    Extra keyword arguments are forwarded to both `DataLoader` constructors.
    """
    train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True, **kwargs)
    valid_dl = DataLoader(valid_ds, batch_size=bs*2, **kwargs)
    return train_dl, valid_dl
train_dl, valid_dl = get_dls(train_ds, valid_ds, bs)
model, opt = get_model()
loss, acc = fit(5, model, loss_func, opt, train_dl, valid_dl)
This notebook demonstrates the key components of training neural networks using minibatches, including data loading, model architecture, loss functions, and validation. By implementing these concepts from scratch and then using PyTorch's built-in functionality, we gain a deeper understanding of the training process.
#| default_exp datasets
#|export
from __future__ import annotations
import math, numpy as np, matplotlib.pyplot as plt
from operator import itemgetter
from itertools import zip_longest
import fastcore.all as fc
from torch.utils.data import default_collate
from miniai.training import *
import logging, pickle, gzip, os, time, shutil, torch, matplotlib as mpl
from pathlib import Path
from torch import tensor, nn, optim
from torch.utils.data import DataLoader
import torch.nn.functional as F
from datasets import load_dataset, load_dataset_builder
import torchvision.transforms.functional as TF
from fastcore.test import test_close
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'
logging.disable(logging.WARNING)
name = "fashion_mnist"
ds_builder = load_dataset_builder(name)
print(ds_builder.info.description)
Fashion-MNIST is a dataset of Zalando's article images—consisting of a training set of
60,000 examples and a test set of 10,000 examples. Each example is a 28x28 grayscale image,
associated with a label from 10 classes. We intend Fashion-MNIST to serve as a direct drop-in
replacement for the original MNIST dataset for benchmarking machine learning algorithms.
It shares the same image size and structure of training and testing splits.
ds_builder.info.features
ds_builder.info.splits
dsd = load_dataset(name)
dsd
train, test = dsd['train'], dsd['test']
train[0]
x, y = ds_builder.info.features
x, y
x, y = 'image', 'label'
img = train[0][x]
img
xb = train[:5][x]
yb = train[:5][y]
yb
featy = train.features[y]
featy
featy.int2str(yb)
train['label'][:5]
def collate_fn(b):
    """Collate a list of samples into a dict of one image tensor and one label tensor.

    The dict keys are the module-level names `x` and `y`.
    """
    images = torch.stack([TF.to_tensor(o[x]) for o in b])
    labels = tensor([o[y] for o in b])
    return {x: images, y: labels}
dl = DataLoader(train, collate_fn=collate_fn, batch_size=16)
b = next(iter(dl))
b[x].shape, b[y]
def transforms(b):
    """Replace every image under key `x` of batch `b` with its tensor form and return `b`."""
    b[x] = list(map(TF.to_tensor, b[x]))
    return b
tds = train.with_transform(transforms)
dl = DataLoader(tds, batch_size=16)
b = next(iter(dl))
b[x].shape, b[y]
def _transformi(b):
    """Convert each image under key `x` to a flattened tensor, in place (returns None)."""
    b[x] = [TF.to_tensor(o).flatten() for o in b[x]]
#|export
def inplace(f):
    """Wrap a function that mutates its argument so that it also returns that argument."""
    def _f(b):
        f(b)        # mutate; whatever `f` returns is ignored
        return b
    return _f
transformi = inplace(_transformi)
r = train.with_transform(transformi)[0]
r[x].shape, r[y]
@inplace
def transformi(b):
    """Flatten every image under key `x` into a 1-D tensor; `@inplace` makes it return `b`."""
    b[x] = [TF.to_tensor(o).flatten() for o in b[x]]
tdsf = train.with_transform(transformi)
r = tdsf[0]
r[x].shape, r[y]
d = dict(a=1, b=2, c=3)
ig = itemgetter('a', 'c')
ig(d)
class D:
    """Minimal object supporting `[]`: 'a' -> 1, 'b' -> 2, anything else -> 3."""

    def __getitem__(self, k):
        if k == 'a':
            return 1
        if k == 'b':
            return 2
        return 3
d = D()
ig(d)
list(tdsf.features)
batch = dict(a=[1], b=[2]), dict(a=[3], b=[4])
default_collate(batch)
#|export
def collate_dict(ds):
    """Return a collate function that yields a tuple ordered like `ds.features`.

    The returned function runs `default_collate` on the batch and then picks
    out the entries named by the dataset's features.
    """
    get = itemgetter(*ds.features)

    def _f(b):
        collated = default_collate(b)
        return get(collated)

    return _f
dlf = DataLoader(tdsf, batch_size=4, collate_fn=collate_dict(tdsf))
xb, yb = next(iter(dlf))
xb.shape, yb
b = next(iter(dl))
xb = b['image']
img = xb[0]
plt.imshow(img[0]);
#|export
@fc.delegates(plt.Axes.imshow)
def show_image(im, ax=None, figsize=None, title=None, noframe=True, **kwargs):
    "Show a PIL or PyTorch image on `ax`."
    looks_like_tensor = fc.hasattrs(im, ('cpu', 'permute', 'detach'))
    if looks_like_tensor:
        im = im.detach().cpu()
        # 3-D with a small leading axis: treat it as channels and move it last.
        if len(im.shape) == 3 and im.shape[0] < 5:
            im = im.permute(1, 2, 0)
    elif not isinstance(im, np.ndarray):
        im = np.array(im)
    # Drop a trailing single-channel axis.
    if im.shape[-1] == 1:
        im = im[..., 0]
    if ax is None:
        ax = plt.subplots(figsize=figsize)[1]
    ax.imshow(im, **kwargs)
    if title is not None:
        ax.set_title(title)
    for clear_ticks in (ax.set_xticks, ax.set_yticks):
        clear_ticks([])
    if noframe:
        ax.axis('off')
    return ax
show_image(img, figsize=(2,2));
fig, axs = plt.subplots(1,2)
show_image(img, axs[0])
show_image(xb[1], axs[1]);
#|export
@fc.delegates(plt.subplots, keep=True)
def subplots(
    nrows:int=1, # Number of rows in returned axes grid
    ncols:int=1, # Number of columns in returned axes grid
    figsize:tuple=None, # Width, height in inches of the returned figure
    imsize:int=3, # Size (in inches) of images that will be displayed in the returned figure
    suptitle:str=None, # Title to be set to returned figure
    **kwargs
): # fig and axs
    "A figure and set of subplots to display images of `imsize` inches"
    # Derive the figure size from the grid shape unless given explicitly.
    if figsize is None:
        figsize = (ncols*imsize, nrows*imsize)
    fig, ax = plt.subplots(nrows, ncols, figsize=figsize, **kwargs)
    if suptitle is not None:
        fig.suptitle(suptitle)
    # For a 1x1 grid, wrap the single result in an array.
    if nrows*ncols == 1:
        ax = np.array([ax])
    return fig, ax
fig, axs = subplots(3,3, imsize=1)
imgs = xb[:8]
for ax, img in zip(axs.flat, imgs): show_image(img, ax)
#|export
@fc.delegates(subplots)
def get_grid(
    n:int, # Number of axes
    nrows:int=None, # Number of rows, defaulting to `int(math.sqrt(n))`
    ncols:int=None, # Number of columns, defaulting to `floor(n/nrows)`
    title:str=None, # If passed, title set to the figure
    weight:str='bold', # Title font weight
    size:int=14, # Title font size
    **kwargs,
): # fig and axs
    "Return a grid of `n` axes, `rows` by `cols`"
    # NOTE: columns are rounded down, so when `n` is not a multiple of `nrows`
    # the grid holds fewer than `n` axes.
    if nrows:
        ncols = ncols or int(np.floor(n/nrows))
    elif ncols:
        nrows = nrows or int(np.ceil(n/ncols))
    else:
        nrows = int(math.sqrt(n))
        ncols = int(np.floor(n/nrows))
    fig, axs = subplots(nrows, ncols, **kwargs)
    # Hide any axes beyond the first `n`.
    total = nrows*ncols
    for i in range(n, total):
        axs.flat[i].set_axis_off()
    if title is not None:
        fig.suptitle(title, weight=weight, size=size)
    return fig, axs
fig, axs = get_grid(8, nrows=3, imsize=1)
for ax, img in zip(axs.flat, imgs): show_image(img, ax)
#|export
@fc.delegates(subplots)
def show_images(ims:list, # Images to show
                nrows:int|None=None, # Number of rows in grid
                ncols:int|None=None, # Number of columns in grid (auto-calculated if None)
                titles:list|None=None, # Optional list of titles for each image
                **kwargs):
    "Show all images `ims` as subplots with `rows` using `titles`"
    axs = get_grid(len(ims), nrows, ncols, **kwargs)[1].flat
    for im, t, ax in zip_longest(ims, titles or [], axs):
        # zip_longest pads the shorter inputs with None. When there are more
        # axes or titles than images, `im` is None and there is nothing to draw;
        # passing None on to show_image would raise.
        if im is None: continue
        # `ax` may be None when the grid has fewer axes than images; show_image
        # then creates a new figure for that image.
        show_image(im, ax=ax, title=t)
yb = b['label']
lbls = yb[:8]
names = "Top Trouser Pullover Dress Coat Sandal Shirt Sneaker Bag Boot".split()
titles = itemgetter(*lbls)(names)
' '.join(titles)
show_images(imgs, imsize=1.7, titles=titles)
#|export
class DataLoaders:
    """Holds a training and a validation data loader as `.train` and `.valid`."""

    def __init__(self, *dls):
        # Only the first two loaders are kept.
        self.train, self.valid = dls[:2]

    @classmethod
    def from_dd(cls, dd, batch_size, as_tuple=True, **kwargs):
        """Build loaders for every split in `dd`, collating by the 'train' split's features.

        `as_tuple` is accepted but not used in this method.
        """
        collate = collate_dict(dd['train'])
        loaders = get_dls(*dd.values(), bs=batch_size, collate_fn=collate, **kwargs)
        return cls(*loaders)
import nbdev; nbdev.nbdev_export()
import torch
import matplotlib.pyplot as plt
import random
import ipywidgets as widgets
From the ipywidget docs:
- The button widget is used to handle mouse clicks. The `on_click` method of the `Button` can be used to register a function to be called when the button is clicked.
w = widgets.Button(description='Click me')
w
def f(o):
    """Click handler: ignores its argument and prints a greeting."""
    print('hi')
w.on_click(f)
NB: When callbacks are used in this way they are often called "events".
from time import sleep
def slow_calculation():
    """Sum the squares of 0..4, pausing one second after each step."""
    total = 0
    for step in range(5):
        total += step * step
        sleep(1)
    return total
slow_calculation()
def slow_calculation(cb=None):
    """Sum the squares of 0..4 slowly, calling `cb(step)` after each step if given."""
    total = 0
    for step in range(5):
        total += step * step
        sleep(1)
        if cb:
            cb(step)
    return total
def show_progress(epoch):
    """Print a fixed congratulation naming the epoch that just finished."""
    message = f"Awesome! We've finished epoch {epoch}!"
    print(message)
slow_calculation(show_progress)
slow_calculation(lambda o: print(f"Awesome! We've finished epoch {o}!"))
def show_progress(exclamation, epoch):
    """Print a progress message that starts with the caller-supplied exclamation."""
    message = f"{exclamation}! We've finished epoch {epoch}!"
    print(message)
slow_calculation(lambda o: show_progress("OK I guess", o))
def make_show_progress(exclamation):
    """Return a one-argument progress printer that closes over `exclamation`."""
    def _inner(epoch):
        print(f"{exclamation}! We've finished epoch {epoch}!")
    return _inner
slow_calculation(make_show_progress("Nice!"))
from functools import partial
slow_calculation(partial(show_progress, "OK I guess"))
f2 = partial(show_progress, "OK I guess")
class ProgressShowingCallback():
    """Callable object that prints a progress message with a stored exclamation."""

    def __init__(self, exclamation="Awesome"):
        self.exclamation = exclamation

    def __call__(self, epoch):
        print(f"{self.exclamation}! We've finished epoch {epoch}!")
cb = ProgressShowingCallback("Just super")
slow_calculation(cb)
def f(*a, **b):
    """Print the positional arguments (as a tuple) and keyword arguments (as a dict)."""
    print(f"args: {a}; kwargs: {b}")
f(3, 'a', thing1="hello")
def g(a, b, c=0):
    """Print the three arguments separated by spaces."""
    print(a, b, c)
args = [1,2]
kwargs = {'c':3}
g(*args, **kwargs)
def slow_calculation(cb=None):
    """Sum the squares of 0..4 slowly, notifying `cb` before and after each step.

    `cb.before_calc(step)` runs before the step and
    `cb.after_calc(step, val=total)` after it, when a callback object is given.
    """
    total = 0
    for step in range(5):
        if cb:
            cb.before_calc(step)
        total += step * step
        sleep(1)
        if cb:
            cb.after_calc(step, val=total)
    return total
class PrintStepCallback():
    """Callback that prints fixed messages and ignores every argument it is given."""

    def before_calc(self, *args, **kwargs):
        print(f"About to start")

    def after_calc(self, *args, **kwargs):
        print(f"Done step")
slow_calculation(PrintStepCallback())
class PrintStatusCallback():
    """Callback that prints the epoch before each step and the running value after it."""

    def __init__(self):
        pass

    def before_calc(self, epoch, **kwargs):
        print(f"About to start: {epoch}")

    def after_calc(self, epoch, val, **kwargs):
        print(f"After {epoch}: {val}")
slow_calculation(PrintStatusCallback())
def slow_calculation(cb=None):
    """Sum the squares of 0..4 slowly, with optional hooks and early stopping.

    `before_calc` and `after_calc` are each called only if the callback object
    defines them. A truthy return from `after_calc(step, total)` ends the loop.
    """
    total = 0
    for step in range(5):
        if cb and hasattr(cb, 'before_calc'):
            cb.before_calc(step)
        total += step * step
        sleep(1)
        if cb and hasattr(cb, 'after_calc') and cb.after_calc(step, total):
            print("stopping early")
            break
    return total
class PrintAfterCallback():
    """Callback that prints each step's value and asks to stop once it exceeds 10."""

    def after_calc(self, epoch, val):
        print(f"After {epoch}: {val}")
        if val > 10:
            return True
slow_calculation(PrintAfterCallback())
class SlowCalculator():
    """Slow sum-of-squares calculation whose callback can read and modify its state."""

    def __init__(self, cb=None):
        self.cb = cb
        self.res = 0

    def callback(self, cb_name, *args):
        """Call `self.cb.<cb_name>(self, *args)` if it exists and return its result."""
        if not self.cb:
            return
        handler = getattr(self.cb, cb_name, None)
        if handler:
            return handler(self, *args)

    def calc(self):
        """Accumulate squares of 0..4 into `self.res`; stop if 'after_calc' returns truthy."""
        for step in range(5):
            self.callback('before_calc', step)
            self.res += step * step
            sleep(1)
            if self.callback('after_calc', step):
                print("stopping early")
                break
class ModifyingCallback():
    """Callback that reports `calc.res`, stops above 10, and doubles values below 3."""

    def after_calc(self, calc, epoch):
        print(f"After {epoch}: {calc.res}")
        if calc.res > 10:
            return True
        if calc.res < 3:
            calc.res = calc.res * 2
calculator = SlowCalculator(ModifyingCallback())
calculator.calc()
calculator.res
Anything that looks like `__this__` is, in some way, special. Python, or some library, can define some functions that they will call at certain documented times. For instance, when your class is setting up a new object, Python will call `__init__`. These are defined as part of the Python data model.
For instance, if Python sees `+`, then it will call the special method `__add__`. If you try to display an object in Jupyter (or lots of other places in Python) it will call `__repr__`.
class SloppyAdder():
    """Wraps a number; adding two of them deliberately overshoots by 0.01."""

    def __init__(self, o):
        self.o = o

    def __add__(self, b):
        total = self.o + b.o + 0.01
        return SloppyAdder(total)

    def __repr__(self):
        return str(self.o)
a = SloppyAdder(1)
b = SloppyAdder(2)
a+b
Special methods you should probably know about (see data model link above) are:
__getitem__
__getattr__
__setattr__
__del__
__init__
__new__
__enter__
__exit__
__len__
__repr__
__str__
class A:
    """Class with two class attributes, used to demonstrate `getattr`."""
    a = 1
    b = 2
a = A()
a.b
getattr(a, 'b')
getattr(a, 'b' if random.random()>0.5 else 'a')
class B:
    """Answers any missing non-underscore attribute with a greeting string."""
    a = 1
    b = 2

    def __getattr__(self, k):
        # Only called for attributes not found normally. Names starting with an
        # underscore still raise AttributeError.
        if k[0] == '_':
            raise AttributeError(k)
        greeting = f'Hello from {k}'
        return greeting
b = B()
b.a
b.foo
#|default_exp conv
#|export
import torch
from torch import nn
from torch.utils.data import default_collate
from typing import Mapping
from miniai.training import *
from miniai.datasets import *
import pickle,gzip,math,os,time,shutil,torch,matplotlib as mpl, numpy as np
import pandas as pd,matplotlib.pyplot as plt
from pathlib import Path
from torch import tensor
from torch.utils.data import DataLoader
from typing import Mapping
mpl.rcParams['image.cmap'] = 'gray'
path_data = Path('data')
path_gz = path_data/'mnist.pkl.gz'
with gzip.open(path_gz, 'rb') as f: ((x_train, y_train), (x_valid, y_valid), _) = pickle.load(f, encoding='latin-1')
x_train, y_train, x_valid, y_valid = map(tensor, [x_train, y_train, x_valid, y_valid])
In the context of an image, a feature is a visually distinctive attribute. For example, the number 7 is characterized by a horizontal edge near the top of the digit, and a top-right to bottom-left diagonal edge underneath that.
It turns out that finding the edges in an image is a very common task in computer vision, and is surprisingly straightforward. To do it, we use a convolution. A convolution requires nothing more than multiplication, and addition.
To explain the math behind convolutions, fast.ai student Matt Kleinsmith came up with the very clever idea of showing CNNs from different viewpoints.
Here's the input:
Here's our kernel:
Since the filter fits in the image four times, we have four results:
x_imgs = x_train.view(-1,28,28)
xv_imgs = x_valid.view(-1,28,28)
mpl.rcParams['figure.dpi'] = 30
im3 = x_imgs[7]
show_image(im3);
top_edge = tensor([[-1,-1,-1],
[ 0, 0, 0],
[ 1, 1, 1]]).float()
We're going to call this our kernel (because that's what fancy computer vision researchers call these).
show_image(top_edge, noframe=False);
The filter will take any window of size 3×3 in our images, and if we name the pixel values like this:
it will return
df = pd.DataFrame(im3[:13,:23])
df.style.format(precision=2).set_properties(**{'font-size':'7pt'}).background_gradient('Greys')
[DataFrame output omitted for brevity]
(im3[3:6,14:17] * top_edge).sum()
tensor(2.9727)
(im3[7:10,14:17] * top_edge).sum()
tensor(-2.9570)
def apply_kernel(row, col, kernel):
    """Apply `kernel` to the 3x3 window of the module-level image `im3` centred at (row, col)."""
    window = im3[row-1:row+2, col-1:col+2]
    return (window * kernel).sum()
apply_kernel(4,15,top_edge)
tensor(2.9727)
[[(i,j) for j in range(5)] for i in range(5)]
rng = range(1,27)
top_edge3 = tensor([[apply_kernel(i,j,top_edge) for j in rng] for i in rng])
show_image(top_edge3);
left_edge = tensor([[-1,0,1],
[-1,0,1],
[-1,0,1]]).float()
show_image(left_edge, noframe=False);
left_edge3 = tensor([[apply_kernel(i,j,left_edge) for j in rng] for i in rng])
show_image(left_edge3);
import torch.nn.functional as F
import torch
What to do if you have 2 months to complete your thesis? Use im2col.
Here's a sample numpy implementation.
inp = im3[None,None,:,:].float()
inp_unf = F.unfold(inp, (3,3))[0]
inp_unf.shape
torch.Size([9, 676])
w = left_edge.view(-1)
w.shape
torch.Size([9])
out_unf = w@inp_unf
out_unf.shape
torch.Size([676])
out = out_unf.view(26,26)
show_image(out);
%timeit -n 1 tensor([[apply_kernel(i,j,left_edge) for j in rng] for i in rng]);
7.14 ms ± 150 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit -n 100 (w@F.unfold(inp, (3,3))[0]).view(26,26);
27.2 µs ± 1.51 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit -n 100 F.conv2d(inp, left_edge[None,None])
15.7 µs ± 1.06 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
diag1_edge = tensor([[ 0,-1, 1],
[-1, 1, 0],
[ 1, 0, 0]]).float()
show_image(diag1_edge, noframe=False);
diag2_edge = tensor([[ 1,-1, 0],
[ 0, 1,-1],
[ 0, 0, 1]]).float()
show_image(diag2_edge, noframe=False);
xb = x_imgs[:16][:,None]
xb.shape
torch.Size([16, 1, 28, 28])
edge_kernels = torch.stack([left_edge, top_edge, diag1_edge, diag2_edge])[:,None]
edge_kernels.shape
torch.Size([4, 1, 3, 3])
batch_features = F.conv2d(xb, edge_kernels)
batch_features.shape
torch.Size([16, 4, 26, 26])
The output shape shows we gave 16 images in the mini-batch, 4 kernels, and 26×26 edge maps (we started with 28×28 images, but lost one pixel from each side as discussed earlier). We can see we get the same results as when we did this manually:
img0 = xb[1,0]
show_image(img0);
show_images([batch_features[1,i] for i in range(4)])
With appropriate padding, we can ensure that the output activation map is the same size as the original image.
With a 5×5 input, 4×4 kernel, and 2 pixels of padding, we end up with a 6×6 activation map.
If we add a kernel of size `ks` by `ks` (with `ks` an odd number), the necessary padding on each side to keep the same shape is `ks//2`.
We could move over two pixels after each kernel application. This is known as a stride-2 convolution.
n,m = x_train.shape
c = y_train.max()+1
nh = 50
model = nn.Sequential(nn.Linear(m,nh), nn.ReLU(), nn.Linear(nh,10))
broken_cnn = nn.Sequential(
nn.Conv2d(1,30, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(30,10, kernel_size=3, padding=1)
)
broken_cnn(xb).shape
torch.Size([16, 10, 28, 28])
#|export
def conv(ni, nf, ks=3, stride=2, act=True):
    """Build a `ks`x`ks` convolution with `ks//2` padding, optionally followed by a ReLU.

    Returns the bare `nn.Conv2d` when `act` is false, otherwise an
    `nn.Sequential` of the convolution and `nn.ReLU`.
    """
    layer = nn.Conv2d(ni, nf, stride=stride, kernel_size=ks, padding=ks//2)
    if not act:
        return layer
    return nn.Sequential(layer, nn.ReLU())
Refactoring parts of your neural networks like this makes it much less likely you'll get errors due to inconsistencies in your architectures, and makes it more obvious to the reader which parts of your layers are actually changing.
simple_cnn = nn.Sequential(
conv(1 ,4), #14x14
conv(4 ,8), #7x7
conv(8 ,16), #4x4
conv(16,16), #2x2
conv(16,10, act=False), #1x1
nn.Flatten(),
)
simple_cnn(xb).shape
torch.Size([16, 10])
x_imgs = x_train.view(-1,1,28,28)
xv_imgs = x_valid.view(-1,1,28,28)
train_ds,valid_ds = Dataset(x_imgs, y_train),Dataset(xv_imgs, y_valid)
#|export
# Preferred device: Apple MPS if available, else CUDA, else CPU.
def_device = 'mps' if torch.backends.mps.is_available() else 'cuda' if torch.cuda.is_available() else 'cpu'

def to_device(x, device=def_device):
    """Move a tensor, or a (possibly nested) container of tensors, to `device`.

    Tensors are moved with `.to(device)`. Mappings come back as a plain dict
    with every value moved recursively, so a dict holding lists or tuples of
    tensors is handled as well. Any other iterable is rebuilt as the same type
    from its recursively moved elements.
    """
    if isinstance(x, torch.Tensor): return x.to(device)
    if isinstance(x, Mapping): return {k: to_device(v, device) for k, v in x.items()}
    return type(x)(to_device(o, device) for o in x)

def collate_device(b):
    """Collate a batch with `default_collate`, then move it to the default device."""
    return to_device(default_collate(b))
from torch import optim
bs = 256
lr = 0.4
# collate_device moves every collated batch to def_device, so the model is moved there too.
train_dl,valid_dl = get_dls(train_ds, valid_ds, bs, collate_fn=collate_device)
opt = optim.SGD(simple_cnn.parameters(), lr=lr)
# Five epochs at the initial learning rate; the recorded per-epoch output follows.
loss,acc = fit(5, simple_cnn.to(def_device), F.cross_entropy, opt, train_dl, valid_dl)
0 0.3630618950843811 0.8875999997138977
1 0.16439641580581665 0.9496000003814697
2 0.24622697901725768 0.9316000004768371
3 0.25093305287361145 0.9335999998092651
4 0.13128829071521758 0.9618000007629395
opt = optim.SGD(simple_cnn.parameters(), lr=lr/4)
loss,acc = fit(5, simple_cnn.to(def_device), F.cross_entropy, opt, train_dl, valid_dl)
0 0.08451943595409393 0.9756999996185303
1 0.08082638642787933 0.9777999995231629
2 0.08050601842403411 0.9778999995231629
3 0.08200360851287841 0.9773999995231628
4 0.08405050563812255 0.9761999994277955
In an input of size `64x1x28x28` the axes are `batch,channel,height,width`. This is often represented as `NCHW` (where `N` refers to batch size). TensorFlow, on the other hand, uses `NHWC` axis order (aka "channels-last"). Channels-last is faster for many models, so recently it's become more common to see this as an option in PyTorch too.
We have 1 input channel, 4 output channels, and a 3×3 kernel.
# Inspect the first convolution of simple_cnn. The line after each expression is
# the recorded notebook output (layer repr, weight shape, bias shape).
simple_cnn[0][0]
Conv2d(1, 4, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
conv1 = simple_cnn[0][0]
conv1.weight.shape
torch.Size([4, 1, 3, 3])
conv1.bias.shape
torch.Size([4])
The receptive field is the area of an image that is involved in the calculation of a layer. conv-example.xlsx shows the calculation of two stride-2 convolutional layers using an MNIST digit. Here's what we see if we click on one of the cells in the conv2 section, which shows the output of the second convolutional layer, and click trace precedents.
The blue highlighted cells are its precedents—that is, the cells used to calculate its value. These cells are the corresponding 3×3 area of cells from the input layer (on the left), and the cells from the filter (on the right). Click trace precedents again:
In this example, we have just two convolutional layers. We can see that a 7×7 area of cells in the input layer is used to calculate the single green cell in the Conv2 layer. This is the receptive field.
The deeper we are in the network (specifically, the more stride-2 convs we have before a layer), the larger the receptive field for an activation in that layer.
A colour picture is a rank-3 tensor:
from torchvision.io import read_image
# Load the image as a tensor; the recorded shape below is (3, 1000, 846).
im = read_image('images/grizzly.jpg')
im.shape
torch.Size([3, 1000, 846])
# Move the first axis last before displaying the full image.
show_image(im.permute(1,2,0));
# Show each slice along the first axis separately (inverted via 255-bear) with its own colormap.
_,axs = plt.subplots(1,3)
for bear,ax,color in zip(im,axs,('Reds','Greens','Blues')): show_image(255-bear, ax=ax, cmap=color)
These are then all added together, to produce a single number, for each grid location, for each output feature.
We have `ch_out` filters like this, so in the end, the result of our convolutional layer will be a batch of images with `ch_out` channels.
import nbdev; nbdev.nbdev_export()
import pickle, gzip, math, os, time, shutil, torch, matplotlib as mpl, numpy as np, matplotlib.pyplot as plt
import fastcore.all as fc
from collections.abc import Mapping
from pathlib import Path
from operator import attrgetter, itemgetter
from functools import partial
from torch import tensor, nn, optim
from torch.utils.data import DataLoader, default_collate
import torch.nn.functional as F
import torchvision.transforms.functional as TF
from datasets import load_dataset, load_dataset_builder
from fastprogress import progress_bar, master_bar
from miniai.datasets import *
from miniai.training import *
from miniai.conv import *
from fastcore.test import test_close
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'
import logging
logging.disable(logging.WARNING)
# Names of the input and target fields used to index dataset items below.
x, y = 'image', 'label'
name = "fashion_mnist"
dsd = load_dataset(name, ignore_verifications=True)
# Per-batch transform: replace every item of the image field with TF.to_tensor(item).
@inplace
def transformi(b): b[x] = [TF.to_tensor(o) for o in b[x]]
bs = 256
tds = dsd.with_transform(transformi)
ds = tds['train']
# Display the first training image.
img = ds[0]['image']
show_image(img, figsize=(1,1))
cf = collate_dict(ds)
# Collate with cf, then move the collated batch to the default device.
def collate_(b): return to_device(cf(b))
# Build one DataLoader per entry of the dataset dict, forwarding any extra kwargs.
def data_loaders(dsd, bs, **kwargs): return {k:DataLoader(v, bs, **kwargs) for k,v in dsd.items()}
dls = data_loaders(tds, bs, collate_fn=collate_)
dt = dls['train']
dv = dls['test']
xb, yb = next(iter(dt))
# Look up the class names for the first 16 targets and show those images with titles.
labels = ds.features[y].names
lbl_getter = itemgetter(*yb[:16])
titles = lbl_getter(labels)
mpl.rcParams['figure.dpi'] = 70
show_images(xb[:16], imsize=1.7, titles=titles)
from torch import optim
bs = 256
lr = 0.4
# Classifier built from stride-2 conv blocks; trailing comments give the spatial
# size after each layer. The final conv has no activation and is flattened.
cnn = nn.Sequential(
conv(1 ,4), #14x14
conv(4 ,8), #7x7
conv(8 ,16), #4x4
conv(16,16), #2x2
conv(16,10, act=False),
nn.Flatten()).to(def_device)
opt = optim.SGD(cnn.parameters(), lr=lr)
loss, acc = fit(5, cnn, F.cross_entropy, opt, dt, dv)
def deconv(ni, nf, ks=3, act=True):
    """Upsampling block: nearest-neighbour 2x upsample, then a stride-1 conv.

    The convolution maps `ni` to `nf` channels with `ks//2` padding. A ReLU is
    appended when `act` is truthy. Always returns an `nn.Sequential`.
    """
    upsample = nn.UpsamplingNearest2d(scale_factor=2)
    smooth = nn.Conv2d(ni, nf, kernel_size=ks, stride=1, padding=ks//2)
    if act: return nn.Sequential(upsample, smooth, nn.ReLU())
    return nn.Sequential(upsample, smooth)
def eval(model, loss_func, valid_dl, epoch=0):
    """Print the size-weighted mean reconstruction loss of `model` over `valid_dl`.

    The model is put in eval mode and each batch's input is also used as the
    target (`loss_func(pred, xb)`); the second element of each batch is ignored.
    Prints `epoch` followed by the mean loss to three decimals.
    """
    model.eval()
    total, seen = 0., 0
    with torch.no_grad():
        for batch, _ in valid_dl:
            bs = len(batch)
            seen += bs
            # Weight each batch loss by its size so the last (smaller) batch counts correctly
            total += loss_func(model(batch), batch).item() * bs
    print(epoch, f'{total/seen:.3f}')
def fit(epochs, model, loss_func, opt, train_dl, valid_dl):
    """Train `model` to reconstruct its own input for `epochs` epochs.

    For each training batch the input is also the target. After every epoch the
    validation loss is reported through `eval`.
    """
    for epoch in range(epochs):
        model.train()
        for batch, _ in train_dl:
            loss_func(model(batch), batch).backward()
            opt.step()
            opt.zero_grad()
        eval(model, loss_func, valid_dl, epoch)
# Small convolutional autoencoder; trailing comments track the spatial size.
# Padding by 2 gives a 32x32 input, and ZeroPad2d(-2) removes it again at the end.
ae = nn.Sequential( #28x28
nn.ZeroPad2d(2), #32x32
conv(1,2), #16x16
conv(2,4), #8x8
deconv(4,2), #16x16
deconv(2,1, act=False), #32x32
nn.ZeroPad2d(-2), #28x28
nn.Sigmoid()
).to(def_device)
# Reconstruction loss before any training.
eval(ae, F.mse_loss, dv)
# Train at lr=0.01, then continue with a fresh optimizer at lr=0.1.
opt = optim.SGD(ae.parameters(), lr=0.01)
fit(5, ae, F.mse_loss, opt, dt, dv)
opt = optim.SGD(ae.parameters(), lr=0.1)
fit(5, ae, F.mse_loss, opt, dt, dv)
# Show reconstructions of the first 16 images of xb, then the originals.
p = ae(xb)
show_images(p[:16].data.cpu(), imsize=1.5)
p = ae(xb)
show_images(p[:16].data.cpu(), imsize=1.5)
show_images(xb[:16].data.cpu(), imsize=1.5)
#| default_exp learner
import math,torch,matplotlib.pyplot as plt
import fastcore.all as fc
from collections.abc import Mapping
from operator import attrgetter
from functools import partial
from copy import copy
from torch import optim
import torch.nn.functional as F
from miniai.conv import *
from fastprogress import progress_bar,master_bar
import matplotlib as mpl
import torchvision.transforms.functional as TF
from contextlib import contextmanager
from torch import nn,tensor
from datasets import load_dataset,load_dataset_builder
from miniai.datasets import *
from miniai.conv import *
import logging
from fastcore.test import test_close
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'
logging.disable(logging.WARNING)
# Names of the input and target fields.
x,y = 'image','label'
name = "fashion_mnist"
dsd = load_dataset(name)
# Per-batch transform: convert each image to a tensor and flatten it.
@inplace
def transformi(b): b[x] = [torch.flatten(TF.to_tensor(o)) for o in b[x]]
bs = 1024
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
dt = dls.train
# Grab one batch to inspect its shape and the first ten targets.
xb,yb = next(iter(dt))
xb.shape,yb[:10]
#|export
class CancelFitException(Exception):
    """Raised to stop the whole fit; caught by the learner around the fit stage."""

class CancelBatchException(Exception):
    """Raised to skip the rest of the current batch; caught around the batch stage."""

class CancelEpochException(Exception):
    """Raised to skip the rest of the current epoch; caught around the epoch stage."""
#|export
class Callback():
    """Base class for callbacks; `order` is the sort key used by `run_cbs` (lower runs first)."""
    order = 0
#|export
def run_cbs(cbs, method_nm, learn=None):
    """Call `method_nm(learn)` on every callback that defines it, in ascending `order`.

    Callbacks without an attribute named `method_nm` are skipped silently.
    """
    by_order = sorted(cbs, key=attrgetter('order'))
    for cb in by_order:
        hook = getattr(cb, method_nm, None)
        if hook is None: continue
        hook(learn)
class CompletionCB(Callback):
    """Counts the batches seen during a fit and prints the total when the fit ends."""
    def before_fit(self, learn):
        self.count = 0

    def after_batch(self, learn):
        self.count = self.count + 1

    def after_fit(self, learn):
        print(f'Completed {self.count} batches')
# Drive the callback by hand: reset the counter, count one batch, print the total.
cbs = [CompletionCB()]
run_cbs(cbs, 'before_fit')
run_cbs(cbs, 'after_batch')
run_cbs(cbs, 'after_fit')
class Learner():
    """Minimal training loop that fires named callbacks around fit, epoch and batch."""
    def __init__(self, model, dls, loss_func, lr, cbs, opt_func=optim.SGD):
        # The methods below read these arguments back as attributes of self
        fc.store_attr()

    def one_batch(self):
        """Compute predictions and loss for `self.batch`; step the optimizer when training."""
        self.preds = self.model(self.batch[0])
        self.loss = self.loss_func(self.preds, self.batch[1])
        if not self.model.training: return
        self.loss.backward()
        self.opt.step()
        self.opt.zero_grad()

    def one_epoch(self, train):
        """Run one pass over the training (`train=True`) or validation loader."""
        self.model.train(train)
        if train: self.dl = self.dls.train
        else: self.dl = self.dls.valid
        try:
            self.callback('before_epoch')
            # Loop targets are attributes so callbacks can see the current index and batch
            for self.iter,self.batch in enumerate(self.dl):
                try:
                    self.callback('before_batch')
                    self.one_batch()
                    self.callback('after_batch')
                except CancelBatchException:
                    pass
            self.callback('after_epoch')
        except CancelEpochException:
            pass

    def fit(self, n_epochs):
        """Create the optimizer, then run a training and a validation epoch `n_epochs` times."""
        self.n_epochs = n_epochs
        self.epochs = range(n_epochs)
        self.opt = self.opt_func(self.model.parameters(), self.lr)
        try:
            self.callback('before_fit')
            for self.epoch in self.epochs:
                for is_train in (True, False): self.one_epoch(is_train)
            self.callback('after_fit')
        except CancelFitException:
            pass

    def callback(self, method_nm):
        """Dispatch `method_nm` to every registered callback, passing this learner."""
        run_cbs(self.cbs, method_nm, self)
# Input size (flattened 28x28 image) and hidden layer width.
m,nh = 28*28,50
# Fresh two-layer MLP with 10 outputs.
def get_model(): return nn.Sequential(nn.Linear(m,nh), nn.ReLU(), nn.Linear(nh,10))
model = get_model()
# Train for one epoch; CompletionCB reports how many batches were processed.
learn = Learner(model, dls, F.cross_entropy, lr=0.2, cbs=[CompletionCB()])
learn.fit(1)
class Metric:
    """Accumulates per-batch values and reports their mean weighted by batch size.

    Subclasses override `calc(inps, targs)`; the base implementation returns the
    input unchanged, which makes `Metric` itself a weighted running mean.
    """
    def __init__(self):
        self.reset()

    def reset(self):
        """Forget every value recorded so far."""
        self.vals, self.ns = [], []

    def add(self, inp, targ=None, n=1):
        """Record `calc(inp, targ)` with weight `n`; the result is also kept in `self.last`."""
        self.last = self.calc(inp, targ)
        self.vals.append(self.last)
        self.ns.append(n)

    @property
    def value(self):
        """Weighted mean of the recorded values, as a tensor."""
        weights = tensor(self.ns)
        weighted = tensor(self.vals) * weights
        return weighted.sum() / weights.sum()

    def calc(self, inps, targs):
        return inps
class Accuracy(Metric):
    """Metric whose per-batch value is the fraction of elements where input equals target."""
    def calc(self, inps, targs):
        hits = inps == targs
        return hits.float().mean()
# Two batches of predictions vs targets; both are added with the default weight n=1.
acc = Accuracy()
acc.add(tensor([0, 1, 2, 0, 1, 2]), tensor([0, 1, 1, 2, 1, 0]))
acc.add(tensor([1, 1, 2, 0, 1]), tensor([0, 1, 1, 2, 1]))
acc.value
# Plain Metric as a weighted mean, shown next to the hand-computed value.
loss = Metric()
loss.add(0.6, n=32)
loss.add(0.9, n=2)
loss.value, round((0.6*32+0.9*2)/(32+2), 2)
#|export
from torcheval.metrics import MulticlassAccuracy,Mean
#|export
def to_cpu(x):
    """Detach `x` and move it to the CPU, recursing through mappings, lists and tuples.

    Mappings come back as dicts, lists as lists and tuples as tuples. float16
    tensors are converted to float32; every other dtype is kept.
    """
    if isinstance(x, Mapping): return {key: to_cpu(val) for key, val in x.items()}
    if isinstance(x, list): return [to_cpu(item) for item in x]
    if isinstance(x, tuple): return tuple(to_cpu(list(x)))
    moved = x.detach().cpu()
    if moved.dtype == torch.float16: return moved.float()
    return moved
#|export
class MetricsCB(Callback):
    """Tracks metrics plus a weighted mean loss and logs them after every epoch.

    Positional metrics are keyed by their class name; keyword metrics keep the
    given name. Each metric object must provide `reset`, `update` and `compute`.
    """
    def __init__(self, *ms, **metrics):
        metrics.update({type(o).__name__: o for o in ms})
        self.metrics = metrics
        # all_metrics is a shallow copy that also holds the loss tracker
        self.all_metrics = copy(metrics)
        self.loss = Mean()
        self.all_metrics['loss'] = self.loss

    def _log(self, d):
        print(d)

    def before_fit(self, learn):
        # Expose this callback on the learner so other callbacks can find it
        learn.metrics = self

    def before_epoch(self, learn):
        for metric in self.all_metrics.values(): metric.reset()

    def after_epoch(self, learn):
        log = {name: f'{metric.compute():.3f}' for name, metric in self.all_metrics.items()}
        log['epoch'] = learn.epoch
        log['train'] = 'train' if learn.model.training else 'eval'
        self._log(log)

    def after_batch(self, learn):
        x,y,*_ = to_cpu(learn.batch)
        for metric in self.metrics.values(): metric.update(to_cpu(learn.preds), y)
        # Weight the batch loss by the number of items in the batch
        self.loss.update(to_cpu(learn.loss), weight=len(x))
#|export
class DeviceCB(Callback):
    """Moves the model (before fit) and every batch (before each batch) to `device`."""
    def __init__(self, device=def_device):
        fc.store_attr()

    def before_fit(self, learn):
        model = learn.model
        # Only objects exposing `.to` can be moved
        if hasattr(model, 'to'): model.to(self.device)

    def before_batch(self, learn):
        learn.batch = to_device(learn.batch, device=self.device)
# Train one epoch with device handling and an accuracy metric logged under the name 'accuracy'.
model = get_model()
metrics = MetricsCB(accuracy=MulticlassAccuracy())
learn = Learner(model, dls, F.cross_entropy, lr=0.2, cbs=[DeviceCB(), metrics])
learn.fit(1)
class Learner():
    """Training loop whose steps (predict, get_loss, backward, ...) are supplied by callbacks.

    Each of the fit/epoch/batch stages is wrapped by `cb_ctx`, which fires the
    `before_*`, `after_*` and `cleanup_*` callbacks and swallows the matching
    `Cancel*Exception`.
    """
    def __init__(self, model, dls=(0,), loss_func=F.mse_loss, lr=0.1, cbs=None, opt_func=optim.SGD):
        # Wrap first so that the stored attribute is the fastcore list
        cbs = fc.L(cbs)
        fc.store_attr()

    @contextmanager
    def cb_ctx(self, nm):
        """Context manager running the callbacks for stage `nm` ('fit', 'epoch' or 'batch')."""
        try:
            self.callback(f'before_{nm}')
            yield
            self.callback(f'after_{nm}')
        # The exception class is looked up by name, e.g. 'batch' -> CancelBatchException
        except globals()[f'Cancel{nm.title()}Exception']:
            pass
        finally:
            self.callback(f'cleanup_{nm}')

    def one_epoch(self, train):
        """Run one pass over the training (`train=True`) or validation loader."""
        self.model.train(train)
        if train: self.dl = self.dls.train
        else: self.dl = self.dls.valid
        with self.cb_ctx('epoch'):
            for self.iter,self.batch in enumerate(self.dl):
                with self.cb_ctx('batch'):
                    self.predict()
                    self.get_loss()
                    if self.training:
                        self.backward()
                        self.step()
                        self.zero_grad()

    def fit(self, n_epochs=1, train=True, valid=True, cbs=None, lr=None):
        """Fit for `n_epochs`; `cbs` are extra callbacks used only for this call."""
        cbs = fc.L(cbs)
        # `add_cb` and `rm_cb` were added in lesson 18
        for cb in cbs: self.cbs.append(cb)
        try:
            self.n_epochs = n_epochs
            self.epochs = range(n_epochs)
            chosen_lr = self.lr if lr is None else lr
            self.opt = self.opt_func(self.model.parameters(), chosen_lr)
            with self.cb_ctx('fit'):
                for self.epoch in self.epochs:
                    if train: self.one_epoch(True)
                    if valid:
                        # Validation runs without gradient tracking
                        with torch.no_grad(): self.one_epoch(False)
        finally:
            # Always drop the temporary callbacks, even if the fit failed
            for cb in cbs: self.cbs.remove(cb)

    def __getattr__(self, name):
        # The training steps are not methods here: they are forwarded to the callbacks
        if name in ('predict','get_loss','backward','step','zero_grad'):
            return partial(self.callback, name)
        raise AttributeError(name)

    def callback(self, method_nm):
        run_cbs(self.cbs, method_nm, self)

    @property
    def training(self):
        return self.model.training
#|export
class TrainCB(Callback):
    """Supplies the standard training steps for a `Learner`.

    The first `n_inp` elements of the batch are passed to the model; the
    remaining elements are passed to the loss function after the predictions.
    """
    def __init__(self, n_inp=1):
        self.n_inp = n_inp

    def predict(self, learn):
        inputs = learn.batch[:self.n_inp]
        learn.preds = learn.model(*inputs)

    def get_loss(self, learn):
        targets = learn.batch[self.n_inp:]
        learn.loss = learn.loss_func(learn.preds, *targets)

    def backward(self, learn):
        learn.loss.backward()

    def step(self, learn):
        learn.opt.step()

    def zero_grad(self, learn):
        learn.opt.zero_grad()
#|export
class ProgressCB(Callback):
    """Shows progress bars for epochs and batches and, optionally, a live loss plot."""
    # Run after MetricsCB
    order = MetricsCB.order+1

    def __init__(self, plot=False):
        self.plot = plot

    def before_fit(self, learn):
        self.mbar = master_bar(learn.epochs)
        learn.epochs = self.mbar
        self.first = True
        # Route the metrics callback's logging through the progress bar table
        if hasattr(learn, 'metrics'): learn.metrics._log = self._log
        self.losses = []
        self.val_losses = []

    def _log(self, d):
        # Write the header row (the dict keys) once, then one row of values per call
        if self.first:
            self.mbar.write(list(d), table=True)
            self.first = False
        self.mbar.write(list(d.values()), table=True)

    def before_epoch(self, learn):
        learn.dl = progress_bar(learn.dl, leave=False, parent=self.mbar)

    def _update_graph(self, learn, n_epochs):
        # Training losses are plotted per batch; validation losses sit at the batch
        # index where each of the first `n_epochs` epochs ended
        batches_per_epoch = len(learn.dls.train)
        train_curve = [fc.L.range(self.losses), self.losses]
        val_x = fc.L.range(n_epochs).map(lambda x: (x+1)*batches_per_epoch)
        self.mbar.update_graph([train_curve, [val_x, self.val_losses]])

    def after_batch(self, learn):
        learn.dl.comment = f'{learn.loss:.3f}'
        if not (self.plot and hasattr(learn, 'metrics') and learn.training): return
        self.losses.append(learn.loss.item())
        if self.val_losses: self._update_graph(learn, learn.epoch)

    def after_epoch(self, learn):
        if learn.training: return
        if self.plot and hasattr(learn, 'metrics'):
            self.val_losses.append(learn.metrics.all_metrics['loss'].compute())
            self._update_graph(learn, learn.epoch+1)
# TrainCB supplies the training steps; ProgressCB(plot=True) adds bars and a loss plot.
model = get_model()
metrics = MetricsCB(accuracy=MulticlassAccuracy())
cbs = [TrainCB(), DeviceCB(), metrics, ProgressCB(plot=True)]
learn = Learner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(2)
After the lesson we noticed that `contextlib.contextmanager` has a surprising "feature" which doesn't let us raise an exception before the `yield`. Therefore we've replaced the context manager with a decorator in this updated version of `Learner`. We have also added a few more callbacks in `one_epoch()`.
#|export
class with_cbs:
    """Decorator that wraps a method with the callbacks of stage `nm`.

    The decorated method runs between `before_{nm}` and `after_{nm}`. A
    `Cancel{Nm}Exception` raised anywhere in between is swallowed, and
    `cleanup_{nm}` always runs. The wrapper returns None, like the original.
    """
    def __init__(self, nm):
        self.nm = nm

    def __call__(self, f):
        from functools import wraps

        # Preserve the wrapped method's name and docstring for introspection and debugging
        @wraps(f)
        def _f(o, *args, **kwargs):
            try:
                o.callback(f'before_{self.nm}')
                f(o, *args, **kwargs)
                o.callback(f'after_{self.nm}')
            # The exception class is looked up by name, e.g. 'batch' -> CancelBatchException
            except globals()[f'Cancel{self.nm.title()}Exception']:
                pass
            finally:
                o.callback(f'cleanup_{self.nm}')
        return _f
#|export
class Learner():
    """Callback-driven training loop; each stage is wrapped by the `with_cbs` decorator.

    The training steps (`predict`, `get_loss`, `backward`, `step`, `zero_grad`)
    are forwarded to the callbacks unless a subclass defines them as methods.
    """
    def __init__(self, model, dls=(0,), loss_func=F.mse_loss, lr=0.1, cbs=None, opt_func=optim.SGD):
        # Wrap first so that the stored attribute is the fastcore list
        cbs = fc.L(cbs)
        fc.store_attr()

    @with_cbs('batch')
    def _one_batch(self):
        self.predict()
        self.callback('after_predict')
        self.get_loss()
        self.callback('after_loss')
        # Validation stops after the loss; the remaining steps are training-only
        if not self.training: return
        self.backward()
        self.callback('after_backward')
        self.step()
        self.callback('after_step')
        self.zero_grad()

    @with_cbs('epoch')
    def _one_epoch(self):
        # Loop targets are attributes so callbacks can see the current index and batch
        for self.iter,self.batch in enumerate(self.dl):
            self._one_batch()

    def one_epoch(self, training):
        """Run one pass over the training (`training=True`) or validation loader."""
        self.model.train(training)
        if training: self.dl = self.dls.train
        else: self.dl = self.dls.valid
        self._one_epoch()

    @with_cbs('fit')
    def _fit(self, train, valid):
        for self.epoch in self.epochs:
            if train: self.one_epoch(True)
            if valid:
                # Validation runs without gradient tracking
                with torch.no_grad(): self.one_epoch(False)

    def fit(self, n_epochs=1, train=True, valid=True, cbs=None, lr=None):
        """Fit for `n_epochs`; `cbs` are extra callbacks used only for this call.

        `lr` overrides the learner's learning rate for this call. The optimizer
        is only (re)created when `opt_func` is truthy.
        """
        cbs = fc.L(cbs)
        # `add_cb` and `rm_cb` were added in lesson 18
        for cb in cbs: self.cbs.append(cb)
        try:
            self.n_epochs = n_epochs
            self.epochs = range(n_epochs)
            lr = self.lr if lr is None else lr
            if self.opt_func: self.opt = self.opt_func(self.model.parameters(), lr)
            self._fit(train, valid)
        finally:
            # Always drop the temporary callbacks, even if the fit failed
            for cb in cbs: self.cbs.remove(cb)

    def __getattr__(self, name):
        # Only reached when normal lookup fails, i.e. when no subclass defines the step
        if name in ('predict','get_loss','backward','step','zero_grad'):
            return partial(self.callback, name)
        raise AttributeError(name)

    def callback(self, method_nm):
        run_cbs(self.cbs, method_nm, self)

    @property
    def training(self):
        return self.model.training
# Same setup as before, now using the decorator-based Learner.
model = get_model()
metrics = MetricsCB(accuracy=MulticlassAccuracy())
cbs = [TrainCB(), DeviceCB(), metrics, ProgressCB(plot=True)]
learn = Learner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(1)
#|export
class TrainLearner(Learner):
    """Learner with the training steps defined as methods, so no `TrainCB` is needed.

    The batch is read as `(input, target)`: element 0 goes to the model and
    element 1 to the loss function.
    """
    def predict(self):
        inp = self.batch[0]
        self.preds = self.model(inp)

    def get_loss(self):
        targ = self.batch[1]
        self.loss = self.loss_func(self.preds, targ)

    def backward(self):
        self.loss.backward()

    def step(self):
        self.opt.step()

    def zero_grad(self):
        self.opt.zero_grad()
#|export
class MomentumLearner(TrainLearner):
    """TrainLearner that scales gradients by `mom` after each step instead of zeroing them.

    The scaled gradients are kept and the next backward pass accumulates onto
    them, which gives a momentum-like effect.
    """
    def __init__(self, model, dls, loss_func, lr=None, cbs=None, opt_func=optim.SGD, mom=0.85):
        self.mom = mom
        super().__init__(model, dls, loss_func, lr, cbs, opt_func)

    def zero_grad(self):
        with torch.no_grad():
            for p in self.model.parameters():
                # Frozen or unused parameters have no gradient; skip them instead of failing
                if p.grad is not None: p.grad *= self.mom
# NB: No TrainCB
# (MomentumLearner defines the training steps itself through TrainLearner.)
metrics = MetricsCB(accuracy=MulticlassAccuracy())
cbs = [DeviceCB(), metrics, ProgressCB(plot=True)]
learn = MomentumLearner(get_model(), dls, F.cross_entropy, lr=0.1, cbs=cbs)
learn.fit(1)
class LRFinderCB(Callback):
    """Learning-rate finder: multiplies the lr by `lr_mult` after every training batch.

    Records the lr and loss of each batch, and cancels the fit once the loss
    exceeds three times the lowest loss seen so far.
    """
    def __init__(self, lr_mult=1.3):
        fc.store_attr()

    def before_fit(self, learn):
        self.lrs, self.losses = [], []
        self.min = math.inf

    def after_batch(self, learn):
        # Validation batches are not part of the sweep
        if not learn.training: raise CancelEpochException()
        current_lr = learn.opt.param_groups[0]['lr']
        self.lrs.append(current_lr)
        loss = to_cpu(learn.loss)
        self.losses.append(loss)
        if loss < self.min: self.min = loss
        if loss > self.min*3: raise CancelFitException()
        for group in learn.opt.param_groups: group['lr'] *= self.lr_mult
# Start from a very small learning rate and let the callback increase it batch by batch.
lrfind = LRFinderCB()
cbs = [DeviceCB(), lrfind]
learn = MomentumLearner(get_model(), dls, F.cross_entropy, lr=1e-4, cbs=cbs)
learn.fit(1)
# Loss against learning rate, with the learning rate on a log axis.
plt.plot(lrfind.lrs, lrfind.losses)
plt.xscale('log')
#|export
from torch.optim.lr_scheduler import ExponentialLR
#|export
class LRFinderCB(Callback):
    """Learning-rate finder driven by an `ExponentialLR` scheduler.

    After each training batch the lr and loss are recorded and the scheduler is
    stepped with factor `gamma`. The fit is cancelled when the loss is NaN or
    exceeds `max_mult` times the lowest loss seen. The curve is plotted in
    `cleanup_fit`.
    """
    def __init__(self, gamma=1.3, max_mult=3):
        fc.store_attr()

    def before_fit(self, learn):
        self.sched = ExponentialLR(learn.opt, self.gamma)
        self.lrs, self.losses = [], []
        self.min = math.inf

    def after_batch(self, learn):
        # Validation batches are not part of the sweep
        if not learn.training: raise CancelEpochException()
        current_lr = learn.opt.param_groups[0]['lr']
        self.lrs.append(current_lr)
        loss = to_cpu(learn.loss)
        self.losses.append(loss)
        if loss < self.min: self.min = loss
        diverged = math.isnan(loss) or (loss > self.min*self.max_mult)
        if diverged: raise CancelFitException()
        self.sched.step()

    def cleanup_fit(self, learn):
        plt.plot(self.lrs, self.losses)
        plt.xscale('log')
# The finder is passed as a per-call callback; up to 3 epochs are allowed for the sweep.
cbs = [DeviceCB()]
learn = MomentumLearner(get_model(), dls, F.cross_entropy, lr=1e-5, cbs=cbs)
learn.fit(3, cbs=LRFinderCB())
#|export
@fc.patch
def lr_find(self:Learner, gamma=1.3, max_mult=3, start_lr=1e-5, max_epochs=10):
    """Run a learning-rate sweep with a temporary `LRFinderCB`, starting at `start_lr`.

    The fit runs for at most `max_epochs` epochs.
    """
    finder = LRFinderCB(gamma=gamma, max_mult=max_mult)
    self.fit(max_epochs, lr=start_lr, cbs=finder)
MomentumLearner(get_model(), dls, F.cross_entropy, cbs=cbs).lr_find()
import nbdev; nbdev.nbdev_export()
#| default_exp activations
#|export
from __future__ import annotations
import random,math,torch,numpy as np,matplotlib.pyplot as plt
import fastcore.all as fc
from functools import partial
from miniai.datasets import *
from miniai.learner import *
import torch.nn.functional as F,matplotlib as mpl
from pathlib import Path
from operator import attrgetter,itemgetter
from contextlib import contextmanager
from torch import tensor,nn,optim
import torchvision.transforms.functional as TF
from datasets import load_dataset
from fastcore.test import test_close
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
mpl.rcParams['figure.constrained_layout.use'] = True
import logging
logging.disable(logging.WARNING)
#|export
def set_seed(seed, deterministic=False):
    """Seed the torch, `random` and numpy generators with `seed`.

    `deterministic` is passed to `torch.use_deterministic_algorithms`.
    """
    torch.use_deterministic_algorithms(deterministic)
    for seeder in (torch.manual_seed, random.seed, np.random.seed):
        seeder(seed)
# Names of the input and target fields.
x,y = 'image','label'
name = "fashion_mnist"
dsd = load_dataset(name)
bs = 1024
# Per-batch transform: convert each image to a tensor (not flattened, for the CNN).
@inplace
def transformi(b): b[x] = [TF.to_tensor(o) for o in b[x]]
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
dt = dls.train
def conv(ni, nf, ks=3, act=True):
    """Stride-2 convolution with `ks//2` padding, optionally followed by a ReLU.

    Returns the bare `nn.Conv2d` when `act` is falsy, otherwise
    `nn.Sequential(conv, nn.ReLU())`.
    """
    layer = nn.Conv2d(ni, nf, kernel_size=ks, stride=2, padding=ks//2)
    return nn.Sequential(layer, nn.ReLU()) if act else layer
def cnn_layers():
    """Return the layers of the baseline CNN as a list (a fresh set on every call).

    The size comments give the spatial size after each layer for a 28x28 input.
    """
    stem = conv(1 ,8, ks=5)                          #14x14
    body = [conv(8 ,16), conv(16,32), conv(32,64)]   #7x7, 4x4, 2x2
    head = conv(64,10, act=False)                    #1x1
    return [stem, *body, head, nn.Flatten()]
from torcheval.metrics import MulticlassAccuracy
metrics = MetricsCB(accuracy=MulticlassAccuracy())
cbs = [TrainCB(), DeviceCB(), metrics, ProgressCB(plot=True)]
def fit(model, epochs=1, xtra_cbs=None):
    """Train `model` with the notebook's shared callbacks plus `xtra_cbs`; return the Learner.

    Uses cross-entropy loss and a learning rate of 0.6.
    """
    all_cbs = cbs+fc.L(xtra_cbs)
    learn = Learner(model, dls, loss_func=F.cross_entropy, lr=0.6, cbs=all_cbs)
    learn.fit(epochs)
    return learn
set_seed(1)
learn = fit(nn.Sequential(*cnn_layers()))
class SequentialModel(nn.Module):
    """Sequential container that records the mean and std of every layer's output.

    `act_means[i]` and `act_stds[i]` grow by one entry each time the model is
    called, for layer `i`.
    """
    def __init__(self, *layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)
        self.act_means = [[] for _ in layers]
        self.act_stds = [[] for _ in layers]

    def __call__(self, x):
        for layer, means, stds in zip(self.layers, self.act_means, self.act_stds):
            x = layer(x)
            acts = to_cpu(x)
            means.append(acts.mean())
            stds.append(acts.std())
        return x

    def __iter__(self):
        return iter(self.layers)
# Train the instrumented model, then plot the recorded means and stds per layer.
set_seed(1)
model = SequentialModel(*cnn_layers())
learn = fit(model)
for l in model.act_means: plt.plot(l)
plt.legend(range(5));
for l in model.act_stds: plt.plot(l)
plt.legend(range(5));
# Fresh plain nn.Sequential and one pair of stat lists per layer, filled by the hooks below.
set_seed(1)
model = nn.Sequential(*cnn_layers())
act_means = [[] for _ in model]
act_stds = [[] for _ in model]
def append_stats(i, mod, inp, outp):
    """Forward hook: append the mean and std of `outp` to the global lists for layer `i`."""
    acts = to_cpu(outp)
    act_means[i].append(acts.mean())
    act_stds[i].append(acts.std())
# Bind the layer index with partial so each hook writes to its own list.
for i,m in enumerate(model): m.register_forward_hook(partial(append_stats, i))
fit(model)
for o in act_means: plt.plot(o)
plt.legend(range(5));
#| export
class Hook():
    """Registers `f` as a forward hook on module `m`, called as `f(hook, module, inp, outp)`.

    The hook object is passed first so `f` can store state on it. The hook is
    unregistered by `remove()` and also when the object is deleted.
    """
    def __init__(self, m, f):
        bound = partial(f, self)
        self.hook = m.register_forward_hook(bound)

    def remove(self):
        self.hook.remove()

    def __del__(self):
        self.remove()
def append_stats(hook, mod, inp, outp):
    """Forward hook: append the mean and std of `outp` to `hook.stats`.

    `hook.stats` is created on first use as a `(means, stds)` pair of lists.
    """
    if not hasattr(hook, 'stats'): hook.stats = ([], [])
    means, stds = hook.stats
    acts = to_cpu(outp)
    means.append(acts.mean())
    stds.append(acts.std())
# Hook the first five layers, train, then plot each layer's recorded means and
# unregister its hook.
set_seed(1)
model = nn.Sequential(*cnn_layers())
hooks = [Hook(layer, append_stats) for layer in model[:5].children()]
learn = fit(model)
for hook in hooks:
    plt.plot(hook.stats[0])
    hook.remove()
plt.legend(range(5));
#| export
class Hooks(list):
    """A list of `Hook` objects, one per module in `ms`, usable as a context manager.

    Every hook is removed when the `with` block exits, when an item is deleted,
    or when the list itself is deleted.
    """
    def __init__(self, ms, f):
        super().__init__(Hook(m, f) for m in ms)

    def __enter__(self, *args):
        return self

    def __exit__(self, *args):
        self.remove()

    def __del__(self):
        self.remove()

    def __delitem__(self, i):
        # Unregister before dropping the entry
        self[i].remove()
        super().__delitem__(i)

    def remove(self):
        for hook in self: hook.remove()
# Train with hooks on every layer; they are removed when the `with` block exits.
# Left axis: means, right axis: stds.
set_seed(1)
model = nn.Sequential(*cnn_layers())
with Hooks(model, append_stats) as hooks:
    fit(model)
    fig,axs = plt.subplots(1,2, figsize=(10,4))
    for h in hooks:
        for i in (0, 1):
            axs[i].plot(h.stats[i])
    plt.legend(range(6));
#| export
class HooksCallback(Callback):
    """Callback that installs `hookfunc` as a forward hook for the duration of a fit.

    Hooks go on `mods` if given, otherwise on the model's modules selected by
    `mod_filter`. `on_train`/`on_valid` choose in which phases `hookfunc` is
    actually called. Hooks are removed in `after_fit`.
    """
    def __init__(self, hookfunc, mod_filter=fc.noop, on_train=True, on_valid=False, mods=None):
        fc.store_attr()
        super().__init__()

    def before_fit(self, learn):
        mods = self.mods or fc.filter_ex(learn.model.modules(), self.mod_filter)
        self.hooks = Hooks(mods, partial(self._hookfunc, learn))

    def _hookfunc(self, learn, *args, **kwargs):
        # Only forward to the user's function in the phases that were asked for
        wanted = self.on_train if learn.training else self.on_valid
        if wanted: self.hookfunc(*args, **kwargs)

    def after_fit(self, learn):
        self.hooks.remove()

    def __iter__(self):
        return iter(self.hooks)

    def __len__(self):
        return len(self.hooks)
# Record stats on the Conv2d modules only, through the callback, then plot
# means (left) and stds (right).
hc = HooksCallback(append_stats, mod_filter=fc.risinstance(nn.Conv2d))
set_seed(1)
model = nn.Sequential(*cnn_layers())
fit(model, xtra_cbs=[hc]);
fig,axs = plt.subplots(1,2, figsize=(10,4))
for h in hc:
    for i in (0, 1):
        axs[i].plot(h.stats[i])
plt.legend(range(6));
#| export
def append_stats(hook, mod, inp, outp):
    """Forward hook: record the mean, std and a histogram of the absolute activations.

    `hook.stats` is created on first use as a `(means, stds, hists)` triple of
    lists. Each histogram has 40 bins over the range [0, 10].
    """
    if not hasattr(hook, 'stats'): hook.stats = ([], [], [])
    means, stds, hists = hook.stats
    acts = to_cpu(outp)
    means.append(acts.mean())
    stds.append(acts.std())
    hists.append(acts.abs().histc(40,0,10))
# Retrain with the histogram-recording hook on the Conv2d modules.
set_seed(1)
model = nn.Sequential(*cnn_layers())
hc = HooksCallback(append_stats, mod_filter=fc.risinstance(nn.Conv2d))
fit(model, xtra_cbs=[hc]);
#| export
# Thanks to @ste for initial version of histogram plotting code
def get_hist(h):
    """Stack the histograms recorded on hook `h` into a (bins, batches) image of log1p counts."""
    hists = torch.stack(h.stats[2])
    return hists.t().float().log1p()
# One histogram image per hooked module, drawn with the origin at the bottom.
fig,axes = get_grid(len(hc), figsize=(11,5))
for ax,h in zip(axes.flat, hc):
    hist_img = get_hist(h)
    show_image(hist_img, ax, origin='lower')
#| export
def get_min(h):
    """For each recorded batch, return the fraction of the histogram mass in the first bin."""
    hists = torch.stack(h.stats[2]).t().float()
    lowest_bin, totals = hists[0], hists.sum(0)
    return lowest_bin / totals
# One chart per hooked module: share of activations in the lowest histogram bin
# over the recorded batches, on a fixed 0..1 axis.
fig,axes = get_grid(len(hc), figsize=(11,5))
for ax,h in zip(axes.flatten(), hc):
    lowest_share = get_min(h)
    ax.plot(lowest_share)
    ax.set_ylim(0,1)
#|export
class ActivationStats(HooksCallback):
    """HooksCallback that records activation stats with `append_stats` and can plot them."""
    def __init__(self, mod_filter=fc.noop):
        super().__init__(append_stats, mod_filter)

    def color_dim(self, figsize=(11,5)):
        """Show the activation histogram image of every hooked module."""
        fig,axes = get_grid(len(self), figsize=figsize)
        for ax,hook in zip(axes.flat, self):
            show_image(get_hist(hook), ax, origin='lower')

    def dead_chart(self, figsize=(11,5)):
        """Plot, per hooked module, the share of activations in the lowest histogram bin."""
        fig,axes = get_grid(len(self), figsize=figsize)
        for ax,hook in zip(axes.flatten(), self):
            ax.plot(get_min(hook))
            ax.set_ylim(0,1)

    def plot_stats(self, figsize=(10,4)):
        """Plot the recorded means (left) and standard deviations (right) of every module."""
        fig,axs = plt.subplots(1,2, figsize=figsize)
        for hook in self:
            for i in (0, 1):
                axs[i].plot(hook.stats[i])
        for ax, title in zip(axs, ('Means', 'Stdevs')): ax.set_title(title)
        plt.legend(fc.L.range(self))
# Record stats on the Conv2d modules during one fit, then draw all three charts.
astats = ActivationStats(fc.risinstance(nn.Conv2d))
set_seed(1)
model = nn.Sequential(*cnn_layers())
fit(model, xtra_cbs=[astats]);
astats.color_dim()
astats.dead_chart()
astats.plot_stats()
import nbdev; nbdev.nbdev_export()
#|default_exp init
#|export
import pickle,gzip,math,os,time,shutil,torch,matplotlib as mpl,numpy as np,matplotlib.pyplot as plt
import sys,gc,traceback
import fastcore.all as fc
from collections.abc import Mapping
from pathlib import Path
from operator import attrgetter,itemgetter
from functools import partial
from copy import copy
from contextlib import contextmanager
import torchvision.transforms.functional as TF,torch.nn.functional as F
from torch import tensor,nn,optim
from torch.utils.data import DataLoader,default_collate
from torch.nn import init
from torcheval.metrics import MulticlassAccuracy
from datasets import load_dataset,load_dataset_builder
from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from fastcore.test import test_close
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
import logging
logging.disable(logging.WARNING)
set_seed(42)
# Field names use xl/yl here; x and y are used for tensors later in this notebook.
xl,yl = 'image','label'
name = "fashion_mnist"
dsd = load_dataset(name)
# Per-batch transform: convert each image to a tensor.
@inplace
def transformi(b): b[xl] = [TF.to_tensor(o) for o in b[xl]]
bs = 1024
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
dt = dls.train
xb,yb = next(iter(dt))
def get_model():
    """Build the baseline CNN (four conv blocks plus a 10-output conv, flattened) on `def_device`."""
    blocks = [conv(1 ,8), conv(8 ,16), conv(16,32), conv(32,64)]
    head = conv(64,10, act=False)
    return nn.Sequential(*blocks, head, nn.Flatten()).to(def_device)
# Shared callbacks for this notebook; activation stats are collected on the ReLU modules.
metrics = MetricsCB(accuracy=MulticlassAccuracy())
astats = ActivationStats(fc.risinstance(nn.ReLU))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
learn = MomentumLearner(get_model(), dls, F.cross_entropy, lr=0.2, cbs=cbs)
# Multiply a random 200x100 batch by 50 random 100x100 matrices in a row and
# look at a corner of the result: first with unscaled weights...
x = torch.randn(200, 100)
for i in range(50): x = x @ torch.randn(100,100)
x[0:5,0:5]
# ...then with the weights scaled by 0.01...
x = torch.randn(200, 100)
for i in range(50): x = x @ (torch.randn(100,100) * 0.01)
x[0:5,0:5]
# ...and finally with the weights scaled by 0.1.
x = torch.randn(200, 100)
for i in range(50): x = x @ (torch.randn(100,100) * 0.1)
x[0:5,0:5]
# Mean and std of a fresh standard-normal sample, for reference.
x = torch.randn(100)
x.mean(), x.std()
# Average, over 100 trials, the mean and mean-square of y = a @ x for a random
# 512x100 matrix a and a random vector x of length 100.
mean,sqr = 0.,0.
for i in range(100):
    x = torch.randn(100)
    a = torch.randn(512, 100)
    y = a @ x
    mean, sqr = mean + y.mean().item(), sqr + y.pow(2).mean().item()
mean/100,sqr/100
# Same measurement for the product of two independent scalar normal samples.
mean,sqr = 0.,0.
for i in range(10000):
    x = torch.randn(1)
    a = torch.randn(1)
    y = a*x
    mean, sqr = mean + y.item(), sqr + y.pow(2).item()
mean/10000,sqr/10000
# Toy regression data: 200 samples with 100 features, and 200 targets.
x = torch.randn(200, 100)
y = torch.randn(200)
from math import sqrt
# Each weight matrix is divided by the square root of its number of inputs; biases start at zero.
w1 = torch.randn(100,50) / sqrt(100)
b1 = torch.zeros(50)
w2 = torch.randn(50,1) / sqrt(50)
b2 = torch.zeros(1)
def lin(x, w, b):
    """Affine layer: matrix-multiply `x` by `w` and add the bias `b`."""
    out = x @ w
    return out + b
l1 = lin(x, w1, b1)
l1.mean(),l1.std()
def relu(x):
    """Rectified linear unit: clamp every element of `x` to be at least zero."""
    floor = 0.
    return x.clamp_min(floor)
# Stats of the first layer's output after the ReLU.
l2 = relu(l1)
l2.mean(),l2.std()
# 50 matmul+ReLU layers with the weights scaled by 0.1...
x = torch.randn(200, 100)
for i in range(50): x = relu(x @ (torch.randn(100,100) * 0.1))
x[0:5,0:5]
# ...and with the weights scaled by sqrt(2/100), where 100 is the number of inputs.
x = torch.randn(200, 100)
for i in range(50): x = relu(x @ (torch.randn(100,100) * sqrt(2/100)))
x[0:5,0:5]
# Print the class name of every module that `apply` visits.
model = get_model()
model.apply(lambda m: print(type(m).__name__));
def init_weights(m):
    """Apply Kaiming-normal initialisation to the weight of `m` if it is a conv layer.

    Intended for `model.apply`; modules of any other type are left untouched.
    """
    conv_types = (nn.Conv1d, nn.Conv2d, nn.Conv3d)
    if not isinstance(m, conv_types): return
    init.kaiming_normal_(m.weight)
# Initialise every conv weight of the model in place, then look for a learning rate.
model.apply(init_weights);
MomentumLearner(model, dls, F.cross_entropy, cbs=[DeviceCB()]).lr_find()
# Train a freshly initialised model and inspect its activation statistics.
set_seed(42)
learn = MomentumLearner(get_model().apply(init_weights), dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
astats.color_dim()
astats.plot_stats()
# Mean and std of one input batch, used below to normalise the inputs.
xmean,xstd = xb.mean(),xb.std()
xmean,xstd
#| export
class BatchTransformCB(Callback):
    """Replaces `learn.batch` with `tfm(learn.batch)` before each batch.

    `on_train` and `on_val` select whether the transform is applied during
    training and validation respectively.
    """
    def __init__(self, tfm, on_train=True, on_val=True):
        fc.store_attr()

    def before_batch(self, learn):
        enabled = self.on_train if learn.training else self.on_val
        if enabled: learn.batch = self.tfm(learn.batch)
def _norm(b):
    """Normalise the batch input with the global `xmean`/`xstd`; the target is passed through."""
    inp, targ = b[0], b[1]
    return (inp-xmean)/xstd, targ
# Normalise every batch through a callback and retrain.
norm = BatchTransformCB(_norm)
set_seed(42)
learn = MomentumLearner(get_model().apply(init_weights), dls, F.cross_entropy, lr=0.2, cbs=cbs+[norm])
learn.fit(3)
astats.color_dim()
astats.plot_stats()
# Alternative: normalise inside the dataset transform, then rebuild the loaders
# and check the mean and std of one batch.
@inplace
def transformi(b): b[xl] = [(TF.to_tensor(o)-xmean)/xstd for o in b[xl]]
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
xb,yb = next(iter(dls.train))
xb.mean(),xb.std()
#| export
class GeneralRelu(nn.Module):
    """ReLU variant with an optional leak, downward shift and upper clamp.

    leak: negative slope passed to `F.leaky_relu`; a plain ReLU is used when None.
    sub:  value subtracted from the activation, when not None.
    maxv: upper bound applied to the result, when not None.
    """
    def __init__(self, leak=None, sub=None, maxv=None):
        super().__init__()
        self.leak,self.sub,self.maxv = leak,sub,maxv

    def forward(self, x):
        x = F.leaky_relu(x,self.leak) if self.leak is not None else F.relu(x)
        # Out-of-place ops: F.relu saves its output for the backward pass, so
        # modifying that tensor in place would make autograd raise an error.
        if self.sub is not None: x = x - self.sub
        if self.maxv is not None: x = x.clamp_max(self.maxv)
        return x
#| export
def plot_func(f, start=-5., end=5., steps=100):
    """Plot `f` over `steps` evenly spaced points in [start, end], with a grid and both axes drawn."""
    xs = torch.linspace(start, end, steps)
    plt.plot(xs, f(xs))
    plt.grid(True, which='both', ls='--')
    # Thin black lines marking the x and y axes
    axis_style = dict(color='k', linewidth=0.7)
    plt.axhline(y=0, **axis_style)
    plt.axvline(x=0, **axis_style)
plot_func(GeneralRelu(leak=0.1, sub=0.4))
def conv(ni, nf, ks=3, stride=2, act=nn.ReLU):
    """2d convolution with `ks//2` padding, optionally followed by an activation.

    `act` is a callable that creates the activation module (not an instance).
    When it is falsy the bare `nn.Conv2d` is returned.
    """
    layer = nn.Conv2d(ni, nf, kernel_size=ks, stride=stride, padding=ks//2)
    if not act: return layer
    return nn.Sequential(layer, act())
def get_model(act=nn.ReLU, nfs=None):
    """Build a CNN whose conv blocks follow the channel counts in `nfs`, on `def_device`.

    `nfs` defaults to [1,8,16,32,64]. A final conv without activation maps the
    last channel count to 10 outputs, and the result is flattened.
    """
    if nfs is None: nfs = [1,8,16,32,64]
    blocks = [conv(n_in, n_out, act=act) for n_in, n_out in zip(nfs, nfs[1:])]
    head = conv(nfs[-1],10, act=None)
    return nn.Sequential(*blocks, head, nn.Flatten()).to(def_device)
#| export
def init_weights(m, leaky=0.):
    """Apply Kaiming-normal initialisation to conv weights, with negative slope `leaky`.

    Intended for `model.apply`; modules that are not Conv1d/2d/3d are left untouched.
    """
    conv_types = (nn.Conv1d, nn.Conv2d, nn.Conv3d)
    if not isinstance(m, conv_types): return
    init.kaiming_normal_(m.weight, a=leaky)
# Activation factory and matching initialiser: both use a leak of 0.1.
act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
# Collect activation stats on the GeneralRelu modules from now on.
astats = ActivationStats(fc.risinstance(GeneralRelu))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
iw = partial(init_weights, leaky=0.1)
model = get_model(act_gr).apply(iw)
set_seed(42)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
astats.color_dim()
astats.plot_stats()
astats.dead_chart()
#| export
def _lsuv_stats(hook, mod, inp, outp):
    """Forward hook: store the mean and std of the latest output on the hook object."""
    acts = to_cpu(outp)
    hook.mean, hook.std = acts.mean(), acts.std()
def lsuv_init(model, m, m_in, xb):
    """Adjust `m_in` until the output of module `m` has mean 0 and std 1 on batch `xb`.

    The model is run on `xb` repeatedly. After each pass the observed mean is
    subtracted from `m_in.bias` and `m_in.weight` is divided by the observed
    std, until both statistics are within 1e-3 of their targets.
    """
    h = Hook(m, _lsuv_stats)
    try:
        with torch.no_grad():
            # `model(xb) is not None` is only there to run the forward pass, which
            # refreshes h.mean and h.std before they are checked
            while model(xb) is not None and (abs(h.std-1)>1e-3 or abs(h.mean)>1e-3):
                m_in.bias -= h.mean
                m_in.weight.data /= h.std
    finally:
        # Unregister the hook even if the forward pass raises
        h.remove()
# Pair each GeneralRelu with a Conv2d, in module order; zip stops at the shorter list.
model = get_model(act_gr)
relus = [o for o in model.modules() if isinstance(o, GeneralRelu)]
convs = [o for o in model.modules() if isinstance(o, nn.Conv2d)]
for ms in zip(relus,convs): print(ms)
# Initialise pair by pair: stats are measured after the activation, the conv is adjusted.
for ms in zip(relus,convs): lsuv_init(model, *ms, xb.to(def_device))
set_seed(42)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
astats.plot_stats()
class LayerNorm(nn.Module):
    "Normalise each sample over dims (1,2,3), then apply a single learnable scale (`mult`) and shift (`add`)."
    def __init__(self, dummy, eps=1e-5):
        # `dummy` is accepted and ignored.
        super().__init__()
        self.eps = eps
        self.mult = nn.Parameter(tensor(1.))
        self.add = nn.Parameter(tensor(0.))

    def forward(self, x):
        dims = (1,2,3)
        mean = x.mean(dims, keepdim=True)
        var = x.var(dims, keepdim=True)
        normed = (x - mean) / (var + self.eps).sqrt()
        return normed*self.mult + self.add
#|export
def conv(ni, nf, ks=3, stride=2, act=nn.ReLU, norm=None, bias=None):
    """Build `Conv2d -> [norm(nf)] -> [act()]` as an `nn.Sequential`.

    Args:
        ni, nf: input / output channels.
        ks: kernel size; padding is `ks//2`.
        stride: convolution stride.
        act: activation factory, called with no arguments; skipped if falsy.
        norm: normalisation factory, called as `norm(nf)`; skipped if falsy.
        bias: whether the conv has a bias. When None, the bias is dropped only
            if `norm` is a PyTorch BatchNorm layer.
    """
    if bias is None:
        bn_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)
        # `norm` is passed as a class (or a functools.partial around one), so an
        # isinstance check alone never matches; compare the class itself too.
        norm_cls = getattr(norm, 'func', norm)
        is_bn = isinstance(norm, bn_types) or (isinstance(norm_cls, type) and issubclass(norm_cls, bn_types))
        bias = not is_bn
    layers = [nn.Conv2d(ni, nf, stride=stride, kernel_size=ks, padding=ks//2, bias=bias)]
    if norm: layers.append(norm(nf))
    if act: layers.append(act())
    return nn.Sequential(*layers)
#|export
def get_model(act=nn.ReLU, nfs=None, norm=None):
    "Stack normalised `conv` layers over consecutive pairs of `nfs`, then a 10-output conv (no act, no norm, with bias) and a Flatten, on `def_device`."
    if nfs is None:
        nfs = [1,8,16,32,64]
    body = [conv(n_in, n_out, act=act, norm=norm) for n_in, n_out in zip(nfs[:-1], nfs[1:])]
    head = conv(nfs[-1],10, act=None, norm=False, bias=True)
    model = nn.Sequential(*body, head, nn.Flatten())
    return model.to(def_device)
# Train the model with LayerNorm as the normalisation layer for 3 epochs.
set_seed(42)
model = get_model(act_gr, norm=LayerNorm).apply(iw)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
class BatchNorm(nn.Module):
    "Batch normalisation over dims (0,2,3) with per-channel scale/shift and running statistics kept as buffers."
    def __init__(self, nf, mom=0.1, eps=1e-5):
        super().__init__()
        # NB: pytorch bn mom is opposite of what you'd expect
        self.mom = mom
        self.eps = eps
        self.mults = nn.Parameter(torch.ones(nf,1,1))
        self.adds = nn.Parameter(torch.zeros(nf,1,1))
        self.register_buffer('vars', torch.ones(1,nf,1,1))
        self.register_buffer('means', torch.zeros(1,nf,1,1))

    def update_stats(self, x):
        "Move the running mean/var towards the batch statistics by a factor of `mom`; return the batch statistics."
        dims = (0,2,3)
        batch_mean = x.mean(dims, keepdim=True)
        batch_var = x.var(dims, keepdim=True)
        self.means.lerp_(batch_mean, self.mom)
        self.vars.lerp_(batch_var, self.mom)
        return batch_mean, batch_var

    def forward(self, x):
        if self.training:
            # Batch statistics are computed without gradient tracking.
            with torch.no_grad():
                mean, var = self.update_stats(x)
        else:
            mean, var = self.means, self.vars
        normed = (x - mean) / (var + self.eps).sqrt()
        return normed*self.mults + self.adds
# Train with the hand-written BatchNorm class.
model = get_model(act_gr, norm=BatchNorm).apply(iw)
set_seed(42)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.4, cbs=cbs)
learn.fit(3)
# Rebuild the DataLoaders with batch size 256 and train again using nn.BatchNorm2d.
dls = DataLoaders.from_dd(tds, 256, num_workers=4)
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
# Keep training the same model with a new learner at lr=0.05.
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.05, cbs=cbs)
learn.fit(2)
# Run nbdev's notebook export.
import nbdev; nbdev.nbdev_export()
#|export
import torch
from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from miniai.init import *
import pickle,gzip,math,os,time,shutil,torch,matplotlib as mpl,numpy as np,matplotlib.pyplot as plt
import fastcore.all as fc
from collections.abc import Mapping
from pathlib import Path
from operator import attrgetter,itemgetter
from functools import partial
from copy import copy
from contextlib import contextmanager
import torchvision.transforms.functional as TF,torch.nn.functional as F
from torch import tensor,nn,optim
from torch.utils.data import DataLoader,default_collate
from torch.nn import init
from torch.optim import lr_scheduler
from torcheval.metrics import MulticlassAccuracy
from datasets import load_dataset,load_dataset_builder
from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from miniai.init import *
from fastcore.test import test_close
# Tensor printing options, seeding, and log messages up to WARNING level disabled.
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
import logging
logging.disable(logging.WARNING)
set_seed(42)
# Dataset column names, dataset name, batch size, and the constants `transformi` normalises with.
xl,yl = 'image','label'
name = "fashion_mnist"
dsd = load_dataset(name)
bs = 1024
xmean,xstd = 0.28, 0.35
@inplace
def transformi(b):
    "Replace the `xl` entry of `b` with tensors shifted by `xmean` and scaled by `xstd`."
    def _norm(img): return (TF.to_tensor(img)-xmean)/xstd
    b[xl] = [_norm(o) for o in b[xl]]
# Apply `transformi` to the dataset and build DataLoaders with batch size `bs`.
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
# Callbacks shared by the training runs below.
metrics = MetricsCB(accuracy=MulticlassAccuracy())
astats = ActivationStats(fc.risinstance(GeneralRelu))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
# Activation and weight-init factories, both using 0.1 for the leak.
act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
iw = partial(init_weights, leaky=0.1)
# Callback list for learning-rate finder runs.
lrf_cbs = [DeviceCB(), LRFinderCB()]
class SGD:
    """Minimal SGD optimiser with optional weight decay.

    `params` is materialised with `list(...)` and stored, together with `lr` and
    `wd`, as attributes. `self.i` counts completed `step()` calls. Subclasses
    customise the update by overriding `opt_step` (and, if needed, `reg_step`).
    Parameters whose `.grad` is None are skipped by `step` and `zero_grad`.
    """
    def __init__(self, params, lr, wd=0.):
        params = list(params)
        fc.store_attr()
        self.i = 0

    def step(self):
        "Apply `reg_step` then `opt_step` to every parameter that has a gradient, then increment `self.i`."
        with torch.no_grad():
            for p in self.params:
                # Frozen or unused parameters have no gradient; leave them untouched.
                if p.grad is None: continue
                self.reg_step(p)
                self.opt_step(p)
        self.i +=1

    def opt_step(self, p):
        "Plain gradient descent update."
        p -= p.grad * self.lr

    def reg_step(self, p):
        "Weight decay: shrink `p` by a factor of `1 - lr*wd` when `wd` is non-zero."
        if self.wd != 0: p *= 1 - self.lr*self.wd

    def zero_grad(self):
        "Zero the gradient of every parameter that has one."
        for p in self.params:
            if p.grad is not None: p.grad.data.zero_()
# Train with the hand-written SGD optimiser.
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=0.4, cbs=cbs, opt_func=SGD)
learn.fit(3)
# Synthetic noisy parabola used to illustrate exponentially weighted moving averages.
xs = torch.linspace(-4, 4, 100)
ys = 1 - (xs/3) ** 2 + torch.randn(100) * 0.1
_,axs = plt.subplots(2,2, figsize=(12,8))
betas = [0.5,0.7,0.9,0.99]
# One subplot per beta: scatter the data, then overlay the running average in red.
for beta,ax in zip(betas, axs.flatten()):
ax.scatter(xs,ys)
avg,res = 0,[]
for yi in ys:
# avg keeps a `beta` share of its previous value and takes (1-beta) of the new point.
avg = beta*avg + (1-beta)*yi
res.append(avg)
ax.plot(xs,np.array(res), color='red');
ax.set_title(f'beta={beta}')
class Momentum(SGD):
    "SGD that steps along an exponentially weighted average of gradients, kept on each parameter as `grad_avg`."
    def __init__(self, params, lr, wd=0., mom=0.9):
        super().__init__(params, lr=lr, wd=wd)
        self.mom = mom

    def opt_step(self, p):
        # First call for this parameter: start the average from zeros.
        try: prev = p.grad_avg
        except AttributeError: prev = torch.zeros_like(p.grad)
        p.grad_avg = prev*self.mom + p.grad*(1-self.mom)
        p -= self.lr * p.grad_avg
# Train with the Momentum optimiser at lr=1.5, then show the activation colour map.
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=1.5, cbs=cbs, opt_func=Momentum)
learn.fit(3)
astats.color_dim()
class RMSProp(SGD):
    "SGD whose step is divided by the root of a running average of squared gradients (`sqr_avg` on each parameter)."
    def __init__(self, params, lr, wd=0., sqr_mom=0.99, eps=1e-5):
        super().__init__(params, lr=lr, wd=wd)
        self.sqr_mom = sqr_mom
        self.eps = eps

    def opt_step(self, p):
        grad_sq = p.grad**2
        # The running average starts at the first squared gradient rather than at zero.
        if not hasattr(p, 'sqr_avg'): p.sqr_avg = p.grad**2
        p.sqr_avg = p.sqr_avg*self.sqr_mom + grad_sq*(1-self.sqr_mom)
        denom = p.sqr_avg.sqrt() + self.eps
        p -= self.lr * p.grad/denom
# Train with the RMSProp optimiser at lr=3e-3, then show the activation colour map.
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=3e-3, cbs=cbs, opt_func=RMSProp)
learn.fit(3)
astats.color_dim()
class Adam(SGD):
    "Adam-style optimiser: bias-corrected running averages of the gradient (`avg`) and squared gradient (`sqr_avg`)."
    def __init__(self, params, lr, wd=0., beta1=0.9, beta2=0.99, eps=1e-5):
        super().__init__(params, lr=lr, wd=wd)
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps

    def opt_step(self, p):
        # Both running averages start at zero the first time a parameter is seen.
        for name in ('avg', 'sqr_avg'):
            if not hasattr(p, name): setattr(p, name, torch.zeros_like(p.grad.data))
        t = self.i+1
        p.avg = self.beta1*p.avg + (1-self.beta1)*p.grad
        p.sqr_avg = self.beta2*p.sqr_avg + (1-self.beta2)*(p.grad**2)
        # Divide out the bias introduced by the zero initialisation.
        unbias_avg = p.avg / (1 - (self.beta1**t))
        unbias_sqr_avg = p.sqr_avg / (1 - (self.beta2**t))
        # Note: eps is added inside the square root here.
        p -= self.lr * unbias_avg / (unbias_sqr_avg + self.eps).sqrt()
# Train with the hand-written Adam optimiser at lr=6e-3.
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=6e-3, cbs=cbs, opt_func=Adam)
learn.fit(3)
#|export
class BaseSchedCB(Callback):
    "Holds a scheduler factory `sched`; the scheduler is built from `learn.opt` when fitting starts."
    def __init__(self, sched):
        self.sched = sched

    def before_fit(self, learn):
        self.schedo = self.sched(learn.opt)

    def _step(self, learn):
        "Advance the scheduler, but only while `learn.training` is truthy."
        if not learn.training: return
        self.schedo.step()
#|export
class BatchSchedCB(BaseSchedCB):
    "Calls `_step` after every batch."
    def after_batch(self, learn):
        self._step(learn)
#|export
class HasLearnCB(Callback):
    "Keeps a reference to the learner as `self.learn` for the duration of a fit."
    def before_fit(self, learn):
        self.learn = learn

    def after_fit(self, learn):
        self.learn = None
#|export
class RecorderCB(Callback):
    "Records, after each training batch, the value of every keyword function passed in (each is called with this callback)."
    def __init__(self, **d):
        self.d = d

    def before_fit(self, learn):
        self.recs = {name: [] for name in self.d}
        # Cache the optimiser's first param group for the recording functions to read.
        self.pg = learn.opt.param_groups[0]

    def after_batch(self, learn):
        if learn.training:
            for name, fn in self.d.items():
                self.recs[name].append(fn(self))

    def plot(self):
        "Plot each recorded series, with a legend and `plt.show()` per series."
        for name, values in self.recs.items():
            plt.plot(values, label=name)
            plt.legend()
            plt.show()
def _lr(cb): return cb.pg['lr']
# Cosine annealing stepped once per batch: T_max is the number of training batches in 3 epochs.
tmax = 3 * len(dls.train)
sched = partial(lr_scheduler.CosineAnnealingLR, T_max=tmax)
set_seed(42)
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
# Record the learning rate after each training batch.
rec = RecorderCB(lr=_lr)
xtra = [BatchSchedCB(sched),rec]
learn = TrainLearner(model, dls, F.cross_entropy, lr=2e-2, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(3)
rec.plot()
#|export
class EpochSchedCB(BaseSchedCB):
    "Calls `_step` after every epoch."
    def after_epoch(self, learn):
        self._step(learn)
# Same cosine schedule, now stepped via EpochSchedCB, so T_max is the number of epochs.
sched = partial(lr_scheduler.CosineAnnealingLR, T_max=3)
set_seed(42)
# Reuses the `rec` recorder created earlier.
xtra = [EpochSchedCB(sched),rec]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=2e-2, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(3)
rec.plot()
The 1cycle learning-rate schedule used below (`OneCycleLR`) comes from a paper by Leslie Smith.
def _beta1(cb): return cb.pg['betas'][0]
# Record both the learning rate and the first beta after each training batch.
rec = RecorderCB(lr=_lr, mom=_beta1)
set_seed(42)
lr,epochs = 6e-2,5
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
# OneCycleLR stepped per batch: total_steps is the number of training batches over all epochs.
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), rec]
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
rec.plot()
# Run nbdev's notebook export.
import nbdev; nbdev.nbdev_export()
#|export
import pickle,gzip,math,os,time,shutil,torch,matplotlib as mpl,numpy as np,matplotlib.pyplot as plt
import fastcore.all as fc
from collections.abc import Mapping
from pathlib import Path
from operator import attrgetter,itemgetter
from functools import partial
from copy import copy
from contextlib import contextmanager
import torchvision.transforms.functional as TF,torch.nn.functional as F
from torch import tensor,nn,optim
from torch.utils.data import DataLoader,default_collate
from torch.nn import init
from torch.optim import lr_scheduler
from torcheval.metrics import MulticlassAccuracy
from datasets import load_dataset,load_dataset_builder
from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from miniai.init import *
from miniai.sgd import *
from fastcore.test import test_close
# Tensor printing options, seeding, default image colour map, and log messages up to WARNING disabled.
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray'
import logging
logging.disable(logging.WARNING)
set_seed(42)
# Dataset column names, dataset name, batch size, and the constants `transformi` normalises with.
xl,yl = 'image','label'
name = "fashion_mnist"
bs = 1024
xmean,xstd = 0.28, 0.35
@inplace
def transformi(b):
    "Replace the `xl` entry of `b` with tensors shifted by `xmean` and scaled by `xstd`."
    def _norm(img): return (TF.to_tensor(img)-xmean)/xstd
    b[xl] = [_norm(o) for o in b[xl]]
# Load the dataset, apply `transformi`, and build DataLoaders with batch size `bs`.
dsd = load_dataset(name)
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=4)
#|export
# Default activation factory; later definitions use it as a default argument.
act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
# Callbacks shared by the training runs below, and the weight initialiser.
metrics = MetricsCB(accuracy=MulticlassAccuracy())
astats = ActivationStats(fc.risinstance(GeneralRelu))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
iw = partial(init_weights, leaky=0.1)
def get_model(act=nn.ReLU, nfs=(8,16,32,64,128), norm=nn.BatchNorm2d):
    """Build a plain conv net: a stride-1 stem, one `conv` per consecutive pair in `nfs`, and a 10-output head.

    Args:
        act: activation factory passed to every `conv` except the head.
        nfs: channel widths; `nfs[0]` is the stem's output width.
        norm: normalisation factory passed to every `conv`, including the head.

    Returns:
        The `nn.Sequential` model (ending in `nn.Flatten`), moved to `def_device`.
    """
    # The stem width follows nfs[0], so a custom `nfs` keeps the first pair of layers compatible.
    layers = [conv(1, nfs[0], stride=1, act=act, norm=norm)]
    layers += [conv(nfs[i], nfs[i+1], act=act, norm=norm) for i in range(len(nfs)-1)]
    return nn.Sequential(*layers, conv(nfs[-1], 10, act=None, norm=norm, bias=True), nn.Flatten()).to(def_device)
# Baseline: train the plain conv model with AdamW and a per-batch OneCycleLR schedule.
set_seed(42)
lr,epochs = 6e-2,5
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
The ResNet (residual network) was introduced in 2015 by Kaiming He et al. in the paper "Deep Residual Learning for Image Recognition". The key idea is to add a skip connection around each block so that deeper networks can be trained successfully.
#|export
def _conv_block(ni, nf, stride, act=act_gr, norm=None, ks=3):
    "Two `conv` layers in sequence: `ni`->`nf` at stride 1 with `act`, then `nf`->`nf` at `stride` with no activation."
    first = conv(ni, nf, stride=1, act=act, norm=norm, ks=ks)
    second = conv(nf, nf, stride=stride, act=None, norm=norm, ks=ks)
    return nn.Sequential(first, second)
class ResBlock(nn.Module):
    "Residual block: `act(convs(x) + idconv(pool(x)))`."
    def __init__(self, ni, nf, stride=1, ks=3, act=act_gr, norm=None):
        super().__init__()
        self.convs = _conv_block(ni, nf, stride, act=act, ks=ks, norm=norm)
        # Skip path: a 1x1 conv only when the channel count changes...
        if ni == nf: self.idconv = fc.noop
        else: self.idconv = conv(ni, nf, ks=1, stride=1, act=None)
        # ...and average pooling only when the main path is strided.
        if stride == 1: self.pool = fc.noop
        else: self.pool = nn.AvgPool2d(2, ceil_mode=True)
        self.act = act()

    def forward(self, x):
        main = self.convs(x)
        skip = self.idconv(self.pool(x))
        return self.act(main + skip)
def get_model(act=nn.ReLU, nfs=(8,16,32,64,128,256), norm=nn.BatchNorm2d):
    """Build a ResNet-style classifier from `ResBlock`s.

    Args:
        act: activation factory passed to every `ResBlock`.
        nfs: channel widths; `nfs[0]` is the stem's output width and each
            consecutive pair becomes a stride-2 `ResBlock`.
        norm: normalisation factory passed to every `ResBlock`.

    Returns:
        The `nn.Sequential` model (blocks, Flatten, bias-free Linear to 10
        outputs, BatchNorm1d), moved to `def_device`.
    """
    # The stem width follows nfs[0], so a custom `nfs` keeps the first two blocks compatible.
    layers = [ResBlock(1, nfs[0], stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [nn.Flatten(), nn.Linear(nfs[-1], 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers).to(def_device)
def _print_shape(hook, mod, inp, outp): print(type(mod).__name__, inp[0].shape, outp.shape)
# Hook the model with _print_shape and run a single non-training batch to print each hooked module's shapes.
model = get_model()
learn = TrainLearner(model, dls, F.cross_entropy, cbs=[DeviceCB(), SingleBatchCB()])
with Hooks(model, _print_shape) as hooks: learn.fit(1, train=False)
@fc.patch
def summary(self:Learner):
    "Print the total parameter count and return (in a notebook) or print a markdown table of each hooked module's shapes and parameter count."
    header = '|Module|Input|Output|Num params|\n|--|--|--|--|\n'
    rows, counts = [], []
    def _f(hook, mod, inp, outp):
        nparms = sum(o.numel() for o in mod.parameters())
        counts.append(nparms)
        rows.append(f'|{type(mod).__name__}|{tuple(inp[0].shape)}|{tuple(outp.shape)}|{nparms}|\n')
    # One non-training batch is enough to fire every hook once.
    with Hooks(self.model, _f) as hooks: self.fit(1, lr=1, train=False, cbs=SingleBatchCB())
    res = header + ''.join(rows)
    tot = sum(counts)
    print("Tot params: ", tot)
    if not fc.IN_NOTEBOOK:
        print(res)
        return
    from IPython.display import Markdown
    return Markdown(res)
# Summarise a default model, then run the learning-rate finder on an initialised one.
TrainLearner(get_model(), dls, F.cross_entropy, cbs=DeviceCB()).summary()
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
MomentumLearner(model, dls, F.cross_entropy, cbs=DeviceCB()).lr_find()
# Train the ResBlock model with AdamW and a per-batch OneCycleLR schedule.
lr = 2e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
# Same training recipe with a timm 'resnet18d' built for 1 input channel and 10 classes.
import timm
from timm.models.resnet import BasicBlock, ResNet, Bottleneck
' '.join(timm.list_models('*resnet*'))
model = timm.create_model('resnet18d', in_chans=1, num_classes=10)
# model = ResNet(in_chans=1, block=BasicBlock, layers=[2,2,2,2], stem_width=32, avg_down=True)
lr = 2e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
# Run nbdev's notebook export.
import nbdev; nbdev.nbdev_export()
#|default_exp augment
#|export
import torch,random
import fastcore.all as fc
from torch import nn
from torch.nn import init
from miniai.datasets import *
from miniai.conv import *
from miniai.learner import *
from miniai.activations import *
from miniai.init import *
from miniai.sgd import *
from miniai.resnet import *
import pickle,gzip,math,os,time,shutil
import matplotlib as mpl,numpy as np,matplotlib.pyplot as plt
from collections.abc import Mapping
from pathlib import Path
from operator import attrgetter,itemgetter
from functools import partial
from copy import copy
from contextlib import contextmanager
import torchvision.transforms.functional as TF,torch.nn.functional as F
from torch import tensor,optim
from torch.utils.data import DataLoader,default_collate
from torch.optim import lr_scheduler
from torcheval.metrics import MulticlassAccuracy
from datasets import load_dataset,load_dataset_builder
from fastcore.test import test_close
from torch import distributions
# Tensor printing options, seeding, default image colour map, and log messages up to WARNING disabled.
torch.set_printoptions(precision=2, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['image.cmap'] = 'gray_r'
import logging
logging.disable(logging.WARNING)
set_seed(42)
# Cap fastcore's default CPU count at 8; it is used as num_workers for the DataLoaders.
if fc.defaults.cpus>8: fc.defaults.cpus=8
# Dataset column names, dataset name, batch size, and the constants `transformi` normalises with.
xl,yl = 'image','label'
name = "fashion_mnist"
bs = 1024
xmean,xstd = 0.28, 0.35
@inplace
def transformi(b):
    "Replace the `xl` entry of `b` with tensors shifted by `xmean` and scaled by `xstd`."
    def _norm(img): return (TF.to_tensor(img)-xmean)/xstd
    b[xl] = [_norm(o) for o in b[xl]]
# Load the dataset, apply `transformi`, and build DataLoaders using the capped worker count.
dsd = load_dataset(name)
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=fc.defaults.cpus)
# Callbacks shared by the training runs below, plus activation and weight-init factories.
metrics = MetricsCB(accuracy=MulticlassAccuracy())
astats = ActivationStats(fc.risinstance(GeneralRelu))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
iw = partial(init_weights, leaky=0.1)
set_seed(42)
lr,epochs = 6e-2,5
def get_model(act=nn.ReLU, nfs=(16,32,64,128,256,512), norm=nn.BatchNorm2d):
    """Build a ResNet-style classifier with a kernel-size-5 stem.

    Args:
        act: activation factory passed to every `ResBlock`.
        nfs: channel widths; `nfs[0]` is the stem's output width and each
            consecutive pair becomes a stride-2 `ResBlock`.
        norm: normalisation factory passed to every `ResBlock`.

    Returns:
        An `nn.Sequential` of the blocks, Flatten, a bias-free Linear to 10
        outputs and BatchNorm1d. The model is not moved to a device here.
    """
    # The stem width follows nfs[0], so a custom `nfs` keeps the first two blocks compatible.
    layers = [ResBlock(1, nfs[0], ks=5, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [nn.Flatten(), nn.Linear(nfs[-1], 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)
# Train the wider ResBlock model with AdamW and a per-batch OneCycleLR schedule at lr=1e-2.
lr = 1e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
[Output table and plot omitted for brevity]
class GlobalAvgPool(nn.Module):
    "Average the input over its last two dimensions."
    def forward(self, x):
        last_two = (-2,-1)
        return x.mean(last_two)
def get_model2(act=nn.ReLU, nfs=(16,32,64,128,256), norm=nn.BatchNorm2d):
    """Build a ResNet-style classifier that ends in a stride-1 block and global average pooling.

    Args:
        act: activation factory passed to every `ResBlock`.
        nfs: channel widths of the strided stage; `nfs[0]` is the stem's output
            width and `nfs[-1]` feeds the final 512-channel block.
        norm: normalisation factory passed to every `ResBlock`.

    Returns:
        An `nn.Sequential` of the blocks, `GlobalAvgPool`, a bias-free Linear to
        10 outputs and BatchNorm1d. The model is not moved to a device here.
    """
    nf_final = 512
    # Stem and final block follow `nfs`, so a custom `nfs` still yields matching channel counts.
    layers = [ResBlock(1, nfs[0], ks=5, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [ResBlock(nfs[-1], nf_final, act=act, norm=norm), GlobalAvgPool()]
    layers += [nn.Linear(nf_final, 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)
#|export
def _flops(x, h, w):
if x.dim()<3: return x.numel()
if x.dim()==4: return x.numel()*h*w
@fc.patch
def summary(self:Learner):
    """Print total parameters and MFLOPS, and return (in a notebook) or print a per-module markdown table.

    Each hooked module contributes one row with its class name, first-input
    shape, output shape, parameter count, and an MFLOPS estimate from `_flops`
    using the last two dimensions of its output.
    """
    res = '|Module|Input|Output|Num params|MFLOPS|\n|--|--|--|--|--|\n'
    totp,totf = 0,0
    def _f(hook, mod, inp, outp):
        nonlocal res,totp,totf
        nparms = sum(o.numel() for o in mod.parameters())
        totp += nparms
        *_,h,w = outp.shape
        flops = sum(_flops(o, h, w) for o in mod.parameters())/1e6
        totf += flops
        res += f'|{type(mod).__name__}|{tuple(inp[0].shape)}|{tuple(outp.shape)}|{nparms}|{flops:.1f}|\n'
    # train=False, as in the parameter-only summary: a report should not train the model.
    # NOTE(review): assumes `fit` runs a training pass by default, in which case the
    # previous call took one optimiser step at lr=1 - confirm against Learner.fit.
    with Hooks(self.model, _f) as hooks: self.fit(1, lr=1, train=False, cbs=SingleBatchCB())
    print(f"Tot params: {totp}; MFLOPS: {totf:.1f}")
    if fc.IN_NOTEBOOK:
        from IPython.display import Markdown
        return Markdown(res)
    else: print(res)
# Summarise a default get_model2 network.
TrainLearner(get_model2(), dls, F.cross_entropy, lr=lr, cbs=[DeviceCB()]).summary()
[Output table omitted for brevity]
# Train get_model2 with the callbacks and scheduler (`xtra`) defined earlier.
set_seed(42)
model = get_model2(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
[Output table and plot omitted for brevity]
# Create the models directory if needed and save the whole trained model object.
mdl_path = Path('models')
mdl_path.mkdir(exist_ok=True)
torch.save(learn.model, mdl_path/'data_aug.pkl')
#| export
class CapturePreds(Callback):
    "Collects inputs, predictions and targets of every batch (via `to_cpu`) and concatenates each list when the fit ends."
    def before_fit(self, learn):
        self.all_inps = []
        self.all_preds = []
        self.all_targs = []

    def after_batch(self, learn):
        xb, yb = learn.batch[0], learn.batch[1]
        self.all_inps.append(to_cpu(xb))
        self.all_preds.append(to_cpu(learn.preds))
        self.all_targs.append(to_cpu(yb))

    def after_fit(self, learn):
        # Concatenate all three before assigning any of them.
        preds, targs, inps = (torch.cat(o) for o in (self.all_preds, self.all_targs, self.all_inps))
        self.all_preds, self.all_targs, self.all_inps = preds, targs, inps
#| export
@fc.patch
def capture_preds(self: Learner, cbs=None, inps=False):
    "Run one non-training fit with a `CapturePreds` callback (plus `cbs`) and return `(preds, targs)`, with the inputs appended when `inps` is truthy."
    cp = CapturePreds()
    self.fit(1, train=False, cbs=[cp]+fc.L(cbs))
    out = (cp.all_preds, cp.all_targs)
    return out+(cp.all_inps,) if inps else out
# Test-time augmentation: capture predictions as-is and with a horizontal flip applied on validation.
ap1, at = learn.capture_preds()
ttacb = BatchTransformCB(partial(tfm_batch, tfm_x=TF.hflip), on_val=True)
ap2, at = learn.capture_preds(cbs=[ttacb])
ap1.shape,ap2.shape,at.shape
# Average the two prediction sets, take the argmax, and report accuracy to 3 decimals.
ap = torch.stack([ap1,ap2]).mean(0).argmax(1)
round((ap==at).float().mean().item(), 3)
# Manual walk-through of random erasing on the first 16 images of a training batch.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
xm,xs = xbt.mean(),xbt.std()
xbt.min(), xbt.max()
# Patch size is `pct` of each of the last two dims; the start offset is drawn from the first (1-pct) of each.
pct = 0.2
szx = int(pct*xbt.shape[-2])
szy = int(pct*xbt.shape[-1])
stx = int(random.random()*(1-pct)*xbt.shape[-2])
sty = int(random.random()*(1-pct)*xbt.shape[-1])
stx,sty,szx,szy
# Fill the patch in place with normal noise using the mini-batch mean and std.
init.normal_(xbt[:,:,stx:stx+szx,sty:sty+szy], mean=xm, std=xs);
show_images(xbt, imsize=1.5)
xbt.min(), xbt.max()
#|export
def _rand_erase1(x, pct, xm, xs, mn, mx):
szx = int(pct*x.shape[-2])
szy = int(pct*x.shape[-1])
stx = int(random.random()*(1-pct)*x.shape[-2])
sty = int(random.random()*(1-pct)*x.shape[-1])
init.normal_(x[:,:,stx:stx+szx,sty:sty+szy], mean=xm, std=xs)
x.clamp_(mn, mx)
# Try _rand_erase1 on 16 training images, using the mini-batch's own statistics and value range.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
_rand_erase1(xbt, 0.2, xbt.mean(), xbt.std(), xbt.min(), xbt.max())
show_images(xbt, imsize=1.5)
xbt.mean(),xbt.std(),xbt.min(), xbt.max()
#|export
def rand_erase(x, pct=0.2, max_num = 4):
    "Erase between 0 and `max_num` random patches of `x` in place, using the mean/std/min/max of `x` measured before erasing; returns `x`."
    stats = x.mean(), x.std(), x.min(), x.max()
    n_patches = random.randint(0, max_num)
    for _ in range(n_patches):
        _rand_erase1(x, pct, *stats)
    return x
# Try rand_erase (up to 4 patches) on 16 training images.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
rand_erase(xbt, 0.2, 4)
show_images(xbt, imsize=1.5)
#|export
class RandErase(nn.Module):
    "Module wrapper around `rand_erase` with stored `pct` and `max_num`."
    def __init__(self, pct=0.2, max_num=4):
        super().__init__()
        self.pct = pct
        self.max_num = max_num

    def forward(self, x):
        return rand_erase(x, self.pct, self.max_num)
# Augmentation pipeline: padded random crop, random horizontal flip, then random erasing.
tfms = nn.Sequential(transforms.RandomCrop(28, padding=1),
transforms.RandomHorizontalFlip(),
RandErase())
# Apply the pipeline to the inputs of each batch; on_val=False is passed to BatchTransformCB.
augcb = BatchTransformCB(partial(tfm_batch, tfm_x=tfms), on_val=False)
# Run a single batch to look at the augmented images.
model = get_model()
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=[DeviceCB(), SingleBatchCB(), augcb])
learn.fit(1)
xb,yb = learn.batch
show_images(xb[:16], imsize=1.5)
# Full 50-epoch run with the augmentation callback added to the scheduler.
epochs = 50
lr = 2e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), augcb]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
# Manual walk-through of random copying: pick a destination (1) and a source (2) patch of equal size.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
szx = int(pct*xbt.shape[-2])
szy = int(pct*xbt.shape[-1])
stx1 = int(random.random()*(1-pct)*xbt.shape[-2])
sty1 = int(random.random()*(1-pct)*xbt.shape[-1])
stx2 = int(random.random()*(1-pct)*xbt.shape[-2])
sty2 = int(random.random()*(1-pct)*xbt.shape[-1])
stx1,sty1,stx2,sty2,szx,szy
# Overwrite the destination patch with the source patch, for every image in the mini-batch.
xbt[:,:,stx1:stx1+szx,sty1:sty1+szy] = xbt[:,:,stx2:stx2+szx,sty2:sty2+szy]
show_images(xbt, imsize=1.5)
#|export
def _rand_copy1(x, pct):
szx = int(pct*x.shape[-2])
szy = int(pct*x.shape[-1])
stx1 = int(random.random()*(1-pct)*x.shape[-2])
sty1 = int(random.random()*(1-pct)*x.shape[-1])
stx2 = int(random.random()*(1-pct)*x.shape[-2])
sty2 = int(random.random()*(1-pct)*x.shape[-1])
x[:,:,stx1:stx1+szx,sty1:sty1+szy] = x[:,:,stx2:stx2+szx,sty2:sty2+szy]
# Try _rand_copy1 on 16 training images.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
_rand_copy1(xbt, 0.2)
show_images(xbt, imsize=1.5)
#|export
def rand_copy(x, pct=0.2, max_num = 4):
    "Apply `_rand_copy1` to `x` between 0 and `max_num` times (in place) and return `x`."
    n_copies = random.randint(0, max_num)
    for _ in range(n_copies):
        _rand_copy1(x, pct)
    return x
# Try rand_copy (up to 4 copies) on 16 training images.
xb,_ = next(iter(dls.train))
xbt = xb[:16]
rand_copy(xbt, 0.2, 4)
show_images(xbt, imsize=1.5)
#|export
class RandCopy(nn.Module):
    "Module wrapper around `rand_copy` with stored `pct` and `max_num`."
    def __init__(self, pct=0.2, max_num=4):
        super().__init__()
        self.pct = pct
        self.max_num = max_num

    def forward(self, x):
        return rand_copy(x, self.pct, self.max_num)
# Augmentation pipeline: padded random crop, random horizontal flip, then random copying.
tfms = nn.Sequential(transforms.RandomCrop(28, padding=1),
transforms.RandomHorizontalFlip(),
RandCopy())
augcb = BatchTransformCB(partial(tfm_batch, tfm_x=tfms), on_val=False)
# Run a single batch to look at the augmented images.
model = get_model()
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=[DeviceCB(), SingleBatchCB(), augcb])
learn.fit(1)
xb,yb = learn.batch
show_images(xb[:16], imsize=1.5)
# Train two separately initialised models for 25 epochs each with the same recipe.
set_seed(1)
epochs = 25
lr = 1e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), augcb]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
model2 = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn2 = TrainLearner(model2, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn2.fit(epochs)
# Save both model objects.
mdl_path = Path('models')
torch.save(learn.model, mdl_path/'randcopy1.pkl')
torch.save(learn2.model, mdl_path/'randcopy2.pkl')
# Ensemble: capture each model's predictions on a non-training pass and average them.
cp1 = CapturePreds()
learn.fit(1, train=False, cbs=cp1)
cp2 = CapturePreds()
learn2.fit(1, train=False, cbs=cp2)
ap = torch.stack([cp1.all_preds,cp2.all_preds]).mean(0).argmax(1)
round((ap==cp1.all_targs).float().mean().item(), 3)
# Dropout illustration: draw 10 samples from a Binomial with success probability 1-p.
p = 0.1
dist = distributions.binomial.Binomial(probs=1-p)
dist.sample((10,))
class Dropout(nn.Module):
    "Dropout: in training mode multiply by a Binomial(1, 1-p) mask and rescale by 1/(1-p); in eval mode return the input unchanged."
    def __init__(self, p=0.1):
        super().__init__()
        self.p = p

    def forward(self, x):
        if self.training:
            # The mask is sampled on the input's device, one draw per element.
            n_trials = tensor(1.0).to(x.device)
            mask = distributions.binomial.Binomial(n_trials, probs=1-self.p).sample(x.size())
            return x * mask * 1/(1-self.p)
        return x
def get_dropmodel(act=nn.ReLU, nfs=(16,32,64,128,256,512), norm=nn.BatchNorm2d, drop=0.0):
    """Build the ResBlock classifier with dropout after the stem and before the final Linear.

    Args:
        act: activation factory passed to every `ResBlock`.
        nfs: channel widths; `nfs[0]` is the stem's output width and each
            consecutive pair becomes a stride-2 `ResBlock`.
        norm: normalisation factory passed to every `ResBlock`.
        drop: dropout probability, used for both the `nn.Dropout2d` after the
            stem and the `Dropout` before the Linear layer.

    Returns:
        An `nn.Sequential`; the model is not moved to a device here.
    """
    # The stem width follows nfs[0], so a custom `nfs` keeps the first two blocks compatible.
    layers = [ResBlock(1, nfs[0], ks=5, stride=1, act=act, norm=norm), nn.Dropout2d(drop)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [nn.Flatten(), Dropout(drop), nn.Linear(nfs[-1], 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)
# Train the dropout model (drop=0.1) for 5 epochs with AdamW and a per-batch OneCycleLR schedule.
set_seed(42)
epochs=5
lr = 1e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
model = get_dropmodel(act_gr, norm=nn.BatchNorm2d, drop=0.1).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
class TTD_CB(Callback):
    """Test-time dropout: put every dropout layer of the model into training mode before each epoch.

    Covers `nn.Dropout`, `nn.Dropout2d` and this file's own `Dropout` module,
    which `get_dropmodel` places before the final Linear layer and which is not
    a subclass of `nn.Dropout`.
    """
    def before_epoch(self, learn):
        drop_types = (nn.Dropout, nn.Dropout2d, Dropout)
        for m in learn.model.modules():
            if isinstance(m, drop_types): m.train()
@inplace
def transformi(b):
    "Replace the `xl` entry of `b` with tensors mapped through `t*2-1`."
    def _rescale(img): return (TF.to_tensor(img)*2-1)
    b[xl] = [_rescale(o) for o in b[xl]]
# Rebuild the dataset and DataLoaders with the `*2-1` version of transformi.
tds = dsd.with_transform(transformi)
dls = DataLoaders.from_dd(tds, bs, num_workers=fc.defaults.cpus)
# 20-epoch run with the augmentation callback `augcb` defined earlier, then save the model object.
set_seed(42)
epochs = 20
lr = 1e-2
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), augcb]
model = get_model(act_gr, norm=nn.BatchNorm2d).apply(iw)
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)
learn.fit(epochs)
torch.save(learn.model, 'models/data_aug2.pkl')
# Run nbdev's notebook export.
import nbdev; nbdev.nbdev_export()