import vapoursynth as vs |
from vapoursynth import core |
is_api4 = hasattr(vs, "__api_version__") and vs.__api_version__.api_major == 4 |
def wadiqam_fr(clip1, clip2, model_folder_path, dataset="tid", top="patchwise", max_batch_size=2040): |
"""Full-reference WaDIQaM calculator for VapourSynth |
Please download the model from https://github.com/dmaniry/deepIQA/tree/master/models |
A lower score indicates better visual image quality. |
The score will be stored as frame property 'Frame_WaDIQaM_FR' in the output clip. |
args: |
clip1, clip2: RGB input clips with the same size and format. |
The width and height of clips must be multiple of 32. |
Integer clips with bit-depth other than 8, 16 are not allowed. |
The first clip will be returned. |
model_folder_path: Path to the folder that contains model's parameter files, e.g. "models". |
dataset: (str, "live" or "tid") Dataset used for traininig. |
Default is "tid". |
top: (str, "patchwise" or "weighted") Top layer and loss definition of the model. |
Default is "patchwise". |
max_batch_size: (int) Maximum size of a batch. |
The two input images are each divided into (width / 32) * (height * 32) patches. |
The memory may overflow if too many patches are fed to the model. |
Default is 2040 == (1920 / 32) * (1088 / 32). |
ref: |
[1] Bosse, S., Maniry, D., Müller, K. R., Wiegand, T., & Samek, W. (2018). |
Deep neural networks for no-reference and full-reference image quality assessment. |
IEEE Transactions on Image Processing, 27(1), 206-219. |
[2] https://github.com/dmaniry/deepIQA/ |
""" |
funcName = "wadiqam_fr" |
import numpy as np |
from numpy.lib.stride_tricks import as_strided |
import chainer |
from chainer import Variable |
import chainer.functions as F |
import chainer.links as L |
from chainer import computational_graph |
from chainer import cuda |
from chainer import optimizers |
from chainer import serializers |
from functools import partial |
import os |
xp = cuda.cupy |
cuda.cudnn_enabled = True |
cuda.check_cuda_available() |
chainer.global_config.train = False |
chainer.global_config.enable_backprop = False |
chainer.global_config.autotune = True |
chainer.global_config.type_check = True |
if not isinstance(clip1, vs.VideoNode) or clip1.format.color_family != vs.RGB: |
raise TypeError(f'{funcName}: "clip1" must be a RGB clip!') |
if not isinstance(clip2, vs.VideoNode) or clip2.format.color_family != vs.RGB: |
raise TypeError(f'{funcName}: "clip2" must be a RGB clip!') |
if clip1.width != clip2.width or clip1.height != clip2.height: |
raise TypeError(f'{funcName}: "clip2" must be of the same size as "clip1"!') |
if clip1.width % 32 != 0 or clip1.height % 32 != 0: |
raise TypeError(f'{funcName}: The width and height of clips must be multiple of 32!') |
if clip1.format.id != clip2.format.id: |
raise TypeError(f'{funcName}: "clip2" must be of the same format as "clip1"!') |
if clip1.format.sample_type == vs.INTEGER and clip1.format.bits_per_sample not in [8, 16]: |
raise TypeError(f'{funcName}: Integer clips with bit-depth other than 8, 16 are not allowed!') |
class FRModel(chainer.Chain): |
def __init__(self, top="patchwise"): |
super(FRModel, self).__init__( |
conv1 = L.Convolution2D(3, 32, 3, pad=1), |
conv2 = L.Convolution2D(32, 32, 3, pad=1), |
conv3 = L.Convolution2D(32, 64, 3, pad=1), |
conv4 = L.Convolution2D(64, 64, 3, pad=1), |
conv5 = L.Convolution2D(64, 128, 3, pad=1), |
conv6 = L.Convolution2D(128, 128, 3, pad=1), |
conv7 = L.Convolution2D(128, 256, 3, pad=1), |
conv8 = L.Convolution2D(256, 256, 3, pad=1), |
conv9 = L.Convolution2D(256, 512, 3, pad=1), |
conv10 = L.Convolution2D(512, 512, 3, pad=1), |
fc1 = L.Linear(512 * 3, 512), |
fc2 = L.Linear(512, 1) |
) |
self.top = top |
if top == "weighted": |
fc1_a = L.Linear(512 * 3, 512) |
fc2_a = L.Linear(512, 1) |
self.add_link("fc1_a", fc1_a) |
self.add_link("fc2_a", fc2_a) |
def extract_features(self, x, train=True): |
h = F.relu(self.conv1(x)) |
h = F.relu(self.conv2(h)) |
self.h1 = h |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv3(h)) |
h = F.relu(self.conv4(h)) |
self.h2 = h |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv5(h)) |
h = F.relu(self.conv6(h)) |
self.h3 = h |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv7(h)) |
h = F.relu(self.conv8(h)) |
self.h4 = h |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv9(h)) |
h = F.relu(self.conv10(h)) |
self.h5 = h |
h = F.max_pooling_2d(h, 2) |
return h |
def forward(self, x_data, x_ref_data, y_data, train=True, |
n_patches_per_image=32): |
if not isinstance(x_data, Variable): |
x = Variable(x_data) |
else: |
x = x_data |
x_data = x.data |
self.n_images = y_data.shape[0] |
self.n_patches = x_data.shape[0] |
self.n_patches_per_image = n_patches_per_image |
x_ref = Variable(x_ref_data) |
h = self.extract_features(x) |
self.h = h |
h_ref = self.extract_features(x_ref) |
h = F.concat((h-h_ref, h, h_ref)) |
h_ = h # save intermediate features |
h = F.dropout(F.relu(self.fc1(h)), ratio=0.5) |
h = self.fc2(h) |
if self.top == "weighted": |
a = F.dropout(F.relu(self.fc1_a(h_)), ratio=0.5) |
a = F.relu(self.fc2_a(a)) + 0.000001 |
t = Variable(y_data) |
self.weighted_loss(h, a, t) |
elif self.top == "patchwise": |
a = Variable(xp.ones_like(h.data)) |
t = Variable(xp.repeat(y_data, n_patches_per_image)) |
self.patchwise_loss(h, a, t) |
if train: |
return self.loss |
else: |
return self.loss, self.y |
def patchwise_loss(self, h, a, t): |
self.loss = F.sum(abs(h - F.reshape(t, (-1, 1)))) |
self.loss /= self.n_patches |
if self.n_images > 1: |
h = F.split_axis(h, self.n_images, 0) |
a = F.split_axis(a, self.n_images, 0) |
else: |
h, a = [h], [a] |
self.y = h |
self.a = a |
def weighted_loss(self, h, a, t): |
self.loss = 0 |
if self.n_images > 1: |
h = F.split_axis(h, self.n_images, 0) |
a = F.split_axis(a, self.n_images, 0) |
t = F.split_axis(t, self.n_images, 0) |
else: |
h, a, t = [h], [a], [t] |
for i in range(self.n_images): |
y = F.sum(h[i] * a[i], 0) / F.sum(a[i], 0) |
self.loss += abs(y - F.reshape(t[i], (1, ))) |
self.loss /= self.n_images |
self.y = h |
self.a = a |
def extract_patches(arr, patch_shape=(32, 32, 3), extraction_step=32): |
extraction_step = [extraction_step] * 3 |
patch_strides = arr.strides |
slices = tuple(slice(None, None, st) for st in extraction_step) |
indexing_strides = arr[slices].strides |
patch_indices_shape = ((np.array(arr.shape) - np.array(patch_shape)) // |
np.array(extraction_step)) + 1 |
shape = tuple(list(patch_indices_shape) + list(patch_shape)) |
strides = tuple(list(indexing_strides) + list(patch_strides)) |
patches = as_strided(arr, shape=shape, strides=strides) |
return patches |
def benchmark(n, f, model, max_batch_size=2040): |
fout = f[0].copy() |
planes = f[0].format.num_planes |
if is_api4: |
img1 = np.stack(f[0], axis=2) |
else: |
img1 = np.stack([f[0].get_read_array(i) for i in range(planes)], axis=2) |
img1_patches = np.transpose(extract_patches(img1).reshape((-1, 32, 32, 3)), (0, 3, 1, 2)) |
if is_api4: |
img2 = np.stack(f[1], axis=2) |
else: |
img2 = np.stack([f[1].get_read_array(i) for i in range(planes)], axis=2) |
img2_patches = np.transpose(extract_patches(img2).reshape((-1, 32, 32, 3)), (0, 3, 1, 2)) |
if img1.dtype == np.uint8: |
img1_patches = xp.array(img1_patches.astype(np.float32)) |
img2_patches = xp.array(img2_patches.astype(np.float32)) |
elif img1.dtype == np.uint16: |
img1_patches = xp.array(img1_patches.astype(np.float32) * np.float32(255 / 65535)) |
img2_patches = xp.array(img2_patches.astype(np.float32) * np.float32(255 / 65535)) |
elif img1.dtype in [np.float16, np.float32, np.float64, np.float_]: |
img1_patches = xp.array(img1_patches.astype(np.float32) * np.float32(255)) |
img2_patches = xp.array(img2_patches.astype(np.float32) * np.float32(255)) |
else: |
raise TypeError("benchmark: unknown dtype.") |
t = xp.zeros((1, 1), dtype=np.float32) |
y = [] |
weights = [] |
for i in range(0, img1_patches.shape[0], max_batch_size): |
img1_batch = img1_patches[i:min(i + max_batch_size, img1_patches.shape[0])] |
img2_batch = img2_patches[i:min(i + max_batch_size, img2_patches.shape[0])] |
model.forward(img1_batch, img2_batch, t, False, n_patches_per_image=img1_batch.shape[0]) |
y.append(xp.asnumpy(model.y[0].data)) |
weights.append(xp.asnumpy(model.a[0].data)) |
y = np.concatenate(y) |
weights = np.concatenate(weights) |
score = np.sum(y * weights) / np.sum(weights) |
fout.props['Frame_WaDIQaM_FR'] = np.float64(score) |
return fout |
model = FRModel(top=top) |
model_path = os.path.join(model_folder_path, f"fr_{dataset}_{top}.model") |
serializers.load_hdf5(model_path, model) |
model.to_gpu() |
return core.std.ModifyFrame(clip1, clips=[clip1, clip2], |
selector=partial(benchmark, model=model, max_batch_size=max_batch_size)) |
def wadiqam_nr(clip, model_folder_path, dataset="tid", top="patchwise", max_batch_size=2040): |
"""No-reference WaDIQaM calculator for VapourSynth |
Please download the model from https://github.com/dmaniry/deepIQA/tree/master/models |
A lower score indicates better visual image quality. |
The score will be stored as frame property 'Frame_WaDIQaM_NR' in the output clip. |
args: |
clip: RGB input clip. |
Integer clips with bit-depth other than 8, 16 are not allowed. |
The width and height of clips must be multiple of 32. |
model_folder_path: Path to the folder that contains model's parameter files, e.g. "models". |
dataset: (str, "live" or "tid") Dataset used for traininig. |
Default is "tid". |
top: (str, "patchwise" or "weighted") Top layer and loss definition of the model. |
Default is "patchwise". |
max_batch_size: (int) Maximum size of a batch. |
The input image is each divided into (width / 32) * (height * 32) patches. |
The memory may overflow if too many patches are fed to the model. |
Default is 2040 == (1920 / 32) * (1088 / 32). |
ref: |
[1] Bosse, S., Maniry, D., Müller, K. R., Wiegand, T., & Samek, W. (2018). |
Deep neural networks for no-reference and full-reference image quality assessment. |
IEEE Transactions on Image Processing, 27(1), 206-219. |
[2] https://github.com/dmaniry/deepIQA/ |
""" |
funcName = "wadiqam_nr" |
import numpy as np |
from numpy.lib.stride_tricks import as_strided |
import chainer |
from chainer import Variable |
import chainer.functions as F |
import chainer.links as L |
from chainer import computational_graph |
from chainer import cuda |
from chainer import optimizers |
from chainer import serializers |
from functools import partial |
import os |
xp = cuda.cupy |
cuda.cudnn_enabled = True |
cuda.check_cuda_available() |
chainer.global_config.train = False |
chainer.global_config.enable_backprop = False |
chainer.global_config.autotune = True |
chainer.global_config.type_check = True |
if not isinstance(clip, vs.VideoNode) or clip.format.color_family != vs.RGB: |
raise TypeError(f'{funcName}: "clip" must be a RGB clip!') |
if clip.width % 32 != 0 or clip.height % 32 != 0: |
raise TypeError(f'{funcName}: The width and height of "clip" must be multiple of 32!') |
if clip.format.sample_type == vs.INTEGER and clip.format.bits_per_sample not in [8, 16]: |
raise TypeError(f'{funcName}: Integer clips with bit-depth other than 8, 16 are not allowed!') |
class NRModel(chainer.Chain): |
def __init__(self, top="patchwise"): |
super(NRModel, self).__init__( |
conv1 = L.Convolution2D(3, 32, 3, pad=1), |
conv2 = L.Convolution2D(32, 32, 3, pad=1), |
conv3 = L.Convolution2D(32, 64, 3, pad=1), |
conv4 = L.Convolution2D(64, 64, 3, pad=1), |
conv5 = L.Convolution2D(64, 128, 3, pad=1), |
conv6 = L.Convolution2D(128, 128, 3, pad=1), |
conv7 = L.Convolution2D(128, 256, 3, pad=1), |
conv8 = L.Convolution2D(256, 256, 3, pad=1), |
conv9 = L.Convolution2D(256, 512, 3, pad=1), |
conv10 = L.Convolution2D(512, 512, 3, pad=1), |
fc1 = L.Linear(512, 512), |
fc2 = L.Linear(512, 1) |
) |
self.top = top |
if top == "weighted": |
fc1_a = L.Linear(512, 512) |
fc2_a = L.Linear(512, 1) |
self.add_link("fc1_a", fc1_a) |
self.add_link("fc2_a", fc2_a) |
def forward(self, x_data, y_data, train=True, n_patches=32): |
if not isinstance(x_data, Variable): |
x = Variable(x_data) |
else: |
x = x_data |
x_data = x.data |
self.n_images = y_data.shape[0] |
self.n_patches = x_data.shape[0] |
self.n_patches_per_image = self.n_patches / self.n_images |
h = F.relu(self.conv1(x)) |
h = F.relu(self.conv2(h)) |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv3(h)) |
h = F.relu(self.conv4(h)) |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv5(h)) |
h = F.relu(self.conv6(h)) |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv7(h)) |
h = F.relu(self.conv8(h)) |
h = F.max_pooling_2d(h, 2) |
h = F.relu(self.conv9(h)) |
h = F.relu(self.conv10(h)) |
h = F.max_pooling_2d(h, 2) |
h_ = h |
self.h = h_ |
h = F.dropout(F.relu(self.fc1(h_)), ratio=0.5) |
h = self.fc2(h) |
if self.top == "weighted": |
a = F.dropout(F.relu(self.fc1_a(h_)), ratio=0.5) |
a = F.relu(self.fc2_a(a)) + 0.000001 |
t = Variable(y_data) |
self.weighted_loss(h, a, t) |
elif self.top == "patchwise": |
a = Variable(xp.ones_like(h.data)) |
t = Variable(xp.repeat(y_data, n_patches)) |
self.patchwise_loss(h, a, t) |
if train: |
return self.loss |
else: |
return self.loss, self.y |
def patchwise_loss(self, h, a, t): |
self.loss = F.sum(abs(h - F.reshape(t, (-1, 1)))) |
self.loss /= self.n_patches |
if self.n_images > 1: |
h = F.split_axis(h, self.n_images, 0) |
a = F.split_axis(a, self.n_images, 0) |
else: |
h, a = [h], [a] |
self.y = h |
self.a = a |
def weighted_loss(self, h, a, t): |
self.loss = 0 |
if self.n_images > 1: |
h = F.split_axis(h, self.n_images, 0) |
a = F.split_axis(a, self.n_images, 0) |
t = F.split_axis(t, self.n_images, 0) |
else: |
h, a, t = [h], [a], [t] |
for i in range(self.n_images): |
y = F.sum(h[i] * a[i], 0) / F.sum(a[i], 0) |
self.loss += abs(y - F.reshape(t[i], (1, ))) |
self.loss /= self.n_images |
self.y = h |
self.a = a |
def extract_patches(arr, patch_shape=(32, 32, 3), extraction_step=32): |
extraction_step = [extraction_step] * 3 |
patch_strides = arr.strides |
slices = tuple(slice(None, None, st) for st in extraction_step) |
indexing_strides = arr[slices].strides |
patch_indices_shape = ((np.array(arr.shape) - np.array(patch_shape)) // |
np.array(extraction_step)) + 1 |
shape = tuple(list(patch_indices_shape) + list(patch_shape)) |
strides = tuple(list(indexing_strides) + list(patch_strides)) |
patches = as_strided(arr, shape=shape, strides=strides) |
return patches |
def benchmark(n, f, model, max_batch_size=2040): |
fout = f.copy() |
if is_api4: |
img1 = np.stack(f, axis=2) |
else: |
planes = f.format.num_planes |
img1 = np.stack([f.get_read_array(i) for i in range(planes)], axis=2) |
img1_patches = np.transpose(extract_patches(img1).reshape((-1, 32, 32, 3)), (0, 3, 1, 2)) |
if img1.dtype == np.uint8: |
img1_patches = xp.array(img1_patches.astype(np.float32)) |
elif img1.dtype == np.uint16: |
img1_patches = xp.array(img1_patches.astype(np.float32) * np.float32(255 / 65535)) |
elif img1.dtype in [np.float16, np.float32, np.float64, np.float_]: |
img1_patches = xp.array(img1_patches.astype(np.float32) * np.float32(255)) |
else: |
raise TypeError("benchmark: unknown dtype.") |
t = xp.zeros((1, 1), dtype=np.float32) |
y = [] |
weights = [] |
for i in range(0, img1_patches.shape[0], max_batch_size): |
img1_batch = img1_patches[i:min(i + max_batch_size, img1_patches.shape[0])] |
model.forward(img1_batch, t, False, n_patches=img1_batch.shape[0]) |
y.append(xp.asnumpy(model.y[0].data)) |
weights.append(xp.asnumpy(model.a[0].data)) |
y = np.concatenate(y) |
weights = np.concatenate(weights) |
score = np.sum(y * weights) / np.sum(weights) |
fout.props['Frame_WaDIQaM_NR'] = np.float64(score) |
return fout |
model = NRModel(top=top) |
model_path = os.path.join(model_folder_path, f"nr_{dataset}_{top}.model") |
serializers.load_hdf5(model_path, model) |
model.to_gpu() |
return core.std.ModifyFrame(clip, clips=clip, |
selector=partial(benchmark, model=model, max_batch_size=max_batch_size)) |