Created March 15, 2018 13:23
Splice in Neural Enhance upscaling into the dfaker merge process.
#!/usr/bin/env python3
""" _ _
_ __ ___ _ _ _ __ __ _| | ___ _ __ | |__ __ _ _ __ ___ ___
| '_ \ / _ \ | | | '__/ _` | | / _ \ '_ \| '_ \ / _` | '_ \ / __/ _ \
| | | | __/ |_| | | | (_| | | | __/ | | | | | | (_| | | | | (_| __/
|_| |_|\___|\__,_|_| \__,_|_| \___|_| |_|_| |_|\__,_|_| |_|\___\___|
# Original work Copyright (c) 2016, Alex J. Champandard.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
# Neural Enhance is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License version 3. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See full license here:
__version__ = '0.3'
import io
import os
import sys
import bz2
import glob
import math
import time
import pickle
import random
import argparse
import itertools
import threading
import collections
import cv2
import numpy as np
from PIL import Image
files = ['images/128.jpg']
i_type = 'photo'
model = 'default'
train = False
train_scales = 0
train_blur = None
train_noise = None
train_jpeg = []
eopchs = 10
eopch_size = 72
save_every = 10
batch_shape = 192
batch_size = 15
buffer_size = 1500
buffer_fraction = 5
learning_rate = 1E-4
learning_period = 75
learning_decay = 0.5
generator_upscale = 2
generator_downscale = 0
generator_filters = [64]
generator_blocks = 4
generator_residual = 2
perceptual_layer = 'conv2_2'
perceptual_weight = 1e0
discriminator_size = 32
smoothness_weight = 2e5
adversary_weight = 5e2
generator_start = 0
discriminator_start = 1
adversary_start = 2
device = 'cuda'
# Color coded output helps visualize the information a little better, plus it looks cool!
class ansi:
WHITE = '\033[0;97m'
WHITE_B = '\033[1;97m'
YELLOW = '\033[0;33m'
YELLOW_B = '\033[1;33m'
RED = '\033[0;31m'
RED_B = '\033[1;31m'
BLUE = '\033[0;94m'
BLUE_B = '\033[1;94m'
CYAN = '\033[0;36m'
CYAN_B = '\033[1;36m'
ENDC = '\033[0m'
def error(message, *lines):
string = "\n{}ERROR: " + message + "{}\n" + "\n".join(lines) + ("{}\n" if lines else "{}")
print(string.format(ansi.RED_B, ansi.RED, ansi.ENDC))
def warn(message, *lines):
string = "\n{}WARNING: " + message + "{}\n" + "\n".join(lines) + "{}\n"
print(string.format(ansi.YELLOW_B, ansi.YELLOW, ansi.ENDC))
def extend(lst): return itertools.chain(lst, itertools.repeat(lst[-1]))
# Load the underlying deep learning libraries based on the device specified. If you specify THEANO_FLAGS manually,
# the code assumes you know what you are doing and they are not overriden!
os.environ.setdefault('THEANO_FLAGS', 'floatX=float32,device={},force_device=True,allow_gc=True,'\
# Scientific & Imaging Libraries
import numpy as np
import scipy.ndimage, scipy.misc, PIL.Image
# Numeric Computing (GPU)
import theano, theano.tensor as T
T.nnet.softminus = lambda x: x - T.nnet.softplus(x)
# Support ansi colors in Windows too.
if sys.platform == 'win32':
import colorama
# Deep Learning Framework
import lasagne
from lasagne.layers import Conv2DLayer as ConvLayer, Deconv2DLayer as DeconvLayer, Pool2DLayer as PoolLayer
from lasagne.layers import InputLayer, ConcatLayer, ElemwiseSumLayer, batch_norm
print('{} - Using the device `{}` for neural computation.{}\n'.format(ansi.CYAN, theano.config.device, ansi.ENDC))
# Image Processing
class DataLoader(threading.Thread):
def __init__(self, zoom):
super(DataLoader, self).__init__(daemon=True)
self.data_ready = threading.Event()
self.data_copied = threading.Event()
self.zoom = zoom
self.orig_shape, self.seed_shape = batch_shape, batch_shape // self.zoom
self.orig_buffer = np.zeros((buffer_size, 3, self.orig_shape, self.orig_shape), dtype=np.float32)
self.seed_buffer = np.zeros((buffer_size, 3, self.seed_shape, self.seed_shape), dtype=np.float32)
self.files = glob.glob(train)
if len(self.files) == 0:
error("There were no files found to train from searching for `{}`".format(train),
" - Try putting all your images in one folder and using `--train=data/*.jpg`")
self.available = set(range(buffer_size))
self.ready = set()
self.cwd = os.getcwd()
def run(self):
while True:
for f in self.files:
def add_to_buffer(self, f):
filename = os.path.join(self.cwd, f)
orig ='RGB')
scale = 2 ** random.randint(0, train_scales)
if scale > 1 and all(s//scale >= batch_shape for s in orig.size):
orig = orig.resize((orig.size[0]//scale, orig.size[1]//scale), resample=PIL.Image.LANCZOS)
if any(s < batch_shape for s in orig.size):
raise ValueError('Image is too small for training with size {}'.format(orig.size))
except Exception as e:
warn('Could not load `{}` as image.'.format(filename),
' - Try fixing or removing the file before next run.')
seed = orig
if train_blur is not None:
seed = seed.filter(PIL.ImageFilter.GaussianBlur(radius=random.randint(0, args.train_blur*2)))
if self.zoom > 1:
seed = seed.resize((orig.size[0]//self.zoom, orig.size[1]//self.zoom), resample=PIL.Image.LANCZOS)
if len(train_jpeg) > 0:
buffer, rng = io.BytesIO(), train_jpeg[-1] if len(train_jpeg) > 1 else 15, format='jpeg', quality=train_jpeg[0]+random.randrange(-rng, +rng))
seed =
orig = scipy.misc.fromimage(orig).astype(np.float32)
seed = scipy.misc.fromimage(seed).astype(np.float32)
if train_noise is not None:
seed += scipy.random.normal(scale=train_noise, size=(seed.shape[0], seed.shape[1], 1))
for _ in range(seed.shape[0] * seed.shape[1] // (buffer_fraction * self.seed_shape ** 2)):
h = random.randint(0, seed.shape[0] - self.seed_shape)
w = random.randint(0, seed.shape[1] - self.seed_shape)
seed_chunk = seed[h:h+self.seed_shape, w:w+self.seed_shape]
h, w = h * self.zoom, w * self.zoom
orig_chunk = orig[h:h+self.orig_shape, w:w+self.orig_shape]
while len(self.available) == 0:
i = self.available.pop()
self.orig_buffer[i] = np.transpose(orig_chunk.astype(np.float32) / 255.0 - 0.5, (2, 0, 1))
self.seed_buffer[i] = np.transpose(seed_chunk.astype(np.float32) / 255.0 - 0.5, (2, 0, 1))
if len(self.ready) >= batch_size:
def copy(self, origs_out, seeds_out):
for i, j in enumerate(random.sample(self.ready, batch_size)):
origs_out[i] = self.orig_buffer[j]
seeds_out[i] = self.seed_buffer[j]
# Convolution Networks
class SubpixelReshuffleLayer(lasagne.layers.Layer):
"""Based on the code by ajbrock:
def __init__(self, incoming, channels, upscale, **kwargs):
super(SubpixelReshuffleLayer, self).__init__(incoming, **kwargs)
self.upscale = upscale
self.channels = channels
def get_output_shape_for(self, input_shape):
def up(d): return self.upscale * d if d else d
return (input_shape[0], self.channels, up(input_shape[2]), up(input_shape[3]))
def get_output_for(self, input, deterministic=False, **kwargs):
out, r = T.zeros(self.get_output_shape_for(input.shape)), self.upscale
for y, x in itertools.product(range(r), repeat=2):
out=T.inc_subtensor(out[:,:,y::r,x::r], input[:,r*y+x::r*r,:,:])
return out
class Model(object):
def __init__(self, zoom, model): = collections.OrderedDict()['img'] = InputLayer((None, 3, None, None))['seed'] = InputLayer((None, 3, None, None))
self.zoom = zoom
self.model = model
config, params = self.load_model()
self.config = config
self.setup_generator(self.last_layer(), config)
if train:
concatenated = lasagne.layers.ConcatLayer([['img'],['out']], axis=0)
def get_config(self):
return self.config
# Network Configuration
def last_layer(self):
return list([-1]
def make_layer(self, name, input, units, filter_size=(3,3), stride=(1,1), pad=(1,1), alpha=0.25):
conv = ConvLayer(input, units, filter_size, stride=stride, pad=pad, nonlinearity=None)
prelu = lasagne.layers.ParametricRectifierLayer(conv, alpha=lasagne.init.Constant(alpha))[name+'x'] = conv[name+'>'] = prelu
return prelu
def make_block(self, name, input, units):
self.make_layer(name+'-A', input, units, alpha=0.1)
# self.make_layer(name+'-B', self.last_layer(), units, alpha=1.0)
return ElemwiseSumLayer([input, self.last_layer()]) if self.generator_residual else self.last_layer()
def setup_generator(self, input, config):
#for k, v in config.items(): setattr(args, k, v)
self.generator_upscale = config['generator_upscale']
self.generator_downscale = config['generator_downscale']
self.generator_filters = config['generator_filters']
self.generator_blocks = config['generator_blocks']
self.generator_residual = config['generator_residual']
self.zoom = 2**(self.generator_upscale - self.generator_downscale)
units_iter = extend(self.generator_filters)
units = next(units_iter)
self.make_layer('iter.0', input, units, filter_size=(7,7), pad=(3,3))
for i in range(0, self.generator_downscale):
self.make_layer('downscale%i'%i, self.last_layer(), next(units_iter), filter_size=(4,4), stride=(2,2))
units = next(units_iter)
for i in range(0, self.generator_blocks):
self.make_block('iter.%i'%(i+1), self.last_layer(), units)
for i in range(0,self. generator_upscale):
u = next(units_iter)
self.make_layer('upscale%i.2'%i, self.last_layer(), u*4)['upscale%i.1'%i] = SubpixelReshuffleLayer(self.last_layer(), u, 2)['out'] = ConvLayer(self.last_layer(), 3, filter_size=(7,7), pad=(3,3), nonlinearity=None)
def setup_perceptual(self, input):
"""Use lasagne to create a network of convolution layers using pre-trained VGG19 weights.
offset = np.array([103.939, 116.779, 123.680], dtype=np.float32).reshape((1,3,1,1))['percept'] = lasagne.layers.NonlinearityLayer(input, lambda x: ((x+0.5)*255.0) - offset)['mse'] =['percept']['conv1_1'] = ConvLayer(['percept'], 64, 3, pad=1)['conv1_2'] = ConvLayer(['conv1_1'], 64, 3, pad=1)['pool1'] = PoolLayer(['conv1_2'], 2, mode='max')['conv2_1'] = ConvLayer(['pool1'], 128, 3, pad=1)['conv2_2'] = ConvLayer(['conv2_1'], 128, 3, pad=1)['pool2'] = PoolLayer(['conv2_2'], 2, mode='max')['conv3_1'] = ConvLayer(['pool2'], 256, 3, pad=1)['conv3_2'] = ConvLayer(['conv3_1'], 256, 3, pad=1)['conv3_3'] = ConvLayer(['conv3_2'], 256, 3, pad=1)['conv3_4'] = ConvLayer(['conv3_3'], 256, 3, pad=1)['pool3'] = PoolLayer(['conv3_4'], 2, mode='max')['conv4_1'] = ConvLayer(['pool3'], 512, 3, pad=1)['conv4_2'] = ConvLayer(['conv4_1'], 512, 3, pad=1)['conv4_3'] = ConvLayer(['conv4_2'], 512, 3, pad=1)['conv4_4'] = ConvLayer(['conv4_3'], 512, 3, pad=1)['pool4'] = PoolLayer(['conv4_4'], 2, mode='max')['conv5_1'] = ConvLayer(['pool4'], 512, 3, pad=1)['conv5_2'] = ConvLayer(['conv5_1'], 512, 3, pad=1)['conv5_3'] = ConvLayer(['conv5_2'], 512, 3, pad=1)['conv5_4'] = ConvLayer(['conv5_3'], 512, 3, pad=1)
def setup_discriminator(self):
c = discriminator_size
self.make_layer('disc1.1', batch_norm(['conv1_2']), 1*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
self.make_layer('disc1.2', self.last_layer(), 1*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
self.make_layer('disc2', batch_norm(['conv2_2']), 2*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
self.make_layer('disc3', batch_norm(['conv3_2']), 3*c, filter_size=(3,3), stride=(1,1), pad=(1,1))
hypercolumn = ConcatLayer([['disc1.2>'],['disc2>'],['disc3>']])
self.make_layer('disc4', hypercolumn, 4*c, filter_size=(1,1), stride=(1,1), pad=(0,0))
self.make_layer('disc5', self.last_layer(), 3*c, filter_size=(3,3), stride=(2,2))
self.make_layer('disc6', self.last_layer(), 2*c, filter_size=(1,1), stride=(1,1), pad=(0,0))['disc'] = batch_norm(ConvLayer(self.last_layer(), 1, filter_size=(1,1),
# Input / Output
def load_perceptual(self):
"""Open the serialized parameters from a pre-trained network, and load them into the model created.
vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
if not os.path.exists(vgg19_file):
error("Model file with pre-trained convolution layers not found. Download here...",
data = pickle.load(, 'rb'))
layers = lasagne.layers.get_all_layers(self.last_layer(), treat_as_input=[['percept']])
for p, d in zip(itertools.chain(*[l.get_params() for l in layers]), data): p.set_value(d)
def list_generator_layers(self):
for l in lasagne.layers.get_all_layers(['out'], treat_as_input=[['img']]):
if not l.get_params(): continue
name = list([list(]
yield (name, l)
def get_filename(self, absolute=False):
filename = 'ne%ix-%s-%s-%s.pkl.bz2' % (self.zoom, i_type, self.model, __version__)
return os.path.join(os.path.dirname(__file__), filename) if absolute else filename
def save_generator(self):
def cast(p): return p.get_value().astype(np.float16)
params = {k: [cast(p) for p in l.get_params()] for (k, l) in self.list_generator_layers()}
#config = {k: getattr(args, k) for k in ['generator_blocks', 'generator_residual', 'generator_filters'] + \
# ['generator_upscale', 'generator_downscale']}
config = {}
config['generator_upscale'] = self.generator_upscale
config['generator_downscale'] = self.generator_downscale
config['generator_filters'] = self.generator_filters
config['generator_blocks'] = self.generator_blocks
config['generator_residual'] = self.generator_residual
pickle.dump((config, params),, 'wb'))
print(' - Saved model as `{}` after training.'.format(self.get_filename()))
def load_model(self):
if not os.path.exists(self.get_filename(absolute=True)):
if train: return {}, {}
error("Model file with pre-trained convolution layers not found. Download it here...",
""%(__version__, self.get_filename()))
print(' - Loaded file `{}` with trained model.'.format(self.get_filename()))
return pickle.load(, 'rb'))
def load_generator(self, params):
if len(params) == 0: return
for k, l in self.list_generator_layers():
assert k in params, "Couldn't find layer `%s` in loaded model.'" % k
assert len(l.get_params()) == len(params[k]), "Mismatch in types of layers."
for p, v in zip(l.get_params(), params[k]):
assert v.shape == p.get_value().shape, "Mismatch in number of parameters for layer {}.".format(k)
# Training & Loss Functions
def loss_perceptual(self, p):
return lasagne.objectives.squared_error(p[:batch_size], p[batch_size:]).mean()
def loss_total_variation(self, x):
return T.mean(((x[:,:,:-1,:-1] - x[:,:,1:,:-1])**2 + (x[:,:,:-1,:-1] - x[:,:,:-1,1:])**2)**1.25)
def loss_adversarial(self, d):
return T.mean(1.0 - T.nnet.softminus(d[batch_size:]))
def loss_discriminator(self, d):
return T.mean(T.nnet.softminus(d[batch_size:]) - T.nnet.softplus(d[:batch_size]))
def compile(self):
# Helper function for rendering test images during training, or standalone inference mode.
input_tensor, seed_tensor = T.tensor4(), T.tensor4()
input_layers = {['img']: input_tensor,['seed']: seed_tensor}
output = lasagne.layers.get_output([[k] for k in ['seed','out']], input_layers, deterministic=True)
self.predict = theano.function([seed_tensor], output)
if not train: return
output_layers = [['out'],[perceptual_layer],['disc']]
gen_out, percept_out, disc_out = lasagne.layers.get_output(output_layers, input_layers, deterministic=False)
# Generator loss function, parameters and updates.
self.gen_lr = theano.shared(np.array(0.0, dtype=theano.config.floatX))
self.adversary_weight = theano.shared(np.array(0.0, dtype=theano.config.floatX))
gen_losses = [self.loss_perceptual(percept_out) * perceptual_weight,
self.loss_total_variation(gen_out) * smoothness_weight,
self.loss_adversarial(disc_out) * self.adversary_weight]
gen_params = lasagne.layers.get_all_params(['out'], trainable=True)
print(' - {} tensors learned for generator.'.format(len(gen_params)))
gen_updates = lasagne.updates.adam(sum(gen_losses, 0.0), gen_params, learning_rate=self.gen_lr)
# Discriminator loss function, parameters and updates.
self.disc_lr = theano.shared(np.array(0.0, dtype=theano.config.floatX))
disc_losses = [self.loss_discriminator(disc_out)]
disc_params = list(itertools.chain(*[l.get_params() for k, l in if 'disc' in k]))
print(' - {} tensors learned for discriminator.'.format(len(disc_params)))
grads = [g.clip(-5.0, +5.0) for g in T.grad(sum(disc_losses, 0.0), disc_params)]
disc_updates = lasagne.updates.adam(grads, disc_params, learning_rate=self.disc_lr)
# Combined Theano function for updating both generator and discriminator at the same time.
updates = collections.OrderedDict(list(gen_updates.items()) + list(disc_updates.items())) = theano.function([input_tensor, seed_tensor], gen_losses + [disc_out.mean(axis=(1,2,3))], updates=updates)
class NeuralEnhancer(object):
def __init__(self, model_type, zoom, loader):
if train:
print('{}Training {} epochs on random image sections with batch size {}.{}'\
.format(ansi.BLUE_B, args.epochs, args.batch_size, ansi.BLUE))
if len(files) == 0: error("Specify the image(s) to enhance on the command-line.")
#print('{}Enhancing {} image(s) specified on the command-line.{}'\
# .format(ansi.BLUE_B, len(files), ansi.BLUE))
self.zoom = zoom
self.model_type = model_type
self.thread = DataLoader(self.zoom) if loader else None
self.model = Model(self.zoom, self.model_type)
config = self.model.get_config()
self.generator_upscale = config['generator_upscale']
self.generator_downscale = config['generator_downscale']
def imsave(self, fn, img):
scipy.misc.toimage(np.transpose(img + 0.5, (1, 2, 0)).clip(0.0, 1.0) * 255.0, cmin=0, cmax=255).save(fn)
def show_progress(self, orign, scald, repro):
os.makedirs('valid', exist_ok=True)
for i in range(batch_size):
self.imsave('valid/%s_%03i_origin.png' % (self.model_type, i), orign[i])
self.imsave('valid/%s_%03i_pixels.png' % (self.model_type, i), scald[i])
self.imsave('valid/%s_%03i_reprod.png' % (self.model_type, i), repro[i])
def decay_learning_rate(self):
l_r, t_cur = learning_rate, 0
while True:
yield l_r
t_cur += 1
if t_cur % learning_period == 0: l_r *= learning_decay
def train(self):
seed_size = batch_shape // self.zoom
images = np.zeros((batch_size, 3, batch_shape, batch_shape), dtype=np.float32)
seeds = np.zeros((batch_size, 3, seed_size, seed_size), dtype=np.float32)
learning_rate = self.decay_learning_rate()
average, start = None, time.time()
for epoch in range(epochs):
total, stats = None, None
l_r = next(learning_rate)
if epoch >= generator_start: self.model.gen_lr.set_value(l_r)
if epoch >= discriminator_start: self.model.disc_lr.set_value(l_r)
for _ in range(epoch_size):
self.thread.copy(images, seeds)
output =, seeds)
losses = np.array(output[:3], dtype=np.float32)
stats = (stats + output[3]) if stats is not None else output[3]
total = total + losses if total is not None else losses
l = np.sum(losses)
assert not np.isnan(losses).any()
average = l if average is None else average * 0.95 + 0.05 * l
print('↑' if l > average else '↓', end='', flush=True)
scald, repro = self.model.predict(seeds)
self.show_progress(images, scald, repro)
total /= epoch_size
stats /= epoch_size
totals, labels = [sum(total)] + list(total), ['total', 'prcpt', 'smthn', 'advrs']
gen_info = ['{}{}{}={:4.2e}'.format(ansi.WHITE_B, k, ansi.ENDC, v) for k, v in zip(labels, totals)]
print('\rEpoch #{} at {:4.1f}s, lr={:4.2e}{}'.format(epoch+1, time.time()-start, l_r, ' '*(epoch_size-30)))
print(' - generator {}'.format(' '.join(gen_info)))
real, fake = stats[:batch_size], stats[batch_size:]
print(' - discriminator', real.mean(), len(np.where(real > 0.5)[0]),
fake.mean(), len(np.where(fake < -0.5)[0]))
if epoch == adversarial_start-1:
print(' - generator now optimizing against discriminator.')
running = None
if (epoch+1) % save_every == 0:
print(' - saving current generator layers to disk...')
except KeyboardInterrupt:
print('\n{}Trained {}x super-resolution for {} epochs.{}'\
.format(ansi.CYAN_B, self.zoom, epoch+1, ansi.CYAN))
def match_histograms(self, A, B, rng=(0.0, 255.0), bins=64):
(Ha, Xa), (Hb, Xb) = [np.histogram(i, bins=bins, range=rng, density=True) for i in [A, B]]
X = np.linspace(rng[0], rng[1], bins, endpoint=True)
Hpa, Hpb = [np.cumsum(i) * (rng[1] - rng[0]) ** 2 / float(bins) for i in [Ha, Hb]]
inv_Ha = scipy.interpolate.interp1d(X, Hpa, bounds_error=False, fill_value='extrapolate')
map_Hb = scipy.interpolate.interp1d(Hpb, X, bounds_error=False, fill_value='extrapolate')
return map_Hb(inv_Ha(A).clip(0.0, 255.0))
def process(self, original):
# Snap the image to a shape that's compatible with the generator (2x, 4x)
s = 2 ** max(self.generator_upscale, self.generator_downscale)
by, bx = original.shape[0] % s, original.shape[1] % s
original = original[by-by//2:original.shape[0]-by//2,bx-bx//2:original.shape[1]-bx//2,:]
# Prepare paded input image as well as output buffer of zoomed size.
s, p, z = rendering_tile, rendering_overlap, self.zoom
image = np.pad(original, ((p, p), (p, p), (0, 0)), mode='reflect')
output = np.zeros((original.shape[0] * z, original.shape[1] * z, 3), dtype=np.float32)
# Iterate through the tile coordinates and pass them through the network.
for y, x in itertools.product(range(0, original.shape[0], s), range(0, original.shape[1], s)):
img = np.transpose(image[y:y+p*2+s,x:x+p*2+s,:] / 255.0 - 0.5, (2, 0, 1))[np.newaxis].astype(np.float32)
*_, repro = self.model.predict(img)
output[y*z:(y+s)*z,x*z:(x+s)*z,:] = np.transpose(repro[0] + 0.5, (1, 2, 0))[p*z:-p*z,p*z:-p*z,:]
#print('.', end='', flush=True)
output = output.clip(0.0, 1.0) * 255.0
# Match color histograms if the user specified this option.
if rendering_histogram:
for i in range(3):
output[:,:,i] = self.match_histograms(output[:,:,i], original[:,:,i])
return scipy.misc.toimage(output, cmin=0, cmax=255)
if __name__ == "__main__":
if train:
zoom = 2**(generator_upscale - generator_downscale)
enhancer = NeuralEnhancer(model, zoom, loader=True)
x2 = NeuralEnhancer('default', 2, loader=False)
x4 = NeuralEnhancer('default', 4, loader=False)
repair = NeuralEnhancer('repair', 1, loader=False)
deblur = NeuralEnhancer('deblur', 1, loader=False)
for filename in files:
print(filename, end=' ')
cv_img = cv2.imread(filename)
pil_image=cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
#img = scipy.ndimage.imread(filename, mode='RGB')
out = np.array(x4.process(pil_image))
#out = np.array(deblur.process(out))
out = np.array(repair.process(out))
out = np.array(x2.process(out))
opencv_image=cv2.cvtColor(out, cv2.COLOR_RGB2BGR)
cv2.destroyAllWindows()[0]+'_ne%ix.png' % zoom)
# Original work Copyright (c) 2018, dfaker.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
# Subject to Mozilla Public License
# See:
import argparse
import cv2
import json
import numpy
from pathlib import Path
from tqdm import tqdm
from scipy import ndimage
from model import autoencoder_A
from model import autoencoder_B
from model import encoder, decoder_A, decoder_B
import enhance2
from enhance2 import NeuralEnhancer
encoder .load_weights( "models/encoder.h5" )
decoder_A.load_weights( "models/decoder_A.h5" )
decoder_B.load_weights( "models/decoder_B.h5" )
import time
imageSize = 256
croppedSize = 240
zmask = numpy.zeros((1,128, 128,1),float)
NEx2 = NeuralEnhancer('default', 2, loader=False)
NEx4 = NeuralEnhancer('default', 4, loader=False)
NE_deblur = NeuralEnhancer('deblur', 1, loader=False)
def image_stats(image):
(l, a, b) = cv2.split(image)
(lMean, lStd) = (l.mean(), l.std())
(aMean, aStd) = (a.mean(), a.std())
(bMean, bStd) = (b.mean(), b.std())
return (lMean, lStd, aMean, aStd, bMean, bStd)
def adjust_avg_color(img_old,img_new):
w,h,c = img_new.shape
for i in range(img_new.shape[-1]):
old_avg = img_old[:, :, i].mean()
new_avg = img_new[:, :, i].mean()
diff_int = (int)(old_avg - new_avg)
for m in range(img_new.shape[0]):
for n in range(img_new.shape[1]):
temp = (img_new[m,n,i] + diff_int)
if temp < 0:
img_new[m,n,i] = 0
elif temp > 255:
img_new[m,n,i] = 255
img_new[m,n,i] = temp
def transfer_avg_color(img_old,img_new):
source = cv2.cvtColor(img_old, cv2.COLOR_BGR2LAB).astype("float32")
target = cv2.cvtColor(img_new, cv2.COLOR_BGR2LAB).astype("float32")
(lMeanSrc, lStdSrc, aMeanSrc, aStdSrc, bMeanSrc, bStdSrc) = image_stats(source)
(lMeanTar, lStdTar, aMeanTar, aStdTar, bMeanTar, bStdTar) = image_stats(target)
(l, a, b) = cv2.split(target)
l -= lMeanTar
a -= aMeanTar
b -= bMeanTar
l = (lStdTar / lStdSrc) * l
a = (aStdTar / aStdSrc) * a
b = (bStdTar / bStdSrc) * b
l += lMeanSrc
a += aMeanSrc
b += bMeanSrc
l = numpy.clip(l, 0, 255)
a = numpy.clip(a, 0, 255)
b = numpy.clip(b, 0, 255)
transfer = cv2.merge([l, a, b])
transfer = cv2.cvtColor(transfer.astype("uint8"), cv2.COLOR_LAB2BGR)
return transfer
def convert_one_image( autoencoder,otherautoencoder, image, mat,facepoints,erosion_kernel,blur_size,seamlessClone,maskType,doublePass=False ):
global n
size = 64
image_size = image.shape[1], image.shape[0]
sourceMat = mat.copy()
sourceMat = sourceMat*(240+(16*2))
sourceMat[:,2] += 48
face = cv2.warpAffine( image, sourceMat, (240+(48+16)*2,240+(48+16)*2) )
sourceFace = face.copy()
sourceFace = cv2.resize(sourceFace,(128,128),cv2.INTER_CUBIC)
face = cv2.resize(face,(64,64),cv2.INTER_AREA)
face = numpy.expand_dims( face, 0 )
new_face_rgb,new_face_m = autoencoder.predict( [face / 255.0,zmask] )
if doublePass:
#feed the original prediction back into the network for a second round.
new_face_rgb = new_face_rgb.reshape((128, 128, 3))
new_face_rgb = cv2.resize( new_face_rgb , (64,64))
new_face_rgb = numpy.expand_dims( new_face_rgb, 0 )
new_face_rgb,_ = autoencoder.predict( [new_face_rgb,zmask] )
_,other_face_m = otherautoencoder.predict( [face / 255.0,zmask] )
new_face_m = numpy.maximum(new_face_m, other_face_m )
new_face_rgb = numpy.clip( new_face_rgb[0] * 255, 0, 255 ).astype( image.dtype )
new_face_m = numpy.clip( new_face_m[0] , 0, 1 ).astype( float ) * numpy.ones((new_face_m.shape[0],new_face_m.shape[1],3))
base_image = numpy.copy( image )
new_image = numpy.copy( image )
transmat = mat * (64-16) *16
transmat[::,2] += 8*16
new_face_rgb = numpy.array(NEx4.process(new_face_rgb))
new_face_rgb = numpy.array(NE_deblur.process(new_face_rgb))
new_face_rgb = cv2.GaussianBlur(new_face_rgb,(11,11),0)
new_face_rgb = numpy.array(NEx2.process(new_face_rgb))
new_face_m = cv2.resize(new_face_m, (1024,1024)) # scale mask to same
cv2.warpAffine( new_face_rgb, transmat, image_size, new_image, cv2.WARP_INVERSE_MAP | cv2.INTER_LANCZOS4, cv2.BORDER_TRANSPARENT )
image_mask = numpy.zeros_like(new_image, dtype=float)
cv2.warpAffine( new_face_m, transmat, image_size, image_mask, cv2.WARP_INVERSE_MAP | cv2.INTER_CUBIC, cv2.BORDER_TRANSPARENT )
if erosion_kernel is not None:
image_mask = cv2.erode(image_mask, erosion_kernel, iterations = 1)
#slightly enlarge the mask area
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(3,3))
image_mask = cv2.dilate(image_mask,kernel,iterations = 1)
if seamlessClone:
unitMask = numpy.clip( image_mask * 365, 0, 255 ).astype(numpy.uint8)
maxregion = numpy.argwhere(unitMask==255)
if maxregion.size > 0:
miny,minx = maxregion.min(axis=0)[:2]
maxy,maxx = maxregion.max(axis=0)[:2]
lenx = maxx - minx;
leny = maxy - miny;
masky = int(minx+(lenx//2))
maskx = int(miny+(leny//2))
new_image = cv2.seamlessClone(new_image.astype(numpy.uint8),base_image.astype(numpy.uint8),unitMask,(masky,maskx) , cv2.NORMAL_CLONE )
#image_mask = cv2.GaussianBlur(image_mask,(11,11),0)
if blur_size!=0:
image_mask = cv2.GaussianBlur(image_mask,(blur_size,blur_size),0)
foreground = cv2.multiply(image_mask, new_image.astype(float))
background = cv2.multiply(1.0 - image_mask, base_image.astype(float))
output = numpy.add(background,foreground)
cv2.imshow("output", output.astype(numpy.uint8) )
if cv2.waitKey(1)==ord('q'):
return output
def main( args ):
input_dir = Path( args.input_dir )
assert input_dir.is_dir()
alignments = input_dir / args.alignments
with as f:
alignments = json.load(f)
output_dir = input_dir / args.output_dir
output_dir.mkdir( parents=True, exist_ok=True )
args.direction = 'AtoB'
if args.direction == 'AtoB': autoencoder,otherautoencoder = autoencoder_B,autoencoder_A
if args.direction == 'BtoA': autoencoder,otherautoencoder = autoencoder_A,autoencoder_B
if args.blurSize % 2 == 0:
if args.erosionKernelSize>0:
erosion_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(args.erosionKernelSize,args.erosionKernelSize))
erosion_kernel = None
for e in alignments:
if len(e)<4:
raise LookupError('This script expects new format json files with face points included.')
for image_file, face_file, mat,facepoints in tqdm( alignments[args.startframe::args.frameSkip] ):
image = cv2.imread( str( input_dir / image_file ) )
face = cv2.imread( str( input_dir / face_file ) )
mat = numpy.array(mat).reshape(2,3)
if image is None: continue
if face is None: continue
new_image = convert_one_image( autoencoder, otherautoencoder, image, mat, facepoints, erosion_kernel, args.blurSize, args.seamlessClone, args.maskType, args.doublePass)
output_file = output_dir / Path(image_file).name
cv2.imwrite( str(output_file), new_image )
def str2bool(v):
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
raise argparse.ArgumentTypeError('Boolean value expected.')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument( "input_dir", type=str, nargs='?' )
parser.add_argument( "alignments", type=str, nargs='?', default='alignments.json' )
parser.add_argument( "output_dir", type=str, nargs='?', default='merged' )
parser.add_argument("--seamlessClone", type=str2bool, nargs='?', const=False, default='False', help="Attempt to use opencv seamlessClone.")
parser.add_argument("--doublePass", type=str2bool, nargs='?', const=False, default='False', help="Pass the original prediction output back through for a second pass.")
parser.add_argument('--maskType', type=str, default='FaceHullAndRect' ,choices=['FaceHullAndRect','FaceHull','Rect'], help="The type of masking to use around the face.")
parser.add_argument( "--startframe", type=int, default='0' )
parser.add_argument( "--frameSkip", type=int, default='1' )
parser.add_argument( "--blurSize", type=int, default='4' )
parser.add_argument( "--erosionKernelSize", type=int, default='2' )
parser.add_argument( "--direction", type=str, default="AtoB", choices=["AtoB", "BtoA"])
main( parser.parse_args() )
# Original work Copyright (c) 2018, dfaker.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
# Subject to Mozilla Public License
# See:
from keras.models import Model
from keras.layers import Input, Dense, Flatten, Reshape, Dropout, Add,Concatenate, Lambda
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Conv2D
from keras.initializers import RandomNormal
from keras.optimizers import Adam
from pixel_shuffler import PixelShuffler
import tensorflow as tf
from keras_contrib.losses import DSSIMObjective
from keras import losses
import time
from keras.utils import multi_gpu_model
class penalized_loss(object):
def __init__(self,mask,lossFunc,maskProp= 1.0):
self.mask = mask
self.maskProp = maskProp
self.maskaskinvProp = 1-maskProp
def __call__(self,y_true, y_pred):
tro, tgo, tbo = tf.split(y_true,3, 3 )
pro, pgo, pbo = tf.split(y_pred,3, 3 )
tr = tro
tg = tgo
tb = tbo
pr = pro
pg = pgo
pb = pbo
m = self.mask
m = m*self.maskProp
m += self.maskaskinvProp
tr *= m
tg *= m
tb *= m
pr *= m
pg *= m
pb *= m
y = tf.concat([tr, tg, tb],3)
p = tf.concat([pr, pg, pb],3)
#yo = tf.stack([tro,tgo,tbo],3)
#po = tf.stack([pro,pgo,pbo],3)
return self.lossFunc(y,p)
optimizer = Adam( lr=5e-5, beta_1=0.5, beta_2=0.999 )
IMAGE_SHAPE = (64,64,3)
conv_init = RandomNormal(0, 0.02)
gamma_init = RandomNormal(1., 0.02)
def __conv_init(a):
print("conv_init", a)
k = RandomNormal(0, 0.02)(a) # for convolution kernel
k.conv_weight = True
return k
def upscale_ps(filters, use_norm=True):
def block(x):
x = Conv2D(filters*4, kernel_size=3, use_bias=False, kernel_initializer=RandomNormal(0, 0.02), padding='same' )(x)
x = LeakyReLU(0.1)(x)
x = PixelShuffler()(x)
return x
return block
def res_block(input_tensor, f):
x = input_tensor
x = Conv2D(f, kernel_size=3, kernel_initializer=conv_init, use_bias=False, padding="same")(x)
x = LeakyReLU(alpha=0.2)(x)
x = Conv2D(f, kernel_size=3, kernel_initializer=conv_init, use_bias=False, padding="same")(x)
x = Add()([x, input_tensor])
x = LeakyReLU(alpha=0.2)(x)
return x
def conv( filters ):
def block(x):
x = Conv2D( filters, kernel_size=5, strides=2, padding='same' )(x)
x = LeakyReLU(0.1)(x)
return x
return block
def upscale( filters ):
def block(x):
x = Conv2D( filters*4, kernel_size=3, padding='same' )(x)
x = LeakyReLU(0.1)(x)
x = PixelShuffler()(x)
return x
return block
def Encoder():
input_ = Input( shape=IMAGE_SHAPE )
x = conv( 128)(input_)
x = conv( 256)(x)
x = conv( 512)(x)
x = conv(1024)(x)
x = Dense( ENCODER_DIM )( Flatten()(x) )
x = Dense(4*4*1024)(x)
x = Reshape((4,4,1024))(x)
x = upscale(512)(x)
return Model( input_, [x] )
def Decoder(name):
input_ = Input( shape=(8,8,512) )
skip_in = Input( shape=(8,8,512) )
x = input_
x = upscale(512)(x)
x = res_block(x, 512)
x = upscale(256)(x)
x = res_block(x, 256)
x = upscale(128)(x)
x = res_block(x, 128)
x = upscale(64)(x)
x = Conv2D( 3, kernel_size=5, padding='same', activation='sigmoid' )(x)
y = input_
y = upscale(512)(y)
y = upscale(256)(y)
y = upscale(128)(y)
y = upscale(64)(y)
y = Conv2D( 1, kernel_size=5, padding='same', activation='sigmoid' )(y)
return Model( [input_], outputs=[x,y] )
### ensure sure we have enough vram left to run NE model
import os
import keras.backend.tensorflow_backend as KTF
def get_session(gpu_fraction=0.8):
num_threads = os.environ.get('OMP_NUM_THREADS')
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=gpu_fraction)
if num_threads:
return tf.Session(config=tf.ConfigProto(
gpu_options=gpu_options, intra_op_parallelism_threads=num_threads))
return tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
encoder = Encoder()
decoder_A = Decoder('MA')
decoder_B = Decoder('MB')
x1 = Input( shape=IMAGE_SHAPE )
x2 = Input( shape=IMAGE_SHAPE )
m1 = Input( shape=(64*2,64*2,1) )
m2 = Input( shape=(64*2,64*2,1) )
autoencoder_A = Model( [x1,m1], decoder_A( encoder(x1) ) )
#autoencoder_A = multi_gpu_model( autoencoder_A ,2)
autoencoder_B = Model( [x2,m2], decoder_B( encoder(x2) ) )
#autoencoder_B = multi_gpu_model( autoencoder_B ,2)
o1,om1 = decoder_A( encoder(x1))
o2,om2 = decoder_B( encoder(x2))
DSSIM = DSSIMObjective()
autoencoder_A.compile( optimizer=optimizer, loss=[ penalized_loss(m1, DSSIM),'mse'] )
autoencoder_B.compile( optimizer=optimizer, loss=[ penalized_loss(m2, DSSIM),'mse'] )
